Python Sqlalchemy库应用实录
一、sqlalchemy
一个开源的python ORM框架,用于数据库操作
ORM(Object-Relational Mapping)对象关系映射,将数据库中的数据映射为Python对象
支持MySQL,Oracle,sqlite 等常用数据库
二、使用背景
使用Python做自动化测试项目,需要操作数据库
- 批量插入大量数据
- 在测试用例中需要进行增删改查
最开始直接在代码中拼接SQL语句,但是考虑到大量的随机生成、大量的字符串拼接都效率低下且易出错。选择使用sqlalchemy库,对数据库操作进行封装
优势:
- 批量数据操作更安全
- 将数据库表映射为Python对象,python IDE 可以提供python一样的语法检查和代码提示
示例1:直接在python代码中拼接SQL字符串

示例2:使用sqlalchemy进行数据库操作,类的属性映射为数据库字段

三、下载安装
1. pipy换源(按需操作)
pip config set global.index-url https://mirrors.aliyun.com/pypi/
2.安装,sqlalchemy需要对应数据库的驱动,
pip install sqlalchemy
pip install pymysql # mysql
pip install oracledb # oracle
四、构造表对象
sqlalchemy操作数据库,需要先构造表对象
方法一、手动创建(就是照着表手写)
非常不推荐,繁琐且易出错
方法二、使用sqlacodegen库
1.安装 sqlacodegen
pip install sqlacodegen
2.在终端执行生成语句
MySQL:
sqlacodegen "mysql+pymysql://root:123456@localhost:3306/sky_take_out" --tables user[, table2, table3, ...] --outfile outfile.py
Oracle:
sqlacodegen "oracle+oracledb://username:password@host:port/dbname" --noviews --nojoined --schema schemaName --tables table1[, table2, ...] --outfile savefile.py
执行完成后,如果没有报错,会生成一个outfile.py文件,里面就是表对象定义
生成结果(output.py)
from typing import Optional
from sqlalchemy import BigInteger, DateTime
from sqlalchemy.dialects.mysql import VARCHAR
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
import datetime
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = 'user'
__table_args__ = {'comment': '用户信息'}
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, comment='主键')
openid: Mapped[Optional[str]] = mapped_column(VARCHAR(45), comment='微信用户唯一标识')
name: Mapped[Optional[str]] = mapped_column(VARCHAR(32), comment='姓名')
phone: Mapped[Optional[str]] = mapped_column(VARCHAR(11), comment='手机号')
sex: Mapped[Optional[str]] = mapped_column(VARCHAR(2), comment='性别')
id_number: Mapped[Optional[str]] = mapped_column(VARCHAR(18), comment='身份证号')
avatar: Mapped[Optional[str]] = mapped_column(VARCHAR(500), comment='头像')
create_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)
Tips: 每次执行此命令,都会覆盖output.py 文件,建议在一个专门的文件保存所有生成的表对象
五、操作数据库
1. 创建连接
注意,对于较早版本的Oracle,由于不支持 thin 模式连接,需要安装额外的Oracle 客户端文件:
Oracle Instant Client
!!! 重要,OracleInstantClient的32位、64位版本选择与Python版本一致,而不是Windows系统位数版本一致
比如我的Windows是64位,Python是32位,所以选择32位版本

