boto3与IoT的集成与应用指南

安装SDK 
sudo pip3 install AWSIoTPythonSDK

pip install AWSIoTPythonSDK

SELECT * FROM 'my/topic'

分区键 id  ${id}

给start.sh这个⽂件添加权限,并执⾏脚本
运⾏脚本接⼊IoT
chmod +x start.sh
./start.sh

在AWS IoT中的MQTT测试客户端程序上,对 可以对 MQTT 主题进⾏订阅,从⽽收到相应主 题下的
MQTT 消息。这⾥订阅主题输⼊ ”#",即订阅所有主题下的消息
创建⼀个表,分区键名为car_name,类型是字符串,排序件名称为timestamp,类型是数字
创建IoT规则

AWS之服务存储EFS在Amazon EC2上的挂载——针对EC2进行托管文件存储
sudo yum install -y amazon-efs-utils
mkdir efs
sudo mount -t nfs4 -o nfsvers=4.1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2,noresvport 文件挂载系统的DNS名称:/ efs
df -h
需要开启2049端口 efs网络也要

代码注意 pip install awsiotsdk   
create_dynamodb_table 函数:

使用 boto3 客户端连接到 DynamoDB。
检查表是否存在,如果不存在则创建新表。
表的主键设置为:
deviceId 作为分区键(HASH 键)。
timestamp 作为排序键(RANGE 键)。
表的读写容量设置为 1(可以根据需要调整)。
create_iot_resources 函数:

在创建 IoT 资源之前,调用 create_dynamodb_table 函数创建 DynamoDB 表(如果不存在)。
配置文件 CONFIG:

dynamodb_table 是 DynamoDB 表的名称。
dynamodb_role_arn 是 IAM 角色的 ARN,用于 IoT 规则引擎将数据写入 DynamoDB。
运行代码前的准备工作
安装依赖:

bash
复制代码
pip install boto3 awscrt
配置 AWS 凭证:

确保你已经配置了 AWS 凭证(AWS_ACCESS_KEY_ID 和 AWS_SECRET_ACCESS_KEY)。
如果没有配置,可以在代码中直接添加:
python
复制代码
import boto3

boto3.Session(
    aws_access_key_id='YOUR_ACCESS_KEY',
    aws_secret_access_key='YOUR_SECRET_KEY',
    region_name='us-east-1'
)
替换配置文件中的占位符:

region:你的 AWS 区域(例如 us-east-1)。
dynamodb_table:DynamoDB 表的名称(例如 SensorData)。
dynamodb_role_arn:IAM 角色的 ARN(例如 arn:aws:iam::692859911256:role/dynamo)。
运行代码
运行代码后,它会自动创建所有必要的 AWS 资源,包括:

IoT 设备。
IoT 证书和密钥。
IoT 安全策略。
IoT 数据转发规则。
DynamoDB 表(如果不存在)。
然后,设备会连接到 MQTT 代理,并每隔 5 秒发送一次模拟数据到 DynamoDB 表中。

注意事项
IAM 角色:

确保 dynamodb_role_arn 对应的 IAM 角色有权限访问 DynamoDB 表。
可以使用 AWS IAM 控制台创建 IAM 角色,并附加以下策略:
json
复制代码
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:PutItem"
            ],
            "Resource": [
                "arn:aws:dynamodb:*:*:table/SensorData"
            ]
        }
    ]
}
** DynamoDB 表设计**:
如果你的数据结构不同,可以修改主键和属性类型(例如将 timestamp 从字符串改为数字类型)。
通过这段代码,你可以完全自动化 IoT 项目中所有资源的创建过程,包括 DynamoDB 表。
每段代码的含义
第一部分:导入依赖库
python
复制
import boto3
import json
from awscrt import io, mqtt
from awsiot import mqtt_connection_builder
boto3:AWS SDK for Python,用于操作AWS服务

json:处理JSON数据格式

awscrt 和 awsiot:AWS IoT设备端连接库,用于建立MQTT连接

第二部分:配置参数
python
复制
CONFIG = {
    "region": "us-east-1",
    "thing_name": "MyTemperatureSensor",
    "policy_name": "DevicePolicy",
    "rule_name": "SensorToDynamoDB",
    "dynamodb_role_arn": "arn:aws:iam::123456789012:role/IoTDynamoDBRole",
    "dynamodb_table": "SensorData",
    "mqtt_topic": "sensor/data"
}
定义全局配置字典,包含:

region:AWS区域

thing_name:IoT设备名称

policy_name:安全策略名称

rule_name:数据转发规则名称

dynamodb_*:DynamoDB相关配置

mqtt_topic:设备发布数据的MQTT主题

