031-Python 操作 MySQL 数据库

Python 提供了与 MySQL 数据库交互的多种方式,其中最常用的是 mysql-connectorPyMySQL 库。通过这些库,可以轻松完成对数据库的连接、查询、插入、更新和删除等操作。

以下是 Python 操作 MySQL 数据库的完整指南。


1. 安装 MySQL 数据库驱动

1.1 安装 mysql-connector

mysql-connector 是 MySQL 官方提供的 Python 驱动。

pip install mysql-connector-python

1.2 安装 PyMySQL

PyMySQL 是一个纯 Python 的 MySQL 驱动,功能强大且易于使用。

pip install PyMySQL

2. 连接 MySQL 数据库

2.1 使用 mysql-connector

import mysql.connector

# 连接到 MySQL 数据库
connection = mysql.connector.connect(
    host="localhost",       # MySQL 主机(本地为 localhost)
    user="root",            # MySQL 用户名
    password="password",    # MySQL 密码
    database="test_db"      # 要连接的数据库
)

# 检查连接是否成功
if connection.is_connected():
    print("成功连接到数据库")

2.2 使用 PyMySQL

import pymysql

# 连接到 MySQL 数据库
connection = pymysql.connect(
    host="localhost",       # MySQL 主机
    user="root",            # 用户名
    password="password",    # 密码
    database="test_db"      # 数据库名
)

# 检查连接是否成功
if connection.open:
    print("成功连接到数据库")

3. 执行 SQL 查询

数据库操作通常分为以下几类:

  1. 创建表
  2. 插入数据
  3. 查询数据
  4. 更新数据
  5. 删除数据

以下以 mysql-connector 为例,展示这些操作。如果使用 PyMySQL,代码结构类似。


3.1 创建表

cursor = connection.cursor()

# 创建一个名为 users 的表
create_table_query = """
CREATE TABLE IF NOT EXISTS users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(50) NOT NULL,
    age INT NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL
)
"""
cursor.execute(create_table_query)
print("表已创建")

3.2 插入数据

单条插入

# 插入单条记录
insert_query = "INSERT INTO users (name, age, email) VALUES (%s, %s, %s)"
data = ("Alice", 25, "alice@example.com")
cursor.execute(insert_query, data)

# 提交更改
connection.commit()
print("数据已插入")
批量插入

# 插入多条记录
bulk_data = [
    ("Bob", 30, "bob@example.com"),
    ("Charlie", 35, "charlie@example.com"),
    ("David", 40, "david@example.com")
]
cursor.executemany(insert_query, bulk_data)
connection.commit()
print("批量数据已插入")

3.3 查询数据

查询所有记录

# 查询所有记录
select_query = "SELECT * FROM users"
cursor.execute(select_query)

# 获取所有结果
results = cursor.fetchall()
for row in results:
    print(row)
条件查询

# 查询特定条件的数据
condition_query = "SELECT * FROM users WHERE age > %s"
cursor.execute(condition_query, (30,))

# 获取结果
results = cursor.fetchall()
for row in results:
    print(row)

3.4 更新数据

# 更新用户的年龄
update_query = "UPDATE users SET age = %s WHERE name = %s"
cursor.execute(update_query, (28, "Alice"))

# 提交更改
connection.commit()
print("数据已更新")

3.5 删除数据

# 删除特定用户
delete_query = "DELETE FROM users WHERE name = %s"
cursor.execute(delete_query, ("Bob",))

# 提交更改
connection.commit()
print("数据已删除")

4. 关闭连接

完成所有操作后,记得关闭数据库连接。

cursor.close()
connection.close()
print("数据库连接已关闭")

5. 使用上下文管理器

为了保证连接和游标在使用完成后正确关闭,可以使用上下文管理器(with 语句)。

示例:使用上下文管理器

import mysql.connector