import os.path
import oracledb
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
def get_connect(con_str, method='mysql'):
if method == 'oracle_thick':
driver_path = os.path.join("lib", "driver", "instantclient_win32")
oracledb.init_oracle_client(lib_dir=driver_path)
engine = create_engine(con_str)
elif method in ["oracle_thin", "mysql"]:
engine = create_engine(con_str)
else:
raise Exception("不支持的数据库类型")
Session = sessionmaker(bind=engine)
return Session()
注意: 使用字符串进行连接的时候,用户名,密码,host使用:和@进行分割,如果用户名密码中有@等特殊字符,需要使用转义符进行转义
from urllib.parse import quote_plus
con_str = f"mysql+pymysql://{quote_plus('root')}:{quote_plus('123456@')}@localhost:3306/sky_take_out"
2. 查询
- 单表查询
# 连接 数据库, 获取session
session = get_connect("mysql+pymysql://root:123456@localhost:3306/sky_take_out")
# select * from user where id = 4;
result = session.query(User).filter(User.id == 4).all()
for user in result:
print(user.id, user.name, user.phone, user.sex, user.id_number, user.avatar, user.openid, user.create_time)
>> 4 张三 12345678901 1 None None None None
all() 返回的是结果集,可以使用for循环遍历
first() 返回的是结果集中的第一个元素, 是一个表对象,可以直接访问这个对象的属性
只查询指定字段
r1 = session.query(User.name).filter(User.id == 4).first()
print(r1) # 返回结果是只包含name字段的元组
>>('张三',)
- 多表查询
多个表的连接条件作为filter()的入参:
and_(): 多个条件之间是and关系
or_(): 多个条件之间是or关系
between(): 范围条件
in_(): in条件
not_in(): not in条件
like(): 模糊查询
# select b.* from category a, dish b where a.id=b.category_id and a.id='11';
r2 = session.query(Dish).filter(
and_(Category.id == Dish.category_id, Category.id == 11)
).all()
for item in r2:
print(item.name, item.description, )
# select a.name, b.name, b.image from category a, dish b where a.id=b.category_id and a.id='11';
r3 = session.query(
Category.name, Dish.name, Dish.image
).filter(
and_(Category.id == Dish.category_id, Category.id == 11)
).all()
for item in r3:
print(item.name, item.name, item.image)
('王老吉', '王老吉', 'https://localhost/1.png')
('北冰洋', '北冰洋', 'https://localhost/2.png')
('雪花啤酒', '雪花啤酒', 'https://localhost/3.png')
输出结果中,前两项字段名都是name,无法区分,解决方法:
- 用索引取值:print(item[0], item[1], item[2])
- 为字段取别名
r4 = session.query(
Category.name.label('category_name'),
Dish.name.label('dish_name'), # 将 Dish.name 转换为大写并指定别名
Dish.image
).filter(
and_(Category.id == Dish.category_id, Category.id == 11)
).all()
# 使用别名访问字段值
for item in r4:
print(item.category_name, item.dish_name, item.image)
('酒水饮料', '王老吉', 'https://localhost/1.png')
('酒水饮料', '北冰洋', 'https://localhost/2.png')
('酒水饮料', '雪花啤酒', 'https://localhost/3.png')
3. 插入数据
1. 单个插入
user1 = User(name="张三1", phone="12345678901", sex="男", id_number="123456789012345678", avatar="", openid="")
user2_map = {"name": "张三2", "phone": "12345678901", "sex": "男", "id_number": "123456789012345678", "avatar": "",
"openid": ""}
user2 = User(**user2_map)
session.add(user1)
session.add(user2)
session.commit() # commit方法提交数据
2. 批量插入数据
user_list = [
{"name": fake.name(), "phone": fake.phone_number(), "sex": fake.random_element(["0", "1"]),},
{"name": fake.name(), "phone": fake.phone_number(), "sex": fake.random_element(["0", "1"]),},
{"name": fake.name(), "phone": fake.phone_number(), "sex": fake.random_element(["0", "1"]),},
{"name": fake.name(), "phone": fake.phone_number(), "sex": fake.random_element(["0", "1"]),},
{"name": fake.name(), "phone": fake.phone_number(), "sex": fake.random_element(["0", "1"]),},
]
session.bulk_insert_mappings(User, user_list)
session.commit()
r6 = session.query(User).all()
for item in r6:
print(item.name, item.phone, item.sex)
bulk_insert_mappings() 方法用于批量插入数据,第一个参数是操作表的模型,第二个参数是插入的数据列表
4. 更新数据
r7 = session.query(User).filter(User.id == 7).first()
print(r7.name)
r7.name = "张三2.1"
session.commit()
r8 = session.query(User).filter(User.id == 7).first()
print(r8.name)
张三2
张三2.1
批量更新,使用bulk_update_mappings()方法,参数同bulk_insert_mappings(),不同的是,更新的数据列表中,需要包含主键字段
5. 删除数据
r8 = session.query(User).filter(User.id == 7).delete()
session.commit()
print(r8) # delete() 返回值是删除的行数
作者:MarsAres2