第三部分:资源创建函数
python
复制
def create_iot_resources():
    iot = boto3.client('iot', region_name=CONFIG['region'])
    
    # 1. 创建设备
    thing = iot.create_thing(thingName=CONFIG['thing_name'])
    
    # 2. 创建证书和密钥
    cert = iot.create_keys_and_certificate(setAsActive=True)
    with open('device-cert.pem', 'w') as f: …  # 保存证书
    
    # 3. 创建策略
    policy_doc = { "Statement": […] }  # 权限定义
    iot.create_policy(policyName=CONFIG['policy_name'], …)
    
    # 4. 关联策略和证书
    iot.attach_policy(policyName=…, target=cert_arn)
    
    # 5. 关联设备和证书
    iot.attach_thing_principal(thingName=…, principal=…)
    
    # 6. 创建规则引擎
    rule_sql = "SELECT * FROM 'sensor/data'"
    actions = [{ "dynamoDB": … }]  # 转发到DynamoDB
    iot.create_topic_rule(ruleName=…, topicRulePayload=…)
功能:通过AWS API创建完整的IoT资源链

关键操作:

create_thing:注册物联网设备

create_keys_and_certificate:生成X.509证书

create_policy:定义设备权限策略

attach_policy:绑定策略到证书

attach_thing_principal:绑定设备与证书

create_topic_rule:创建SQL规则转发数据到DynamoDB

第四部分:设备连接测试
python
复制
def connect_and_publish():
    # 获取IoT终端节点
    endpoint = iot.describe_endpoint(…)['endpointAddress']
    
    # 配置MQTT连接
    mqtt_conn = mqtt_connection_builder.mtls_from_path(
        endpoint=endpoint,
        cert_filepath="device-cert.pem",
        pri_key_filepath="device-key.pem",
        client_id=CONFIG['thing_name']
    )
    
    # 建立连接
    connect_future = mqtt_conn.connect()
    connect_future.result()  # 阻塞直到连接成功
    
    # 持续发布数据
    while True:
        payload = json.dumps({"temperature": 25.5})
        mqtt_conn.publish(topic=CONFIG['mqtt_topic'], payload=payload)
        time.sleep(5)
功能:验证设备能否连接AWS IoT Core并发送数据

关键点:

mtls_from_path:使用证书建立双向TLS连接

publish:向指定主题发布JSON格式数据

持续每5秒发送一次传感器数据

第五部分:主程序逻辑
python
复制
if __name__ == "__main__":
    resource_info = create_iot_resources()
    print("保留的资源信息:")
    print(json.dumps(resource_info, indent=2))
    
    connect_and_publish()
执行流程:

调用create_iot_resources创建所有资源

打印生成的资源ARN等信息

启动设备连接和数据发送

关键AWS服务交互图示
mermaid
复制
sequenceDiagram
    participant Device
    participant IoT_Core
    participant DynamoDB
    
    Device->>IoT_Core: 1. 创建设备/证书
    IoT_Core->>Device: 返回证书文件
    Device->>IoT_Core: 2. 建立MQTT连接
    Device->>IoT_Core: 3. 发布传感器数据
    IoT_Core->>DynamoDB: 4. 根据规则存储数据
安全注意事项
证书文件保护:

python
复制
# Linux/Mac自动设置权限
import os
os.chmod('device-key.pem', 0o400)
密钥文件必须设为只读

建议将证书存储在安全位置

策略最小权限原则:

json
复制
"Action": ["iot:Connect", "iot:Publish"]
仅授予设备必要权限

避免使用iot:*通配符

扩展建议
添加HTTPS支持:

python
复制
from awsiot import mqtt_connection_builder
mqtt_conn.enable_metrics_collection()  # 启用加密指标收集
增强错误处理:

python
复制
try:
    response = iot.create_thing(…)
except iot.exceptions.ResourceAlreadyExistsException:
    print("设备已存在,跳过创建")
添加云监控:

python
复制
cloudwatch = boto3.client('cloudwatch')
cloudwatch.put_metric_alarm(…)  # 设置设备离线报警
通过以上代码解析,可以完整理解从设备注册到数据上云的全流程实现。每个模块都承担明确的职责,共同组成可运行的物联网解决方案。

import boto3
import json
import time
from awscrt import io, mqtt
from awsiot import mqtt_connection_builder

# 配置参数(根据实际情况修改)
CONFIG = {
    "region": "us-east-1",
    "thing_name": "MyTemperatureSensor",
    "policy_name": "DevicePolicy",
    "rule_name": "SensorToDynamoDB",
    "dynamodb_role_arn": "arn:aws:iam::692859911256:role/dynamo",
    "dynamodb_table": "SensorData",  # DynamoDB 表名称
    "mqtt_topic": "sensor/data"
}

