Python Flask实现与Amazon RDS和DynamoDB交互的简易示例

Amazon RDS和DynamoDB是两种常用于数据存储的数据库服务。RDS提供关系型数据库服务,而DynamoDB则是一款NoSQL数据库服务。本文将探讨如何在AWS中通过Python脚本搭建一个简单的flask框架对这两种数据库进行简单操作。

1. 安装boto3、flask、pymysql

在开始之前请确保安装了boto3、flask、pymysql库。

pip install boto3

pip install flask

pip install pymysql

2. 配置 AWS 认证信息

在使用 Boto3 之前,需要配置 AWS 认证信息。你可以通过设置环境变量或使用 AWS CLI 配置文件来实现。

1、使用环境变量的方式

export AWS_ACCESS_KEY_ID="your_access_key"

export AWS_SECRET_ACCESS_KEY="your_secret_key"

export AWS_DEFAULT_REGION="your_region"

 2、使用 AWS CLI 配置文件

运行以下命令,并按照提示进行配置。

aws configure

 (关于 AWS CLI 的安装或更新最新版本,请点击下面的链接访问AWS CLI的官方文档查看详情)

 https://docs.aws.amazon.com/zh_cn/cli/latest/userguide/getting-started-install.html

3. 使用Python操作DynamoDB写入与读取的简单示例

1、 向你的DynamoDB表中插入数据

以下是一个简单的向DynamoDB表中插入数据的简单例子

import boto3

dbarn = "" #在此输入你的Dynamodb的资源ARN

client = boto3.client('dynamodb')

    response = client.put_item(

        TableName=dbarn,

        Item={

            'name': {

                'S': "Steve"

            },

            'age': {

                'S': "28"

            }

        }

    )

 2、 向你的DynamoDB表中读取数据

以下是一个按照 name 查询的例子

import boto3

dbarn = "" #在此输入你的Dynamodb的资源ARN

client = boto3.client('dynamodb')

    response = client.get_item(

        TableName=dbarn,

        Key={

            'id': {

                'S': "Steve",

            }

        }

    )

(使用boto3操作DynamoDB的其他操作与细节,可以访问下面这个 AWS boto3 的官方文档)

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html

 4、使用Python Flask搭建一个简单的应用程序

以下的 Flask 程序会在网页输出 Hello World。

from flask import Flask
app = Flask(__name__)

@app.route("/")
def hello():
    return "Hello World!"

if __name__ == "__main__":
    app.run()

5、整体代码演示

from flask import Flask,request,jsonify

import boto3

import pymysql

#此处填写数据库相关数据

connect = pymysql.Connect(

    host='', #输入RDS的host

    port=3306, #默认端口

    user='', #数据库用户名

    passwd='', #密码

    db='', #数据库名称

    charset='utf8'

)

cursor = connect.cursor()

tablename = ""  #在此处填写数据库内表名

sql = "CREATE TABLE {}(id INTEGER PRIMARY KEY,value TEXT)".format(tablename)

try:    #检测数据库中是否存在名为tablename的表,不存在则创建,存在就跳过

    cursor.execute(sql)

    connect.commit()

except:

    pass

app = Flask(__name__)

dbarn = ""  #在此处填写dynamodb的arn

#获取dynamodb内数据

def dbget(get_id):

    client = boto3.client('dynamodb')

    response = client.get_item(

        TableName=dbarn,

        Key={

            'id': {

                'S': str(get_id),

            }

        }

    )

    n = response['Item']['value']['N']

    return n

#获取sql内数据

def sqlget(get_id):

    sql = "SELECT * FROM {} WHERE id = %s".format(tablename)

    data = (f"{get_id}")

    cursor.execute(sql % data)

    n = cursor.fetchone()[1]

    return n

#创建一个跟路由,以供健康测试

@app.route('/', methods=['GET'])

def homo():

    return "health",200

#dynamodb存储

@app.route('/post_data1', methods=['POST'])

def db():

    task = {

        'id': request.json['id'],

        'value': request.json['value'],

    }

    client = boto3.client('dynamodb')

    response = client.put_item(

        TableName=dbarn,

        Item={

            'id': {

                'S': request.json['id'],

            },

            'value': {

                'N': request.json['value']

            }

        }

    )

    return jsonify({'task': task}), 200

#sql存储

@app.route('/post_data2', methods=['POST'])

def rds():

    id = request.json['id']

    value = request.json['value']

    task = {

        'id': id,

        'value': value,

    }

    sql = "INSERT INTO {} VALUES(%s,'%s')".format(tablename)

    data = (id,value)

    cursor.execute(sql % data)

    connect.commit()

    return jsonify({'task': task}), 200

 

#获取dynamodb和sql数据总和

@app.route('/get_value', methods=['GET'])

def getall():

    get_id = request.args.get("id")

    db = dbget(get_id)

    rds = dbget(get_id)

    aws = {

        "message" : str(int(db) + int(rds))

    }

    return aws, 200

   

#获取dynamodb数据

@app.route('/get_dynamodb', methods=['GET'])

def getdb():

    get_id = request.args.get("id")

    db = dbget(get_id)

    return db, 200

#获取sql数据

@app.route('/get_aurora', methods=['GET'])

def getdb():

    get_id = request.args.get("id")

    sql = sqlget(get_id)

    return sql, 200

#删除dynamodb和sql两表中的数据(未完成,可以查询AWS boto3的官方文档自行完成)

@app.route('/delete', methods=['GET'])

def delete():

    return "delete", 200

#修改host

#修改port端口号

if __name__ == '__main__':

    app.run(port=80,host="0.0.0.0")

作者:zi_feng1145

物联沃分享整理
物联沃-IOTWORD物联网 » Python Flask实现与Amazon RDS和DynamoDB交互的简易示例

发表回复