Python Sqlalchemy库应用实录

一、sqlalchemy

 一个开源的python ORM框架,用于数据库操作
ORM(Object-Relational Mapping)对象关系映射,将数据库中的数据映射为Python对象
支持MySQL,Oracle,sqlite 等常用数据库

二、使用背景

使用Python做自动化测试项目,需要操作数据库

  1. 批量插入大量数据
  2. 在测试用例中需要进行增删改查

最开始直接在代码中拼接SQL语句,但是考虑到大量的随机生成、大量的字符串拼接都效率低下且易出错。选择使用sqlalchemy库,对数据库操作进行封装
优势:

  1. 批量数据操作更安全
  2. 将数据库表映射为Python对象,python IDE 可以提供python一样的语法检查和代码提示

示例1:直接在python代码中拼接SQL字符串

示例2:使用sqlalchemy进行数据库操作,类的属性映射为数据库字段

三、下载安装

1. pipy换源(按需操作)

pip config set global.index-url https://mirrors.aliyun.com/pypi/

2.安装,sqlalchemy需要对应数据库的驱动,

pip install sqlalchemy
pip install pymysql   # mysql
pip install oracledb  #  oracle

四、构造表对象

sqlalchemy操作数据库,需要先构造表对象

方法一、手动创建(就是照着表手写)

非常不推荐,繁琐且易出错

方法二、使用sqlacodegen库

1.安装 sqlacodegen
pip install sqlacodegen

2.在终端执行生成语句

MySQL:

sqlacodegen "mysql+pymysql://root:123456@localhost:3306/sky_take_out" --tables user[, table2, table3, ...] --outfile outfile.py

Oracle:

sqlacodegen "oracle+oracledb://username:password@host:port/dbname" --noviews --nojoined --schema schemaName --tables table1[, table2, ...] --outfile  savefile.py

执行完成后,如果没有报错,会生成一个outfile.py文件,里面就是表对象定义

生成结果(output.py)

from typing import Optional

from sqlalchemy import BigInteger, DateTime
from sqlalchemy.dialects.mysql import VARCHAR
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
import datetime


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = 'user'
    __table_args__ = {'comment': '用户信息'}

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True, comment='主键')
    openid: Mapped[Optional[str]] = mapped_column(VARCHAR(45), comment='微信用户唯一标识')
    name: Mapped[Optional[str]] = mapped_column(VARCHAR(32), comment='姓名')
    phone: Mapped[Optional[str]] = mapped_column(VARCHAR(11), comment='手机号')
    sex: Mapped[Optional[str]] = mapped_column(VARCHAR(2), comment='性别')
    id_number: Mapped[Optional[str]] = mapped_column(VARCHAR(18), comment='身份证号')
    avatar: Mapped[Optional[str]] = mapped_column(VARCHAR(500), comment='头像')
    create_time: Mapped[Optional[datetime.datetime]] = mapped_column(DateTime)

Tips: 每次执行此命令,都会覆盖output.py 文件,建议在一个专门的文件保存所有生成的表对象

五、操作数据库

1. 创建连接

注意,对于较早版本的Oracle,由于不支持 thin 模式连接,需要安装额外的Oracle 客户端文件:
Oracle Instant Client

!!! 重要,OracleInstantClient的32位、64位版本选择与Python版本一致,而不是Windows系统位数版本一致

比如我的Windows是64位,Python是32位,所以选择32位版本

import os.path
import oracledb
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker


def get_connect(con_str, method='mysql'):
    if method == 'oracle_thick':
        driver_path = os.path.join("lib", "driver", "instantclient_win32")
        oracledb.init_oracle_client(lib_dir=driver_path)
        engine = create_engine(con_str)
    elif method in ["oracle_thin", "mysql"]:
        engine = create_engine(con_str)
    else:
        raise Exception("不支持的数据库类型")
    Session = sessionmaker(bind=engine)
    return Session()

注意: 使用字符串进行连接的时候,用户名,密码,host使用:和@进行分割,如果用户名密码中有@等特殊字符,需要使用转义符进行转义

from urllib.parse import quote_plus

con_str = f"mysql+pymysql://{quote_plus('root')}:{quote_plus('123456@')}@localhost:3306/sky_take_out"

2. 查询

  1. 单表查询