# 创建 DynamoDB 表
def create_dynamodb_table(region, table_name):
    # 初始化 DynamoDB 客户端
    dynamodb = boto3.client('dynamodb', region_name=region)
    
    # 检查表是否存在
    try:
        response = dynamodb.describe_table(TableName=table_name)
        print(f"表 {table_name} 已存在。")
        return True
    except dynamodb.exceptions.ResourceNotFoundException:
        print(f"表 {table_name} 不存在,正在创建…")
        # 创建表
        dynamodb.create_table(
            TableName=table_name,
            KeySchema=[
                {
                    "AttributeName": "deviceId",
                    "KeyType": "HASH"  # 分区键(hash key)
                },
                {
                    "AttributeName": "timestamp",
                    "KeyType": "RANGE"  # 排序键(range key)
                }
            ],
            AttributeDefinitions=[
                {
                    "AttributeName": "deviceId",
                    "AttributeType": "S"  # 字符串类型
                },
                {
                    "AttributeName": "timestamp",
                    "AttributeType": "S"  # 字符串类型(或 "N" 表示数字类型)
                }
            ],
            ProvisionedThroughput={
                "ReadCapacityUnits": 1,
                "WriteCapacityUnits": 1
            }
        )
        # 等待表创建完成
        while True:
            response = dynamodb.describe_table(TableName=table_name)
            if response['Table']['TableStatus'] == 'ACTIVE':
                print(f"表 {table_name} 创建完成。")
                return True
            time.sleep(5)
        return False

def create_iot_resources():
    """创建所有 IoT 资源但不清理"""
    iot = boto3.client('iot', region_name=CONFIG['region'])

    # 1. 创建设备
    print("正在创建设备…")
    thing = iot.create_thing(thingName=CONFIG['thing_name'])

    # 2. 创建证书和密钥
    print("生成证书…")
    cert = iot.create_keys_and_certificate(setAsActive=True)
    with open('device-cert.pem', 'w') as f:
        f.write(cert['certificatePem'])
    with open('device-key.pem', 'w') as f:
        f.write(cert['keyPair']['PrivateKey'])

    # 3. 创建策略
    print("创建安全策略…")
    policy_doc = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": ["iot:Connect", "iot:Publish"],
            "Resource": "*"
        }]
    }
    iot.create_policy(
        policyName=CONFIG['policy_name'],
        policyDocument=json.dumps(policy_doc)
    )
    # 4. 关联策略和证书
    iot.attach_policy(
        policyName=CONFIG['policy_name'],
        target=cert['certificateArn'])

    # 5. 关联设备和证书
    iot.attach_thing_principal(
        thingName=CONFIG['thing_name'],
        principal=cert['certificateArn'])

    # 6. 创建规则引擎
    print("创建数据转发规则…")
    rule_sql = f"SELECT * FROM '{CONFIG['mqtt_topic']}'"
    actions = [{
        "dynamoDB": {
            "tableName": CONFIG['dynamodb_table'],
            "roleArn": CONFIG['dynamodb_role_arn'],
            "hashKeyField": "deviceId",
            "hashKeyValue": "clientid()",
            "rangeKeyField": "timestamp",
            "rangeKeyValue": "timestamp()"
        }
    }]
    iot.create_topic_rule(
        ruleName=CONFIG['rule_name'],
        topicRulePayload={
            "sql": rule_sql,
            "actions": actions,
            "ruleDisabled": False
        }
    )

    # 创建 DynamoDB 表(如果不存在)
    create_dynamodb_table(CONFIG['region'], CONFIG['dynamodb_table'])

    print("资源创建完成!永久保留中…")
    return {
        "thing_name": CONFIG['thing_name'],
        "cert_arn": cert['certificateArn'],
        "cert_id": cert['certificateId'],
        "policy_name": CONFIG['policy_name'],
        "rule_name": CONFIG['rule_name']
    }

def connect_and_publish():
    """设备连接测试"""
    iot = boto3.client('iot', region_name=CONFIG['region'])
    endpoint = iot.describe_endpoint(endpointType='iot:Data-ATS')['endpointAddress']

    # 配置MQTT连接
    event_loop_group = io.EventLoopGroup(1)
    mqtt_conn = mqtt_connection_builder.mtls_from_path(
        endpoint=endpoint,
        cert_filepath="device-cert.pem",
        pri_key_filepath="device-key.pem",
        client_id=CONFIG['thing_name'],
        event_loop_group=event_loop_group
    )

    print("正在连接MQTT服务器…")
    connect_future = mqtt_conn.connect()
    connect_future.result()

    # 持续发布数据(按 Ctrl+C 停止)
    try:
        while True:
            payload = json.dumps({
                "temperature": 25.5,
                "humidity": 60,
                "status": "normal"
            })
            mqtt_conn.publish(
                topic=CONFIG['mqtt_topic'],
                payload=payload,
                qos=mqtt.QoS.AT_LEAST_ONCE
            )
            print(f"已发送: {payload}")
            time.sleep(5)
    except KeyboardInterrupt:
        mqtt_conn.disconnect()
        print("\n已停止发送")

if __name__ == "__main__":
    resource_info = create_iot_resources()
    print("保留的资源信息(请妥善保存):")
    print(json.dumps(resource_info, indent=2))

    connect_and_publish()

 

作者:aaawsaaa

物联沃分享整理
物联沃-IOTWORD物联网 » boto3与IoT的集成与应用指南

发表回复