031-Python 操作 MySQL 数据库
Python 提供了与 MySQL 数据库交互的多种方式,其中最常用的是 mysql-connector
和 PyMySQL
库。通过这些库,可以轻松完成对数据库的连接、查询、插入、更新和删除等操作。
以下是 Python 操作 MySQL 数据库的完整指南。
1. 安装 MySQL 数据库驱动
1.1 安装 mysql-connector
mysql-connector
是 MySQL 官方提供的 Python 驱动。
pip install mysql-connector-python
1.2 安装 PyMySQL
PyMySQL
是一个纯 Python 的 MySQL 驱动,功能强大且易于使用。
pip install PyMySQL
2. 连接 MySQL 数据库
2.1 使用 mysql-connector
import mysql.connector
# 连接到 MySQL 数据库
connection = mysql.connector.connect(
host="localhost", # MySQL 主机(本地为 localhost)
user="root", # MySQL 用户名
password="password", # MySQL 密码
database="test_db" # 要连接的数据库
)
# 检查连接是否成功
if connection.is_connected():
print("成功连接到数据库")
2.2 使用 PyMySQL
import pymysql
# 连接到 MySQL 数据库
connection = pymysql.connect(
host="localhost", # MySQL 主机
user="root", # 用户名
password="password", # 密码
database="test_db" # 数据库名
)
# 检查连接是否成功
if connection.open:
print("成功连接到数据库")
3. 执行 SQL 查询
数据库操作通常分为以下几类:
- 创建表
- 插入数据
- 查询数据
- 更新数据
- 删除数据
以下以 mysql-connector
为例,展示这些操作。如果使用 PyMySQL
,代码结构类似。
3.1 创建表
cursor = connection.cursor()
# 创建一个名为 users 的表
create_table_query = """
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(50) NOT NULL,
age INT NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL
)
"""
cursor.execute(create_table_query)
print("表已创建")
3.2 插入数据
单条插入
# 插入单条记录
insert_query = "INSERT INTO users (name, age, email) VALUES (%s, %s, %s)"
data = ("Alice", 25, "alice@example.com")
cursor.execute(insert_query, data)
# 提交更改
connection.commit()
print("数据已插入")
批量插入
# 插入多条记录
bulk_data = [
("Bob", 30, "bob@example.com"),
("Charlie", 35, "charlie@example.com"),
("David", 40, "david@example.com")
]
cursor.executemany(insert_query, bulk_data)
connection.commit()
print("批量数据已插入")
3.3 查询数据
查询所有记录
# 查询所有记录
select_query = "SELECT * FROM users"
cursor.execute(select_query)
# 获取所有结果
results = cursor.fetchall()
for row in results:
print(row)
条件查询
# 查询特定条件的数据
condition_query = "SELECT * FROM users WHERE age > %s"
cursor.execute(condition_query, (30,))
# 获取结果
results = cursor.fetchall()
for row in results:
print(row)
3.4 更新数据
# 更新用户的年龄
update_query = "UPDATE users SET age = %s WHERE name = %s"
cursor.execute(update_query, (28, "Alice"))
# 提交更改
connection.commit()
print("数据已更新")
3.5 删除数据
# 删除特定用户
delete_query = "DELETE FROM users WHERE name = %s"
cursor.execute(delete_query, ("Bob",))
# 提交更改
connection.commit()
print("数据已删除")
4. 关闭连接
完成所有操作后,记得关闭数据库连接。
cursor.close()
connection.close()
print("数据库连接已关闭")
5. 使用上下文管理器
为了保证连接和游标在使用完成后正确关闭,可以使用上下文管理器(with
语句)。
示例:使用上下文管理器
import mysql.connector
with mysql.connector.connect(
host="localhost",
user="root",
password="password",
database="test_db"
) as connection:
with connection.cursor() as cursor:
cursor.execute("SELECT * FROM users")
for row in cursor.fetchall():
print(row)
上下文管理器的优势是,即使发生异常,连接和游标也会被正确关闭。
6. 数据库操作完整示例
以下是一个完整的示例,展示了如何创建表、插入数据、查询数据、更新数据和删除数据。
import mysql.connector
# 连接到 MySQL 数据库
connection = mysql.connector.connect(
host="localhost",
user="root",
password="password",
database="test_db"
)
cursor = connection.cursor()
# 创建表
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(50) NOT NULL,
age INT NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL
)
""")
# 插入数据
insert_query = "INSERT INTO users (name, age, email) VALUES (%s, %s, %s)"
cursor.execute(insert_query, ("Alice", 25, "alice@example.com"))
connection.commit()
# 查询数据
cursor.execute("SELECT * FROM users")
for row in cursor.fetchall():
print(row)
# 更新数据
cursor.execute("UPDATE users SET age = %s WHERE name = %s", (26, "Alice"))
connection.commit()
# 删除数据
cursor.execute("DELETE FROM users WHERE name = %s", ("Alice",))
connection.commit()
# 关闭连接
cursor.close()
connection.close()
7. 处理异常
在实际应用中,需要处理可能发生的异常,比如连接失败或 SQL 查询失败。
示例:异常处理
import mysql.connector
from mysql.connector import Error
try:
# 尝试连接数据库
connection = mysql.connector.connect(
host="localhost",
user="root",
password="password",
database="test_db"
)
if connection.is_connected():
print("成功连接到数据库")
cursor = connection.cursor()
# 执行查询
cursor.execute("SELECT DATABASE();")
db_name = cursor.fetchone()
print("当前数据库:", db_name)
except Error as e:
print("连接数据库失败:", e)
finally:
if connection.is_connected():
cursor.close()
connection.close()
print("数据库连接已关闭")
8. 安全性注意事项
-
防止 SQL 注入:
- 始终使用参数化查询,避免直接拼接 SQL 字符串。
- 示例:不安全的代码:
cursor.execute(f"SELECT * FROM users WHERE name = '{user_input}'")
- 修复后的代码:
cursor.execute("SELECT * FROM users WHERE name = %s", (user_input,))
-
使用加密连接:
- 配置 SSL 以保证数据传输的安全性。
-
限制权限:
- 不要使用具有管理员权限的用户连接数据库。
- 为应用程序创建专用用户,并限制其权限。
9. 总结
通过 Python 操作 MySQL 数据库,可以轻松实现对数据的管理和操作。以下是关键点:
mysql-connector
或 PyMySQL
。对于大型项目,推荐结合 ORM(如 SQLAlchemy 或 Django ORM)简化操作,同时保证代码的可维护性和高效性。
以下进一步扩展 Python 操作 MySQL 数据库 的内容,涵盖了高级主题,如事务处理、连接池、ORM(对象关系映射),以及性能优化和实际应用案例。
10. 事务处理
在 MySQL 中,事务(Transaction)是一组作为单个逻辑单元执行的 SQL 语句。事务的主要特性包括:
10.1 启用事务
MySQL 默认在每条 SQL 语句后自动提交。要使用事务,需显式禁用自动提交(autocommit=False
)。
事务处理示例
import mysql.connector
from mysql.connector import Error
try:
# 连接到数据库
connection = mysql.connector.connect(
host="localhost",
user="root",
password="password",
database="test_db"
)
connection.autocommit = False # 禁用自动提交
cursor = connection.cursor()
# 执行 SQL 操作
cursor.execute("INSERT INTO users (name, age, email) VALUES (%s, %s, %s)",
("John", 30, "john@example.com"))
cursor.execute("UPDATE users SET age = age + 1 WHERE name = %s",
("John",))
# 提交事务
connection.commit()
print("事务已提交")
except Error as e:
# 回滚事务
connection.rollback()
print("事务失败,已回滚:", e)
finally:
if connection.is_connected():
cursor.close()
connection.close()
print("数据库连接已关闭")
11. 连接池
连接 MySQL 数据库的开销较大,尤其是在高并发场景中频繁创建和关闭连接会影响性能。连接池可以复用连接,减少开销。
11.1 使用连接池
mysql-connector
提供了内置的连接池支持。
创建连接池
from mysql.connector import pooling
# 创建连接池
connection_pool = pooling.MySQLConnectionPool(
pool_name="mypool",
pool_size=5, # 池中最大连接数
host="localhost",
user="root",
password="password",
database="test_db"
)
# 从连接池获取连接
connection = connection_pool.get_connection()
if connection.is_connected():
print("成功从连接池获取连接")
使用连接池的完整示例
from mysql.connector import pooling
# 创建连接池
connection_pool = pooling.MySQLConnectionPool(
pool_name="mypool",
pool_size=3,
host="localhost",
user="root",
password="password",
database="test_db"
)
# 从连接池中获取连接
try:
connection = connection_pool.get_connection()
cursor = connection.cursor()
# 执行查询
cursor.execute("SELECT * FROM users")
for row in cursor.fetchall():
print(row)
finally:
# 释放连接回连接池
if connection.is_connected():
cursor.close()
connection.close()
12. 使用 ORM(对象关系映射)
ORM(Object-Relational Mapping) 是一种将数据库表映射为 Python 对象的技术,可以简化数据库操作。
12.1 SQLAlchemy
SQLAlchemy 是 Python 最流行的 ORM 库之一,支持高级查询和事务管理。
安装 SQLAlchemy
pip install sqlalchemy
SQLAlchemy 的基本使用
- 定义模型
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 创建基础类
Base = declarative_base()
# 定义用户表模型
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False)
age = Column(Integer, nullable=False)
email = Column(String(100), unique=True, nullable=False)
- 连接数据库
# 创建数据库引擎
engine = create_engine("mysql+mysqlconnector://root:password@localhost/test_db")
# 创建所有表
Base.metadata.create_all(engine)
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
- 插入数据
# 插入数据
new_user = User(name="Alice", age=25, email="alice@example.com")
session.add(new_user)
session.commit()
print("数据已插入")
- 查询数据
# 查询所有用户
users = session.query(User).all()
for user in users:
print(user.name, user.age, user.email)
- 更新数据
# 更新用户信息
user = session.query(User).filter_by(name="Alice").first()
user.age = 26
session.commit()
- 删除数据
# 删除用户
user = session.query(User).filter_by(name="Alice").first()
session.delete(user)
session.commit()
13. 性能优化
在高并发或大数据量场景中,可以通过以下方法优化性能:
13.1 使用索引
为查询频繁的字段创建索引,可以显著提升查询速度。
示例:创建索引
CREATE INDEX idx_name ON users(name);
13.2 分页查询
对于大数据表,避免一次性查询所有数据,使用分页查询。
示例:分页查询
# 分页查询
page = 1
page_size = 10
offset = (page - 1) * page_size
query = "SELECT * FROM users LIMIT %s OFFSET %s"
cursor.execute(query, (page_size, offset))
for row in cursor.fetchall():
print(row)
13.3 使用批量插入
对于大量数据插入,使用批量插入代替逐条插入。
# 批量插入
data = [
("Tom", 30, "tom@example.com"),
("Jerry", 29, "jerry@example.com"),
("Spike", 35, "spike@example.com")
]
insert_query = "INSERT INTO users (name, age, email) VALUES (%s, %s, %s)"
cursor.executemany(insert_query, data)
connection.commit()
14. 实际案例:用户管理系统
以下是一个完整的用户管理系统示例,演示如何使用 Python 操作 MySQL 数据库。
功能
- 添加用户
- 查询所有用户
- 更新用户信息
- 删除用户
代码实现
import mysql.connector
# 数据库连接
connection = mysql.connector.connect(
host="localhost",
user="root",
password="password",
database="test_db"
)
cursor = connection.cursor()
# 创建用户表
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(50),
age INT,
email VARCHAR(100) UNIQUE
)
""")
def add_user(name, age, email):
query = "INSERT INTO users (name, age, email) VALUES (%s, %s, %s)"
cursor.execute(query, (name, age, email))
connection.commit()
print(f"用户 {name} 已添加")
def get_all_users():
cursor.execute("SELECT * FROM users")
for row in cursor.fetchall():
print(row)
def update_user(user_id, name=None, age=None, email=None):
query = "UPDATE users SET name=%s, age=%s, email=%s WHERE id=%s"
cursor.execute(query, (name, age, email, user_id))
connection.commit()
print(f"用户 {user_id} 已更新")
def delete_user(user_id):
query = "DELETE FROM users WHERE id=%s"
cursor.execute(query, (user_id,))
connection.commit()
print(f"用户 {user_id} 已删除")
# 测试功能
add_user("Alice", 25, "alice@example.com")
add_user("Bob", 30, "bob@example.com")
print("所有用户:")
get_all_users()
update_user(1, name="Alice Johnson", age=26)
delete_user(2)
print("最终用户列表:")
get_all_users()
# 关闭连接
cursor.close()
connection.close()
15. 总结
无论是小型项目还是大型系统,结合 Python 和 MySQL 的强大能力,都可以实现高效、稳定的数据库操作!
作者:小宝哥Code