Python Flask实现与Amazon RDS和DynamoDB交互的简易示例
Amazon RDS和DynamoDB是两种常用于数据存储的数据库服务。RDS提供关系型数据库服务,而DynamoDB则是一款NoSQL数据库服务。本文将探讨如何在AWS中通过Python脚本搭建一个简单的flask框架对这两种数据库进行简单操作。
1. 安装boto3、flask、pymysql
在开始之前请确保安装了boto3、flask、pymysql库。
pip install boto3
pip install flask
pip install pymysql
2. 配置 AWS 认证信息
在使用 Boto3 之前,需要配置 AWS 认证信息。你可以通过设置环境变量或使用 AWS CLI 配置文件来实现。
1、使用环境变量的方式
export AWS_ACCESS_KEY_ID="your_access_key"
export AWS_SECRET_ACCESS_KEY="your_secret_key"
export AWS_DEFAULT_REGION="your_region"
2、使用 AWS CLI 配置文件
运行以下命令,并按照提示进行配置。
aws configure
(关于 AWS CLI 的安装或更新最新版本,请点击下面的链接访问AWS CLI的官方文档查看详情)
https://docs.aws.amazon.com/zh_cn/cli/latest/userguide/getting-started-install.html
3. 使用Python操作DynamoDB写入与读取的简单示例
1、 向你的DynamoDB表中插入数据
以下是一个简单的向DynamoDB表中插入数据的简单例子
import boto3
dbarn = "" #在此输入你的Dynamodb的资源ARN
client = boto3.client('dynamodb')
response = client.put_item(
TableName=dbarn,
Item={
'name': {
'S': "Steve"
},
'age': {
'S': "28"
}
}
)
2、 向你的DynamoDB表中读取数据
以下是一个按照 name 查询的例子
import boto3
dbarn = "" #在此输入你的Dynamodb的资源ARN
client = boto3.client('dynamodb')
response = client.get_item(
TableName=dbarn,
Key={
'id': {
'S': "Steve",
}
}
)
(使用boto3操作DynamoDB的其他操作与细节,可以访问下面这个 AWS boto3 的官方文档)
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html
4、使用Python Flask搭建一个简单的应用程序
以下的 Flask 程序会在网页输出 Hello World。
from flask import Flask
app = Flask(__name__)@app.route("/")
def hello():
return "Hello World!"if __name__ == "__main__":
app.run()
5、整体代码演示
from flask import Flask,request,jsonify
import boto3
import pymysql
#此处填写数据库相关数据
connect = pymysql.Connect(
host='', #输入RDS的host
port=3306, #默认端口
user='', #数据库用户名
passwd='', #密码
db='', #数据库名称
charset='utf8'
)
cursor = connect.cursor()
tablename = "" #在此处填写数据库内表名
sql = "CREATE TABLE {}(id INTEGER PRIMARY KEY,value TEXT)".format(tablename)
try: #检测数据库中是否存在名为tablename的表,不存在则创建,存在就跳过
cursor.execute(sql)
connect.commit()
except:
pass
app = Flask(__name__)
dbarn = "" #在此处填写dynamodb的arn
#获取dynamodb内数据
def dbget(get_id):
client = boto3.client('dynamodb')
response = client.get_item(
TableName=dbarn,
Key={
'id': {
'S': str(get_id),
}
}
)
n = response['Item']['value']['N']
return n
#获取sql内数据
def sqlget(get_id):
sql = "SELECT * FROM {} WHERE id = %s".format(tablename)
data = (f"{get_id}")
cursor.execute(sql % data)
n = cursor.fetchone()[1]
return n
#创建一个跟路由,以供健康测试
@app.route('/', methods=['GET'])
def homo():
return "health",200
#dynamodb存储
@app.route('/post_data1', methods=['POST'])
def db():
task = {
'id': request.json['id'],
'value': request.json['value'],
}
client = boto3.client('dynamodb')
response = client.put_item(
TableName=dbarn,
Item={
'id': {
'S': request.json['id'],
},
'value': {
'N': request.json['value']
}
}
)
return jsonify({'task': task}), 200
#sql存储
@app.route('/post_data2', methods=['POST'])
def rds():
id = request.json['id']
value = request.json['value']
task = {
'id': id,
'value': value,
}
sql = "INSERT INTO {} VALUES(%s,'%s')".format(tablename)
data = (id,value)
cursor.execute(sql % data)
connect.commit()
return jsonify({'task': task}), 200
#获取dynamodb和sql数据总和
@app.route('/get_value', methods=['GET'])
def getall():
get_id = request.args.get("id")
db = dbget(get_id)
rds = dbget(get_id)
aws = {
"message" : str(int(db) + int(rds))
}
return aws, 200
#获取dynamodb数据
@app.route('/get_dynamodb', methods=['GET'])
def getdb():
get_id = request.args.get("id")
db = dbget(get_id)
return db, 200
#获取sql数据
@app.route('/get_aurora', methods=['GET'])
def getdb():
get_id = request.args.get("id")
sql = sqlget(get_id)
return sql, 200
#删除dynamodb和sql两表中的数据(未完成,可以查询AWS boto3的官方文档自行完成)
@app.route('/delete', methods=['GET'])
def delete():
return "delete", 200
#修改host
#修改port端口号
if __name__ == '__main__':
app.run(port=80,host="0.0.0.0")
作者:zi_feng1145