【Python】SQLAlchemy库:强大的数据库操作利器,灵活处理数据任务
SQLAlchemy 是一个功能强大且灵活的 Python 库,用于数据库操作。它提供了对象关系映射(ORM)和核心 SQL 表达式语言(Core),支持与多种数据库(如 SQLite、PostgreSQL、MySQL、Oracle)交互。SQLAlchemy 兼顾高层次的 ORM 抽象和低层次的 SQL 控制,广泛应用于 Web 开发、数据分析和企业应用。
以下是对 SQLAlchemy 库的详细介绍,包括其功能、用法和实际应用,基于最新信息(截至 2025)。
1. SQLAlchemy 库的作用
2. 安装与环境要求
greenlet:异步支持(部分数据库)。typing-extensions(类型注解)。psycopg2 或 asyncpg(异步)。mysql-connector-python 或 pymysql.oracledb.pip install sqlalchemy
pip install sqlalchemy[asyncio] asyncpg
pip install sqlalchemy pymysql
import sqlalchemy
print(sqlalchemy.__version__) # 示例输出: 2.0.36
3. 核心功能与用法
SQLAlchemy 提供两层 API:ORM(高层次,基于对象)和 Core(低层次,基于 SQL)。以下是主要功能和示例。
3.1 ORM 基本使用
定义模型类并操作数据库。
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker
# 创建数据库连接
engine = create_engine("sqlite:///example.db", echo=True)
# 定义基类
Base = declarative_base()
# 定义模型
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String)
email = Column(String, unique=True)
# 创建表
Base.metadata.create_all(engine)
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
# 插入数据
new_user = User(name="Alice", email="alice@example.com")
session.add(new_user)
session.commit()
# 查询数据
users = session.query(User).all()
for user in users:
print(f"{user.id}: {user.name} ({user.email})")
输出示例:
1: Alice (alice@example.com)
说明:
create_engine:连接数据库,echo=True 打印 SQL 日志。declarative_base:创建 ORM 基类。Column:定义表字段,支持类型、约束(如 primary_key、unique)。sessionmaker:管理会话,处理事务。3.2 Core SQL 表达式
使用 Core API 构造 SQL 查询。
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData
from sqlalchemy.sql import select
# 创建数据库连接
engine = create_engine("sqlite:///example.db")
metadata = MetaData()
# 定义表
users = Table(
"users",
metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
Column("email", String)
)
# 插入数据
with engine.connect() as conn:
conn.execute(users.insert().values(name="Bob", email="bob@example.com"))
conn.commit()
# 查询数据
with engine.connect() as conn:
result = conn.execute(select(users).where(users.c.name == "Bob"))
for row in result:
print(row)
输出示例:
(2, 'Bob', 'bob@example.com')
说明:
Table:定义表结构。select:构造 SQL 查询,支持 where、join 等。engine.connect():直接执行 SQL。3.3 关系映射
处理表之间的关系(如一对多)。
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
# 定义模型
class Post(Base):
__tablename__ = "posts"
id = Column(Integer, primary_key=True)
title = Column(String)
user_id = Column(Integer, ForeignKey("users.id"))
user = relationship("User", back_populates="posts")
User.posts = relationship("Post", back_populates="user")
# 创建表
Base.metadata.create_all(engine)
# 插入数据
session = Session()
user = User(name="Charlie", email="charlie@example.com")
post = Post(title="First Post", user=user)
session.add_all([user, post])
session.commit()
# 查询关系
user = session.query(User).filter_by(name="Charlie").first()
print(f"{user.name}'s posts: {[post.title for post in user.posts]}")
输出示例:
Charlie's posts: ['First Post']
说明:
ForeignKey:定义外键关系。relationship:建立 ORM 关系,支持双向访问。3.4 异步支持
使用 async API 进行异步数据库操作(Python 3.7+)。
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
import asyncio
# 创建异步引擎
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db", echo=True)
# 定义异步会话
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession)
async def main():
async with AsyncSessionLocal() as session:
# 查询用户
stmt = select(User).where(User.name == "Alice")
result = await session.execute(stmt)
user = result.scalars().first()
print(f"Found: {user.name} ({user.email})")
# 运行
asyncio.run(main())
说明:
create_async_engine:支持异步驱动(如 asyncpg)。AsyncSession:异步会话,配合 await 使用。3.5 查询过滤与聚合
支持复杂查询,如过滤、分组和聚合。
from sqlalchemy import func
# 查询用户数量
user_count = session.query(func.count(User.id)).scalar()
print(f"Total users: {user_count}")
# 过滤和排序
users = session.query(User).filter(User.name.like("A%")).order_by(User.id).all()
print([user.name for user in users])
输出示例:
Total users: 3
['Alice']
说明:
func:调用 SQL 函数(如 count、sum)。filter、order_by:构造复杂查询。3.6 事务管理
自动或手动管理事务。
# 自动事务
session = Session()
try:
session.add(User(name="Dave", email="dave@example.com"))
session.commit()
except Exception as e:
session.rollback()
print(f"Error: {e}")
# 手动事务
with session.begin():
session.add(User(name="Eve", email="eve@example.com"))
说明:
commit():提交事务。rollback():回滚事务。begin():上下文管理事务。4. 性能与特点
asyncpg)。5. 实际应用场景
示例(FastAPI 集成):
from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from pydantic import BaseModel
app = FastAPI()
# 数据库配置
engine = create_async_engine("sqlite+aiosqlite:///example.db")
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession)
# 模型
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String)
email = Column(String)
Base.metadata.create_all(bind=engine)
# Pydantic 模型
class UserCreate(BaseModel):
name: str
email: str
# 依赖注入
async def get_db():
async with AsyncSessionLocal() as session:
yield session
# 路由
@app.post("/users/")
async def create_user(user: UserCreate, db: AsyncSession = Depends(get_db)):
db_user = User(name=user.name, email=user.email)
db.add(db_user)
await db.commit()
await db.refresh(db_user)
return {"id": db_user.id, "name": db_user.name, "email": db_user.email}
说明:
Pydantic 验证输入,SQLAlchemy 管理数据库。6. 部署与扩展
postgresql://user:pass@localhost/db)。engine = create_engine("postgresql+psycopg2://...", pool_size=20, max_overflow=10)
asyncpg 或 aiosqlite。:memory:):
engine = create_engine("sqlite:///:memory:")
pytest 和 unittest 测试 ORM 操作。SQLAlchemy 集成:
pip install alembic
alembic init migrations
7. 注意事项
selectinload 或 joinedload 优化关系加载:
from sqlalchemy.orm import selectinload
users = session.query(User).options(selectinload(User.posts)).all()
commit() 或 rollback(),避免资源泄漏。with session.begin(): 简化事务。psycopg2(同步)或 asyncpg(异步)。pymysql(轻量)。session.execute 返回 Result),1.x 代码需迁移。asyncpg>=0.27)。echo=True 查看生成的 SQL。sqlalchemy.event 监听查询性能。8. 综合示例
以下是一个综合示例,展示 ORM、Core、异步操作和 FastAPI 集成:
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import create_engine, Column, Integer, String, select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker, Session
from pydantic import BaseModel
from loguru import logger
import asyncio
# 配置日志
logger.add("app.log", rotation="1 MB", level="INFO")
# 数据库配置
sync_engine = create_engine("sqlite:///example.db", echo=True)
async_engine = create_async_engine("sqlite+aiosqlite:///example.db", echo=True)
Base = declarative_base()
SyncSession = sessionmaker(bind=sync_engine)
AsyncSessionLocal = sessionmaker(bind=async_engine, class_=AsyncSession)
# 模型
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String)
email = Column(String, unique=True)
Base.metadata.create_all(sync_engine)
# Pydantic 模型
class UserCreate(BaseModel):
name: str
email: str
class UserResponse(BaseModel):
id: int
name: str
email: str
# FastAPI 应用
app = FastAPI()
async def get_db():
async with AsyncSessionLocal() as session:
yield session
# 同步 Core 查询
def get_user_by_email(email: str, session: Session) -> User:
stmt = select(User).where(User.email == email)
result = session.execute(stmt)
return result.scalars().first()
# 异步路由
@app.post("/users/", response_model=UserResponse)
async def create_user(user: UserCreate, db: AsyncSession = Depends(get_db)):
try:
db_user = User(name=user.name, email=user.email)
db.add(db_user)
await db.commit()
await db.refresh(db_user)
logger.info(f"Created user: {user.name} ({user.email})")
return db_user
except Exception as e:
await db.rollback()
logger.error(f"Error creating user: {e}")
raise HTTPException(status_code=400, detail=str(e))
@app.get("/users/{email}", response_model=UserResponse)
async def get_user(email: str, db: AsyncSession = Depends(get_db)):
stmt = select(User).where(User.email == email)
result = await db.execute(stmt)
user = result.scalars().first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
# 测试
if __name__ == "__main__":
# 同步插入测试数据
with SyncSession() as session:
if not get_user_by_email("alice@example.com", session):
session.add(User(name="Alice", email="alice@example.com"))
session.commit()
说明:
loguru 记录日志,pydantic 验证输入。运行:
uvicorn main:app --reload
访问:
http://localhost:8000/users/:创建用户。http://localhost:8000/users/alice@example.com:查询用户。9. 资源与文档
sqlalchemy 标签):https://stackoverflow.com/questions/tagged/sqlalchemy作者:彬彬侠