with mysql.connector.connect(
    host="localhost",
    user="root",
    password="password",
    database="test_db"
) as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT * FROM users")
        for row in cursor.fetchall():
            print(row)

上下文管理器的优势是,即使发生异常,连接和游标也会被正确关闭。


6. 数据库操作完整示例

以下是一个完整的示例,展示了如何创建表、插入数据、查询数据、更新数据和删除数据。

import mysql.connector

# 连接到 MySQL 数据库
connection = mysql.connector.connect(
    host="localhost",
    user="root",
    password="password",
    database="test_db"
)

cursor = connection.cursor()

# 创建表
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(50) NOT NULL,
    age INT NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL
)
""")

# 插入数据
insert_query = "INSERT INTO users (name, age, email) VALUES (%s, %s, %s)"
cursor.execute(insert_query, ("Alice", 25, "alice@example.com"))
connection.commit()

# 查询数据
cursor.execute("SELECT * FROM users")
for row in cursor.fetchall():
    print(row)

# 更新数据
cursor.execute("UPDATE users SET age = %s WHERE name = %s", (26, "Alice"))
connection.commit()

# 删除数据
cursor.execute("DELETE FROM users WHERE name = %s", ("Alice",))
connection.commit()

# 关闭连接
cursor.close()
connection.close()

7. 处理异常

在实际应用中,需要处理可能发生的异常,比如连接失败或 SQL 查询失败。

示例:异常处理

import mysql.connector
from mysql.connector import Error

try:
    # 尝试连接数据库
    connection = mysql.connector.connect(
        host="localhost",
        user="root",
        password="password",
        database="test_db"
    )

    if connection.is_connected():
        print("成功连接到数据库")
        cursor = connection.cursor()

        # 执行查询
        cursor.execute("SELECT DATABASE();")
        db_name = cursor.fetchone()
        print("当前数据库:", db_name)

except Error as e:
    print("连接数据库失败:", e)

finally:
    if connection.is_connected():
        cursor.close()
        connection.close()
        print("数据库连接已关闭")

8. 安全性注意事项

  1. 防止 SQL 注入

  2. 始终使用参数化查询,避免直接拼接 SQL 字符串。
  3. 示例:不安全的代码

    cursor.execute(f"SELECT * FROM users WHERE name = '{user_input}'")
    
  4. 修复后的代码

    cursor.execute("SELECT * FROM users WHERE name = %s", (user_input,))
    
  5. 使用加密连接

  6. 配置 SSL 以保证数据传输的安全性。
  7. 限制权限

  8. 不要使用具有管理员权限的用户连接数据库。
  9. 为应用程序创建专用用户,并限制其权限。

9. 总结

通过 Python 操作 MySQL 数据库,可以轻松实现对数据的管理和操作。以下是关键点:

  • 连接数据库:使用 mysql-connectorPyMySQL
  • 执行操作:包括创建表、插入、查询、更新和删除操作。
  • 使用参数化查询:防止 SQL 注入。
  • 关闭连接:确保使用完毕后正确关闭连接。
  • 对于大型项目,推荐结合 ORM(如 SQLAlchemy 或 Django ORM)简化操作,同时保证代码的可维护性和高效性。


    以下进一步扩展 Python 操作 MySQL 数据库 的内容,涵盖了高级主题,如事务处理、连接池、ORM(对象关系映射),以及性能优化和实际应用案例。


    10. 事务处理

    在 MySQL 中,事务(Transaction)是一组作为单个逻辑单元执行的 SQL 语句。事务的主要特性包括:

  • 原子性(Atomicity):事务要么全部成功,要么全部失败。
  • 一致性(Consistency):事务执行后,数据库从一个一致状态转移到另一个一致状态。
  • 隔离性(Isolation):事务之间彼此独立,互不干扰。
  • 持久性(Durability):事务一旦提交,结果是永久性的。
  • 10.1 启用事务

    MySQL 默认在每条 SQL 语句后自动提交。要使用事务,需显式禁用自动提交(autocommit=False)。

    事务处理示例

    import mysql.connector
    from mysql.connector import Error
    
    try:
        # 连接到数据库
        connection = mysql.connector.connect(
            host="localhost",
            user="root",
            password="password",
            database="test_db"
        )
        connection.autocommit = False  # 禁用自动提交
    
        cursor = connection.cursor()
    
        # 执行 SQL 操作
        cursor.execute("INSERT INTO users (name, age, email) VALUES (%s, %s, %s)", 
                       ("John", 30, "john@example.com"))
        cursor.execute("UPDATE users SET age = age + 1 WHERE name = %s", 
                       ("John",))
    
        # 提交事务
        connection.commit()
        print("事务已提交")
    
    except Error as e:
        # 回滚事务
        connection.rollback()
        print("事务失败,已回滚:", e)
    
    finally:
        if connection.is_connected():
            cursor.close()
            connection.close()
            print("数据库连接已关闭")
    

    11. 连接池

    连接 MySQL 数据库的开销较大,尤其是在高并发场景中频繁创建和关闭连接会影响性能。连接池可以复用连接,减少开销。

    11.1 使用连接池

    mysql-connector 提供了内置的连接池支持。

    创建连接池

    from mysql.connector import pooling
    
    # 创建连接池
    connection_pool = pooling.MySQLConnectionPool(
        pool_name="mypool",
        pool_size=5,                # 池中最大连接数
        host="localhost",
        user="root",
        password="password",
        database="test_db"
    )
    
    # 从连接池获取连接
    connection = connection_pool.get_connection()
    
    if connection.is_connected():
        print("成功从连接池获取连接")
    
    使用连接池的完整示例

    from mysql.connector import pooling
    
    # 创建连接池
    connection_pool = pooling.MySQLConnectionPool(
        pool_name="mypool",
        pool_size=3,
        host="localhost",
        user="root",
        password="password",
        database="test_db"
    )
    
    # 从连接池中获取连接
    try:
        connection = connection_pool.get_connection()
        cursor = connection.cursor()
    
        # 执行查询
        cursor.execute("SELECT * FROM users")
        for row in cursor.fetchall():
            print(row)
    
    finally:
        # 释放连接回连接池
        if connection.is_connected():
            cursor.close()
            connection.close()
    

    12. 使用 ORM(对象关系映射)

    ORM(Object-Relational Mapping) 是一种将数据库表映射为 Python 对象的技术,可以简化数据库操作。

    12.1 SQLAlchemy

    SQLAlchemy 是 Python 最流行的 ORM 库之一,支持高级查询和事务管理。

    安装 SQLAlchemy

    pip install sqlalchemy
    
    SQLAlchemy 的基本使用
    1. 定义模型
    from sqlalchemy import create_engine, Column, Integer, String
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker
    
    # 创建基础类
    Base = declarative_base()
    
    # 定义用户表模型
    class User(Base):
        __tablename__ = "users"
        id = Column(Integer, primary_key=True, autoincrement=True)
        name = Column(String(50), nullable=False)
        age = Column(Integer, nullable=False)
        email = Column(String(100), unique=True, nullable=False)
    
    1. 连接数据库
    # 创建数据库引擎
    engine = create_engine("mysql+mysqlconnector://root:password@localhost/test_db")
    
    # 创建所有表
    Base.metadata.create_all(engine)
    
    # 创建会话
    Session = sessionmaker(bind=engine)
    session = Session()
    
    1. 插入数据
    # 插入数据
    new_user = User(name="Alice", age=25, email="alice@example.com")
    session.add(new_user)
    session.commit()
    print("数据已插入")
    
    1. 查询数据
    # 查询所有用户
    users = session.query(User).all()
    for user in users:
        print(user.name, user.age, user.email)
    
    1. 更新数据
    # 更新用户信息
    user = session.query(User).filter_by(name="Alice").first()
    user.age = 26
    session.commit()
    
    1. 删除数据
    # 删除用户
    user = session.query(User).filter_by(name="Alice").first()
    session.delete(user)
    session.commit()
    

    13. 性能优化

    在高并发或大数据量场景中,可以通过以下方法优化性能:

    13.1 使用索引

    为查询频繁的字段创建索引,可以显著提升查询速度。

    示例:创建索引

    CREATE INDEX idx_name ON users(name);
    

    13.2 分页查询

    对于大数据表,避免一次性查询所有数据,使用分页查询。

    示例:分页查询

    # 分页查询
    page = 1
    page_size = 10
    offset = (page - 1) * page_size
    
    query = "SELECT * FROM users LIMIT %s OFFSET %s"
    cursor.execute(query, (page_size, offset))
    
    for row in cursor.fetchall():
        print(row)
    

    13.3 使用批量插入

    对于大量数据插入,使用批量插入代替逐条插入。

    # 批量插入
    data = [
        ("Tom", 30, "tom@example.com"),
        ("Jerry", 29, "jerry@example.com"),
        ("Spike", 35, "spike@example.com")
    ]
    insert_query = "INSERT INTO users (name, age, email) VALUES (%s, %s, %s)"
    cursor.executemany(insert_query, data)
    connection.commit()
    

    14. 实际案例:用户管理系统

    以下是一个完整的用户管理系统示例,演示如何使用 Python 操作 MySQL 数据库。

    功能
    1. 添加用户
    2. 查询所有用户
    3. 更新用户信息
    4. 删除用户

    代码实现

    import mysql.connector
    
    # 数据库连接
    connection = mysql.connector.connect(
        host="localhost",
        user="root",
        password="password",
        database="test_db"
    )
    cursor = connection.cursor()
    
    # 创建用户表
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS users (
        id INT AUTO_INCREMENT PRIMARY KEY,
        name VARCHAR(50),
        age INT,
        email VARCHAR(100) UNIQUE
    )
    """)
    
    def add_user(name, age, email):
        query = "INSERT INTO users (name, age, email) VALUES (%s, %s, %s)"
        cursor.execute(query, (name, age, email))
        connection.commit()
        print(f"用户 {name} 已添加")
    
    def get_all_users():
        cursor.execute("SELECT * FROM users")
        for row in cursor.fetchall():
            print(row)
    
    def update_user(user_id, name=None, age=None, email=None):
        query = "UPDATE users SET name=%s, age=%s, email=%s WHERE id=%s"
        cursor.execute(query, (name, age, email, user_id))
        connection.commit()
        print(f"用户 {user_id} 已更新")
    
    def delete_user(user_id):
        query = "DELETE FROM users WHERE id=%s"
        cursor.execute(query, (user_id,))
        connection.commit()
        print(f"用户 {user_id} 已删除")
    
    # 测试功能
    add_user("Alice", 25, "alice@example.com")
    add_user("Bob", 30, "bob@example.com")
    
    print("所有用户:")
    get_all_users()
    
    update_user(1, name="Alice Johnson", age=26)
    delete_user(2)
    
    print("最终用户列表:")
    get_all_users()
    
    # 关闭连接
    cursor.close()
    connection.close()
    

    15. 总结

  • 基础操作:熟练掌握如何连接 MySQL、创建表、查询、插入、更新和删除数据。
  • 高级功能:使用事务、连接池和 ORM 简化复杂的数据库操作。
  • 性能优化:通过索引、分页查询和批量操作提升数据库性能。
  • 实际案例:将功能模块化应用到实际项目中。
  • 无论是小型项目还是大型系统,结合 Python 和 MySQL 的强大能力,都可以实现高效、稳定的数据库操作!

    作者:小宝哥Code

    物联沃分享整理
    物联沃-IOTWORD物联网 » 031-Python 操作 MySQL 数据库

    发表回复