# 连接 数据库, 获取session
session = get_connect("mysql+pymysql://root:123456@localhost:3306/sky_take_out")
# select * from user where id = 4;
result = session.query(User).filter(User.id == 4).all()
for user in result:
    print(user.id, user.name, user.phone, user.sex, user.id_number, user.avatar, user.openid, user.create_time)
   >> 4 张三 12345678901 1 None None None None

all() 返回的是结果集,可以使用for循环遍历

first() 返回的是结果集中的第一个元素, 是一个表对象,可以直接访问这个对象的属性

只查询指定字段

r1 = session.query(User.name).filter(User.id == 4).first()
print(r1)  # 返回结果是只包含name字段的元组
   >>('张三',)
  1. 多表查询

    多个表的连接条件作为filter()的入参:

    and_(): 多个条件之间是and关系

    or_(): 多个条件之间是or关系

    between(): 范围条件

    in_(): in条件

    not_in(): not in条件

    like(): 模糊查询

# select b.* from category a, dish b where a.id=b.category_id and a.id='11';
r2 = session.query(Dish).filter(
    and_(Category.id == Dish.category_id, Category.id == 11)
).all()
for item in r2:
    print(item.name, item.description, )

# select a.name, b.name, b.image from category a, dish b where a.id=b.category_id and a.id='11';
r3 = session.query(
    Category.name, Dish.name, Dish.image
).filter(
    and_(Category.id == Dish.category_id, Category.id == 11)
).all()
for item in r3:
    print(item.name, item.name, item.image)
('王老吉', '王老吉', 'https://localhost/1.png')
('北冰洋', '北冰洋', 'https://localhost/2.png')
('雪花啤酒', '雪花啤酒', 'https://localhost/3.png')

输出结果中,前两项字段名都是name,无法区分,解决方法:

  1. 用索引取值:print(item[0], item[1], item[2])
  2. 为字段取别名
r4 = session.query(
    Category.name.label('category_name'),
    Dish.name.label('dish_name'),  # 将 Dish.name 转换为大写并指定别名
    Dish.image
).filter(
    and_(Category.id == Dish.category_id, Category.id == 11)
).all()

# 使用别名访问字段值
for item in r4:
    print(item.category_name, item.dish_name, item.image)
('酒水饮料', '王老吉', 'https://localhost/1.png')
('酒水饮料', '北冰洋', 'https://localhost/2.png')
('酒水饮料', '雪花啤酒', 'https://localhost/3.png')

3. 插入数据

1. 单个插入
user1 = User(name="张三1", phone="12345678901", sex="男", id_number="123456789012345678", avatar="", openid="")
user2_map = {"name": "张三2", "phone": "12345678901", "sex": "男", "id_number": "123456789012345678", "avatar": "",
         "openid": ""}
user2 = User(**user2_map)
session.add(user1)
session.add(user2)
session.commit() # commit方法提交数据
2. 批量插入数据
user_list = [
    {"name": fake.name(), "phone": fake.phone_number(), "sex": fake.random_element(["0", "1"]),},
    {"name": fake.name(), "phone": fake.phone_number(), "sex": fake.random_element(["0", "1"]),},
    {"name": fake.name(), "phone": fake.phone_number(), "sex": fake.random_element(["0", "1"]),},
    {"name": fake.name(), "phone": fake.phone_number(), "sex": fake.random_element(["0", "1"]),},
    {"name": fake.name(), "phone": fake.phone_number(), "sex": fake.random_element(["0", "1"]),},
]
session.bulk_insert_mappings(User, user_list)
session.commit()
r6 = session.query(User).all()
for item in r6:
    print(item.name, item.phone, item.sex)

bulk_insert_mappings() 方法用于批量插入数据,第一个参数是操作表的模型,第二个参数是插入的数据列表

4. 更新数据

r7 = session.query(User).filter(User.id == 7).first()
print(r7.name)
r7.name = "张三2.1"
session.commit()
r8 = session.query(User).filter(User.id == 7).first()
print(r8.name)
张三2
张三2.1

批量更新,使用bulk_update_mappings()方法,参数同bulk_insert_mappings(),不同的是,更新的数据列表中,需要包含主键字段

5. 删除数据

r8 = session.query(User).filter(User.id == 7).delete()
session.commit()
print(r8) # delete() 返回值是删除的行数

作者:MarsAres2

物联沃分享整理
物联沃-IOTWORD物联网 » Python Sqlalchemy库应用实录

发表回复