boto3与IoT的集成与应用指南
安装SDK
sudo pip3 install AWSIoTPythonSDK
pip install AWSIoTPythonSDK
SELECT * FROM 'my/topic'
分区键 id ${id}
给start.sh这个⽂件添加权限,并执⾏脚本
运⾏脚本接⼊IoT
chmod +x start.sh
./start.sh
在AWS IoT中的MQTT测试客户端程序上,对 可以对 MQTT 主题进⾏订阅,从⽽收到相应主 题下的
MQTT 消息。这⾥订阅主题输⼊ ”#",即订阅所有主题下的消息
创建⼀个表,分区键名为car_name,类型是字符串,排序件名称为timestamp,类型是数字
创建IoT规则
AWS之服务存储EFS在Amazon EC2上的挂载——针对EC2进行托管文件存储
sudo yum install -y amazon-efs-utils
mkdir efs
sudo mount -t nfs4 -o nfsvers=4.1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2,noresvport 文件挂载系统的DNS名称:/ efs
df -h
需要开启2049端口 efs网络也要
代码注意 pip install awsiotsdk
create_dynamodb_table 函数:
使用 boto3 客户端连接到 DynamoDB。
检查表是否存在,如果不存在则创建新表。
表的主键设置为:
deviceId 作为分区键(HASH 键)。
timestamp 作为排序键(RANGE 键)。
表的读写容量设置为 1(可以根据需要调整)。
create_iot_resources 函数:
在创建 IoT 资源之前,调用 create_dynamodb_table 函数创建 DynamoDB 表(如果不存在)。
配置文件 CONFIG:
dynamodb_table 是 DynamoDB 表的名称。
dynamodb_role_arn 是 IAM 角色的 ARN,用于 IoT 规则引擎将数据写入 DynamoDB。
运行代码前的准备工作
安装依赖:
bash
复制代码
pip install boto3 awscrt
配置 AWS 凭证:
确保你已经配置了 AWS 凭证(AWS_ACCESS_KEY_ID 和 AWS_SECRET_ACCESS_KEY)。
如果没有配置,可以在代码中直接添加:
python
复制代码
import boto3
boto3.Session(
aws_access_key_id='YOUR_ACCESS_KEY',
aws_secret_access_key='YOUR_SECRET_KEY',
region_name='us-east-1'
)
替换配置文件中的占位符:
region:你的 AWS 区域(例如 us-east-1)。
dynamodb_table:DynamoDB 表的名称(例如 SensorData)。
dynamodb_role_arn:IAM 角色的 ARN(例如 arn:aws:iam::692859911256:role/dynamo)。
运行代码
运行代码后,它会自动创建所有必要的 AWS 资源,包括:
IoT 设备。
IoT 证书和密钥。
IoT 安全策略。
IoT 数据转发规则。
DynamoDB 表(如果不存在)。
然后,设备会连接到 MQTT 代理,并每隔 5 秒发送一次模拟数据到 DynamoDB 表中。
注意事项
IAM 角色:
确保 dynamodb_role_arn 对应的 IAM 角色有权限访问 DynamoDB 表。
可以使用 AWS IAM 控制台创建 IAM 角色,并附加以下策略:
json
复制代码
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"dynamodb:PutItem"
],
"Resource": [
"arn:aws:dynamodb:*:*:table/SensorData"
]
}
]
}
** DynamoDB 表设计**:
如果你的数据结构不同,可以修改主键和属性类型(例如将 timestamp 从字符串改为数字类型)。
通过这段代码,你可以完全自动化 IoT 项目中所有资源的创建过程,包括 DynamoDB 表。
每段代码的含义
第一部分:导入依赖库
python
复制
import boto3
import json
from awscrt import io, mqtt
from awsiot import mqtt_connection_builder
boto3:AWS SDK for Python,用于操作AWS服务
json:处理JSON数据格式
awscrt 和 awsiot:AWS IoT设备端连接库,用于建立MQTT连接
第二部分:配置参数
python
复制
CONFIG = {
"region": "us-east-1",
"thing_name": "MyTemperatureSensor",
"policy_name": "DevicePolicy",
"rule_name": "SensorToDynamoDB",
"dynamodb_role_arn": "arn:aws:iam::123456789012:role/IoTDynamoDBRole",
"dynamodb_table": "SensorData",
"mqtt_topic": "sensor/data"
}
定义全局配置字典,包含:
region:AWS区域
thing_name:IoT设备名称
policy_name:安全策略名称
rule_name:数据转发规则名称
dynamodb_*:DynamoDB相关配置
mqtt_topic:设备发布数据的MQTT主题
第三部分:资源创建函数
python
复制
def create_iot_resources():
iot = boto3.client('iot', region_name=CONFIG['region'])
# 1. 创建设备
thing = iot.create_thing(thingName=CONFIG['thing_name'])
# 2. 创建证书和密钥
cert = iot.create_keys_and_certificate(setAsActive=True)
with open('device-cert.pem', 'w') as f: … # 保存证书
# 3. 创建策略
policy_doc = { "Statement": […] } # 权限定义
iot.create_policy(policyName=CONFIG['policy_name'], …)
# 4. 关联策略和证书
iot.attach_policy(policyName=…, target=cert_arn)
# 5. 关联设备和证书
iot.attach_thing_principal(thingName=…, principal=…)
# 6. 创建规则引擎
rule_sql = "SELECT * FROM 'sensor/data'"
actions = [{ "dynamoDB": … }] # 转发到DynamoDB
iot.create_topic_rule(ruleName=…, topicRulePayload=…)
功能:通过AWS API创建完整的IoT资源链
关键操作:
create_thing:注册物联网设备
create_keys_and_certificate:生成X.509证书
create_policy:定义设备权限策略
attach_policy:绑定策略到证书
attach_thing_principal:绑定设备与证书
create_topic_rule:创建SQL规则转发数据到DynamoDB
第四部分:设备连接测试
python
复制
def connect_and_publish():
# 获取IoT终端节点
endpoint = iot.describe_endpoint(…)['endpointAddress']
# 配置MQTT连接
mqtt_conn = mqtt_connection_builder.mtls_from_path(
endpoint=endpoint,
cert_filepath="device-cert.pem",
pri_key_filepath="device-key.pem",
client_id=CONFIG['thing_name']
)
# 建立连接
connect_future = mqtt_conn.connect()
connect_future.result() # 阻塞直到连接成功
# 持续发布数据
while True:
payload = json.dumps({"temperature": 25.5})
mqtt_conn.publish(topic=CONFIG['mqtt_topic'], payload=payload)
time.sleep(5)
功能:验证设备能否连接AWS IoT Core并发送数据
关键点:
mtls_from_path:使用证书建立双向TLS连接
publish:向指定主题发布JSON格式数据
持续每5秒发送一次传感器数据
第五部分:主程序逻辑
python
复制
if __name__ == "__main__":
resource_info = create_iot_resources()
print("保留的资源信息:")
print(json.dumps(resource_info, indent=2))
connect_and_publish()
执行流程:
调用create_iot_resources创建所有资源
打印生成的资源ARN等信息
启动设备连接和数据发送
关键AWS服务交互图示
mermaid
复制
sequenceDiagram
participant Device
participant IoT_Core
participant DynamoDB
Device->>IoT_Core: 1. 创建设备/证书
IoT_Core->>Device: 返回证书文件
Device->>IoT_Core: 2. 建立MQTT连接
Device->>IoT_Core: 3. 发布传感器数据
IoT_Core->>DynamoDB: 4. 根据规则存储数据
安全注意事项
证书文件保护:
python
复制
# Linux/Mac自动设置权限
import os
os.chmod('device-key.pem', 0o400)
密钥文件必须设为只读
建议将证书存储在安全位置
策略最小权限原则:
json
复制
"Action": ["iot:Connect", "iot:Publish"]
仅授予设备必要权限
避免使用iot:*通配符
扩展建议
添加HTTPS支持:
python
复制
from awsiot import mqtt_connection_builder
mqtt_conn.enable_metrics_collection() # 启用加密指标收集
增强错误处理:
python
复制
try:
response = iot.create_thing(…)
except iot.exceptions.ResourceAlreadyExistsException:
print("设备已存在,跳过创建")
添加云监控:
python
复制
cloudwatch = boto3.client('cloudwatch')
cloudwatch.put_metric_alarm(…) # 设置设备离线报警
通过以上代码解析,可以完整理解从设备注册到数据上云的全流程实现。每个模块都承担明确的职责,共同组成可运行的物联网解决方案。
import boto3
import json
import time
from awscrt import io, mqtt
from awsiot import mqtt_connection_builder
# 配置参数(根据实际情况修改)
CONFIG = {
"region": "us-east-1",
"thing_name": "MyTemperatureSensor",
"policy_name": "DevicePolicy",
"rule_name": "SensorToDynamoDB",
"dynamodb_role_arn": "arn:aws:iam::692859911256:role/dynamo",
"dynamodb_table": "SensorData", # DynamoDB 表名称
"mqtt_topic": "sensor/data"
}
# 创建 DynamoDB 表
def create_dynamodb_table(region, table_name):
# 初始化 DynamoDB 客户端
dynamodb = boto3.client('dynamodb', region_name=region)
# 检查表是否存在
try:
response = dynamodb.describe_table(TableName=table_name)
print(f"表 {table_name} 已存在。")
return True
except dynamodb.exceptions.ResourceNotFoundException:
print(f"表 {table_name} 不存在,正在创建…")
# 创建表
dynamodb.create_table(
TableName=table_name,
KeySchema=[
{
"AttributeName": "deviceId",
"KeyType": "HASH" # 分区键(hash key)
},
{
"AttributeName": "timestamp",
"KeyType": "RANGE" # 排序键(range key)
}
],
AttributeDefinitions=[
{
"AttributeName": "deviceId",
"AttributeType": "S" # 字符串类型
},
{
"AttributeName": "timestamp",
"AttributeType": "S" # 字符串类型(或 "N" 表示数字类型)
}
],
ProvisionedThroughput={
"ReadCapacityUnits": 1,
"WriteCapacityUnits": 1
}
)
# 等待表创建完成
while True:
response = dynamodb.describe_table(TableName=table_name)
if response['Table']['TableStatus'] == 'ACTIVE':
print(f"表 {table_name} 创建完成。")
return True
time.sleep(5)
return False
def create_iot_resources():
"""创建所有 IoT 资源但不清理"""
iot = boto3.client('iot', region_name=CONFIG['region'])
# 1. 创建设备
print("正在创建设备…")
thing = iot.create_thing(thingName=CONFIG['thing_name'])
# 2. 创建证书和密钥
print("生成证书…")
cert = iot.create_keys_and_certificate(setAsActive=True)
with open('device-cert.pem', 'w') as f:
f.write(cert['certificatePem'])
with open('device-key.pem', 'w') as f:
f.write(cert['keyPair']['PrivateKey'])
# 3. 创建策略
print("创建安全策略…")
policy_doc = {
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": ["iot:Connect", "iot:Publish"],
"Resource": "*"
}]
}
iot.create_policy(
policyName=CONFIG['policy_name'],
policyDocument=json.dumps(policy_doc)
)
# 4. 关联策略和证书
iot.attach_policy(
policyName=CONFIG['policy_name'],
target=cert['certificateArn'])
# 5. 关联设备和证书
iot.attach_thing_principal(
thingName=CONFIG['thing_name'],
principal=cert['certificateArn'])
# 6. 创建规则引擎
print("创建数据转发规则…")
rule_sql = f"SELECT * FROM '{CONFIG['mqtt_topic']}'"
actions = [{
"dynamoDB": {
"tableName": CONFIG['dynamodb_table'],
"roleArn": CONFIG['dynamodb_role_arn'],
"hashKeyField": "deviceId",
"hashKeyValue": "clientid()",
"rangeKeyField": "timestamp",
"rangeKeyValue": "timestamp()"
}
}]
iot.create_topic_rule(
ruleName=CONFIG['rule_name'],
topicRulePayload={
"sql": rule_sql,
"actions": actions,
"ruleDisabled": False
}
)
# 创建 DynamoDB 表(如果不存在)
create_dynamodb_table(CONFIG['region'], CONFIG['dynamodb_table'])
print("资源创建完成!永久保留中…")
return {
"thing_name": CONFIG['thing_name'],
"cert_arn": cert['certificateArn'],
"cert_id": cert['certificateId'],
"policy_name": CONFIG['policy_name'],
"rule_name": CONFIG['rule_name']
}
def connect_and_publish():
"""设备连接测试"""
iot = boto3.client('iot', region_name=CONFIG['region'])
endpoint = iot.describe_endpoint(endpointType='iot:Data-ATS')['endpointAddress']
# 配置MQTT连接
event_loop_group = io.EventLoopGroup(1)
mqtt_conn = mqtt_connection_builder.mtls_from_path(
endpoint=endpoint,
cert_filepath="device-cert.pem",
pri_key_filepath="device-key.pem",
client_id=CONFIG['thing_name'],
event_loop_group=event_loop_group
)
print("正在连接MQTT服务器…")
connect_future = mqtt_conn.connect()
connect_future.result()
# 持续发布数据(按 Ctrl+C 停止)
try:
while True:
payload = json.dumps({
"temperature": 25.5,
"humidity": 60,
"status": "normal"
})
mqtt_conn.publish(
topic=CONFIG['mqtt_topic'],
payload=payload,
qos=mqtt.QoS.AT_LEAST_ONCE
)
print(f"已发送: {payload}")
time.sleep(5)
except KeyboardInterrupt:
mqtt_conn.disconnect()
print("\n已停止发送")
if __name__ == "__main__":
resource_info = create_iot_resources()
print("保留的资源信息(请妥善保存):")
print(json.dumps(resource_info, indent=2))
connect_and_publish()
作者:aaawsaaa