Python操作Redis基础教程

这是一份详细的 Python 操作 Redis 教程,涵盖了从安装、连接到操作各种 Redis 数据类型以及一些进阶用法。(本人量化交易的数据库主要是redis+本地文件parquet结合)

1. Redis 简介

Redis (Remote Dictionary Server) 是一个开源的、基于内存的键值对(Key-Value)存储系统。它通常用作数据库、缓存和消息代理。由于数据存储在内存中,Redis 的读写速度非常快,适用于需要高性能数据访问的场景。

支持的数据类型:

  • String (字符串): 最基本的数据类型,可以是任何数据,如文本、JSON、序列化对象等(最大 512MB)。
  • List (列表): 字符串列表,按插入顺序排序。可以从两端进行 push/pop 操作。
  • Set (集合): 无序的字符串集合,不允许重复成员。
  • Hash (哈希/散列): 包含键值对的集合,非常适合存储对象。
  • Sorted Set (有序集合): 类似 Set,但每个成员都关联一个 double 类型的分数(score),并根据分数进行排序。成员必须唯一,但分数可以重复。
  • 以及其他更高级的类型如 Streams, Bitmaps, HyperLogLogs 等。
  • 2. 前提条件

  • 安装 Python: 确保你的系统已安装 Python (建议 3.6+)。
  • 安装 Redis 服务器: 你需要在你的机器上或可访问的网络上运行一个 Redis 服务器实例。
  • Linux/macOS: 可以通过包管理器安装(如 apt install redis-server, brew install redis)。
  • Windows: 可以通过 WSL (Windows Subsystem for Linux) 安装,或者下载官方不推荐但可用的 Windows 版本,或者使用 Docker。
  • Docker: 推荐方式,跨平台且易于管理:docker run --name my-redis -p 6379:6379 -d redis
  • 启动 Redis 服务器: 确保 Redis 服务正在运行。默认监听端口是 6379。你可以使用 redis-cli ping 命令测试服务器是否响应(应返回 PONG)。
  • 3. 安装 Python Redis 客户端库

    与 Redis 服务器交互最常用的 Python 库是 redis-py

    pip install redis
    

    4. 连接到 Redis

    方法一:直接连接

    这是最简单的方式,适用于快速测试或简单脚本。

    import redis
    
    try:
        # 连接到本地默认端口的 Redis
        # decode_responses=True: 让 get() 等方法返回字符串而不是 bytes
        r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
    
        # 如果 Redis 设置了密码
        # r = redis.Redis(host='your_redis_host', port=6379, db=0, password='your_password', decode_responses=True)
    
        # 测试连接
        response = r.ping()
        print(f"Redis PING response: {response}") # 应该打印 True
    
        print("成功连接到 Redis!")
    
    except redis.exceptions.ConnectionError as e:
        print(f"无法连接到 Redis: {e}")
    
    # 注意:使用完后,理论上应该关闭连接 r.close(),但在简单脚本中不总是必需。
    # 对于长时间运行的应用,强烈推荐使用连接池。
    

    方法二:使用连接池 (Connection Pool) – 推荐

    对于需要频繁与 Redis 交互的应用程序(如 Web 服务),每次操作都建立和断开连接会非常低效。连接池可以管理一组预先建立的连接,供应用程序复用。

    import redis
    
    try:
        # 创建连接池
        pool = redis.ConnectionPool(host='localhost', port=6379, db=0, decode_responses=True)
        # 如果有密码:
        # pool = redis.ConnectionPool(host='your_redis_host', port=6379, db=0, password='your_password', decode_responses=True)
    
        # 从连接池获取一个连接
        r = redis.Redis(connection_pool=pool)
    
        # 测试连接
        response = r.ping()
        print(f"Redis PING response (from pool): {response}")
    
        print("成功通过连接池连接到 Redis!")
    
        # 使用连接池时,不需要手动关闭单个连接 r.close()
        # 连接会在使用完毕后自动返回池中
    
    except redis.exceptions.ConnectionError as e:
        print(f"无法创建 Redis 连接池或连接失败: {e}")
    
    # 当你的应用程序退出时,可以考虑关闭整个连接池(虽然不总是必需)
    # pool.disconnect()
    

    decode_responses=True 的重要性:

  • 如果不设置 decode_responses=True (默认为 False),get()lrange()smembers() 等方法返回的是字节串 (bytes),例如 b'value'
  • 设置 decode_responses=True 后,这些方法会尝试使用指定的编码(默认为 utf-8)将字节串解码为字符串 (str),例如 'value',这通常更方便处理。
  • 5. 操作 Redis 数据类型

    假设我们已经通过连接池获取了 r 对象 (r = redis.Redis(connection_pool=pool))。

    5.1 String (字符串)

    # --- String 操作 ---
    print("\n--- String 操作 ---")
    
    # 设置值 (SET key value)
    # 如果键已存在,会覆盖旧值
    r.set('user:name', 'Alice')
    print(f"设置 user:name: {r.get('user:name')}")
    
    # 获取值 (GET key)
    name = r.get('user:name')
    print(f"获取 user:name: {name}")
    
    # 设置带过期时间的值 (SET key value EX seconds)
    # 'counter' 将在 60 秒后自动删除
    r.set('counter', 100, ex=60)
    print(f"设置带过期时间的 counter: {r.get('counter')}")
    
    # 设置值,仅当键不存在时 (SET key value NX)
    # 如果 'user:name' 已存在,setnx 返回 False,不设置
    result_nx = r.setnx('user:name', 'Bob')
    print(f"尝试设置已存在的 user:name (setnx): {result_nx} (当前值: {r.get('user:name')})")
    result_nx_new = r.setnx('user:city', 'New York')
    print(f"尝试设置不存在的 user:city (setnx): {result_nx_new} (当前值: {r.get('user:city')})")
    
    # 递增 (INCR key) - 对字符串表示的数字进行加 1
    r.set('visit_count', '0')
    r.incr('visit_count')
    r.incr('visit_count')
    print(f"递增 visit_count: {r.get('visit_count')}") # 输出 '2'
    
    # 按指定步长递增 (INCRBY key increment)
    r.incrby('visit_count', 10)
    print(f"递增 visit_count by 10: {r.get('visit_count')}") # 输出 '12'
    
    # 递减 (DECR key / DECRBY key decrement)
    r.decr('visit_count')
    print(f"递减 visit_count: {r.get('visit_count')}") # 输出 '11'
    
    # 检查键是否存在 (EXISTS key)
    exists = r.exists('user:name')
    print(f"user:name 是否存在: {exists}") # 输出 1 (表示存在)
    exists_non = r.exists('nonexistent_key')
    print(f"nonexistent_key 是否存在: {exists_non}") # 输出 0 (表示不存在)
    
    # 设置键的过期时间 (EXPIRE key seconds)
    r.set('temp_data', 'will disappear')
    r.expire('temp_data', 5) # 5 秒后过期
    print(f"设置 temp_data 过期时间为 5 秒")
    # time.sleep(6) # 可以取消注释这行来验证过期
    # print(f"5 秒后 temp_data 的值: {r.get('temp_data')}") # 如果 sleep 了,这里会输出 None
    
    # 删除键 (DEL key [key ...])
    deleted_count = r.delete('user:city', 'temp_data') # 可以同时删除多个
    print(f"删除了 {deleted_count} 个键 ('user:city', 'temp_data')")
    print(f"删除后 user:city 的值: {r.get('user:city')}") # 输出 None
    

    5.2 List (列表)

    Redis 列表是有序的字符串序列,类似 Python 列表,但操作主要在两端进行。

    # --- List 操作 ---
    print("\n--- List 操作 ---")
    list_key = 'tasks'
    
    # 清理旧数据(如果存在)
    r.delete(list_key)
    
    # 从左侧推入元素 (LPUSH key element [element ...])
    r.lpush(list_key, 'task3', 'task2', 'task1') # 插入顺序: task1, task2, task3
    print(f"从左侧推入 'task1', 'task2', 'task3'")
    
    # 从右侧推入元素 (RPUSH key element [element ...])
    r.rpush(list_key, 'task4', 'task5') # 插入顺序: task4, task5
    print(f"从右侧推入 'task4', 'task5'")
    
    # 获取列表范围 (LRANGE key start stop) - 获取所有元素
    # 0 是第一个元素,-1 是最后一个元素
    all_tasks = r.lrange(list_key, 0, -1)
    print(f"当前列表 ({list_key}): {all_tasks}") # 输出: ['task1', 'task2', 'task3', 'task4', 'task5'] (注意 LPUSH/RPUSH 的顺序)
    
    # 获取列表长度 (LLEN key)
    length = r.llen(list_key)
    print(f"列表长度: {length}") # 输出: 5
    
    # 从左侧弹出元素 (LPOP key) - 获取并移除第一个元素
    first_task = r.lpop(list_key)
    print(f"从左侧弹出: {first_task}") # 输出: 'task1'
    print(f"弹出后列表: {r.lrange(list_key, 0, -1)}") # 输出: ['task2', 'task3', 'task4', 'task5']
    
    # 从右侧弹出元素 (RPOP key) - 获取并移除最后一个元素
    last_task = r.rpop(list_key)
    print(f"从右侧弹出: {last_task}") # 输出: 'task5'
    print(f"弹出后列表: {r.lrange(list_key, 0, -1)}") # 输出: ['task2', 'task3', 'task4']
    
    # 获取指定索引的元素 (LINDEX key index)
    task_at_index_1 = r.lindex(list_key, 1)
    print(f"索引 1 处的元素: {task_at_index_1}") # 输出: 'task3'
    

    5.3 Set (集合)

    无序、唯一的字符串集合。

    # --- Set 操作 ---
    print("\n--- Set 操作 ---")
    set_key = 'unique_users'
    
    # 清理旧数据
    r.delete(set_key)
    
    # 添加成员 (SADD key member [member ...]) - 返回成功添加的新成员数量
    added_count = r.sadd(set_key, 'user1', 'user2', 'user3')
    print(f"向集合添加了 {added_count} 个成员")
    added_again = r.sadd(set_key, 'user2', 'user4') # 'user2' 已存在,不会重复添加
    print(f"再次尝试添加 'user2', 'user4',新增 {added_again} 个") # 输出 1
    
    # 获取所有成员 (SMEMBERS key)
    members = r.smembers(set_key)
    print(f"集合所有成员: {members}") # 输出: {'user1', 'user2', 'user3', 'user4'} (注意是无序的)
    
    # 检查成员是否存在 (SISMEMBER key member)
    is_member = r.sismember(set_key, 'user3')
    print(f"'user3' 是否是集合成员: {is_member}") # 输出: True
    is_member_no = r.sismember(set_key, 'user5')
    print(f"'user5' 是否是集合成员: {is_member_no}") # 输出: False
    
    # 获取集合成员数量 (SCARD key)
    count = r.scard(set_key)
    print(f"集合成员数量: {count}") # 输出: 4
    
    # 移除成员 (SREM key member [member ...]) - 返回成功移除的数量
    removed_count = r.srem(set_key, 'user1', 'user5') # 'user5' 不存在,只移除 'user1'
    print(f"尝试移除 'user1', 'user5',成功移除 {removed_count} 个") # 输出: 1
    print(f"移除后集合成员: {r.smembers(set_key)}") # 输出: {'user2', 'user3', 'user4'}
    

    5.4 Hash (哈希/散列)

    存储键值对的集合,适合表示对象。

    # --- Hash 操作 ---
    print("\n--- Hash 操作 ---")
    hash_key = 'user:1000' # 通常用 : 分隔表示层级
    
    # 清理旧数据
    r.delete(hash_key)
    
    # 设置单个字段值 (HSET key field value)
    r.hset(hash_key, 'name', 'Bob')
    print(f"设置 Hash 字段 'name'")
    
    # 同时设置多个字段值 (HSET key mapping)
    user_data = {'email': 'bob@example.com', 'city': 'London'}
    r.hset(hash_key, mapping=user_data)
    print(f"同时设置 Hash 字段 'email', 'city'")
    
    # 获取单个字段值 (HGET key field)
    name = r.hget(hash_key, 'name')
    email = r.hget(hash_key, 'email')
    print(f"获取 Hash 字段 'name': {name}") # 输出: 'Bob'
    print(f"获取 Hash 字段 'email': {email}") # 输出: 'bob@example.com'
    
    # 获取所有字段和值 (HGETALL key) - 返回字典
    all_fields = r.hgetall(hash_key)
    print(f"获取 Hash 所有字段和值: {all_fields}") # 输出: {'name': 'Bob', 'email': 'bob@example.com', 'city': 'London'}
    
    # 获取所有字段名 (HKEYS key)
    keys = r.hkeys(hash_key)
    print(f"获取 Hash 所有字段名: {keys}") # 输出: ['name', 'email', 'city']
    
    # 获取所有值 (HVALS key)
    values = r.hvals(hash_key)
    print(f"获取 Hash 所有值: {values}") # 输出: ['Bob', 'bob@example.com', 'London']
    
    # 检查字段是否存在 (HEXISTS key field)
    exists = r.hexists(hash_key, 'city')
    print(f"Hash 字段 'city' 是否存在: {exists}") # 输出: True
    
    # 删除字段 (HDEL key field [field ...]) - 返回成功删除的字段数量
    deleted_fields = r.hdel(hash_key, 'city', 'nonexistent_field')
    print(f"尝试删除 Hash 字段 'city', 'nonexistent_field',成功删除 {deleted_fields} 个") # 输出: 1
    print(f"删除字段后 Hash: {r.hgetall(hash_key)}") # 输出: {'name': 'Bob', 'email': 'bob@example.com'}
    

    5.5 Sorted Set (有序集合)

    与 Set 类似,但每个成员都有一个分数(score),Redis 会根据分数排序。

    # --- Sorted Set 操作 ---
    print("\n--- Sorted Set 操作 ---")
    zset_key = 'leaderboard'
    
    # 清理旧数据
    r.delete(zset_key)
    
    # 添加成员和分数 (ZADD key {member1: score1, member2: score2 ...})
    # 如果成员已存在,会更新其分数
    r.zadd(zset_key, {'player1': 1500, 'player2': 2100, 'player3': 1800})
    print(f"添加成员到有序集合")
    r.zadd(zset_key, {'player1': 1650}) # 更新 player1 的分数
    print(f"更新 player1 的分数")
    
    # 按分数范围获取成员 (ZRANGEBYSCORE key min max [WITHSCORES=True])
    # (1000, 2000] 表示分数 > 1000 且 <= 2000
    players_in_range = r.zrangebyscore(zset_key, '(1000', 2000) # 默认不包含分数
    print(f"分数在 (1000, 2000] 之间的玩家: {players_in_range}") # 输出: ['player1', 'player3']
    players_with_scores = r.zrangebyscore(zset_key, 1000, 2000, withscores=True)
    print(f"分数在 [1000, 2000] 之间的玩家 (带分数): {players_with_scores}") # 输出: [('player1', 1650.0), ('player3', 1800.0)]
    
    # 按排名范围获取成员 (ZRANGE key start stop [WITHSCORES=True])
    # 0 是排名第一(分数最低),-1 是排名最后(分数最高)
    top_players = r.zrange(zset_key, 0, -1, desc=True) # desc=True 按分数从高到低排
    print(f"所有玩家按分数降序排列: {top_players}") # 输出: ['player2', 'player3', 'player1']
    top_2_with_scores = r.zrange(zset_key, 0, 1, desc=True, withscores=True)
    print(f"排名前 2 的玩家 (带分数): {top_2_with_scores}") # 输出: [('player2', 2100.0), ('player3', 1800.0)]
    
    # 获取成员的分数 (ZSCORE key member)
    score_player2 = r.zscore(zset_key, 'player2')
    print(f"'player2' 的分数: {score_player2}") # 输出: 2100.0
    
    # 获取成员数量 (ZCARD key)
    num_players = r.zcard(zset_key)
    print(f"有序集合成员数量: {num_players}") # 输出: 3
    
    # 移除成员 (ZREM key member [member ...]) - 返回成功移除的数量
    removed_count = r.zrem(zset_key, 'player1', 'nonexistent')
    print(f"尝试移除 'player1', 'nonexistent',成功移除 {removed_count} 个") # 输出: 1
    print(f"移除后有序集合 (降序): {r.zrange(zset_key, 0, -1, desc=True, withscores=True)}")
    

    6. 进阶用法

    6.1 Pipeline (管道)

    当你需要连续执行多个 Redis 命令时,每次命令都需要一次网络往返 (Client <-> Server)。使用 Pipeline 可以将多个命令打包一次性发送给服务器,服务器执行完所有命令后再将结果一次性返回,从而显著减少网络延迟,提高性能。

    # --- Pipeline 操作 ---
    print("\n--- Pipeline 操作 ---")
    
    # 使用连接池获取连接
    r_pipe = redis.Redis(connection_pool=pool)
    
    # 创建 Pipeline 对象
    pipe = r_pipe.pipeline()
    
    # 在 Pipeline 中缓存命令 (不会立即执行)
    pipe.set('pipe_test:name', 'Pipeline User')
    pipe.incr('pipe_test:counter', 1)
    pipe.expire('pipe_test:name', 30)
    pipe.get('pipe_test:name')
    pipe.get('pipe_test:counter')
    
    # 一次性执行所有缓存的命令
    # results 是一个列表,包含每个命令的执行结果,顺序与添加顺序一致
    results = pipe.execute()
    
    print(f"Pipeline 执行结果: {results}")
    # 可能输出: [True, 1, True, 'Pipeline User', '1'] (具体值取决于 counter 之前的值)
    
    # 清理
    r_pipe.delete('pipe_test:name', 'pipe_test:counter')
    

    6.2 发布/订阅 (Pub/Sub)

    Redis 提供简单的发布/订阅消息模式。

  • 发布者 (Publisher): 向指定频道 (Channel) 发送消息。
  • 订阅者 (Subscriber): 监听一个或多个频道,接收发送到这些频道的消息。
  • 订阅者代码 (subscriber.py):

    import redis
    import time
    
    pool = redis.ConnectionPool(host='localhost', port=6379, db=0, decode_responses=True)
    r = redis.Redis(connection_pool=pool)
    
    # 创建 PubSub 对象
    p = r.pubsub()
    
    # 订阅频道 'news_channel' 和 'alerts_channel'
    p.subscribe('news_channel', 'alerts_channel')
    
    print("开始监听频道 'news_channel' 和 'alerts_channel'...")
    
    # 持续监听消息 (这是一个阻塞操作)
    # p.listen() 返回一个生成器
    try:
        for message in p.listen():
            print(f"收到消息: {message}")
            # 消息格式通常是:
            # {'type': 'subscribe', 'pattern': None, 'channel': 'news_channel', 'data': 1}
            # {'type': 'message', 'pattern': None, 'channel': 'news_channel', 'data': 'Hello World!'}
    
            # 这里可以根据 message['type'] 和 message['channel'] 处理不同消息
            if message['type'] == 'message':
                print(f"  频道 [{message['channel']}] 收到数据: {message['data']}")
                if message['data'] == 'stop':
                    print("收到停止信号,退出监听。")
                    break # 可以根据特定消息退出循环
    
    except KeyboardInterrupt:
        print("手动中断监听。")
    finally:
        # 取消订阅并关闭连接
        p.unsubscribe()
        p.close()
        pool.disconnect() # 关闭整个池
        print("监听已停止,连接已关闭。")
    

    发布者代码 (publisher.py):

    import redis
    import time
    
    pool = redis.ConnectionPool(host='localhost', port=6379, db=0, decode_responses=True)
    r = redis.Redis(connection_pool=pool)
    
    # 向 'news_channel' 发布消息
    count1 = r.publish('news_channel', 'Breaking News: Redis is awesome!')
    print(f"向 'news_channel' 发布消息,接收者数量: {count1}")
    
    time.sleep(1)
    
    # 向 'alerts_channel' 发布消息
    count2 = r.publish('alerts_channel', 'System Alert: High CPU usage detected!')
    print(f"向 'alerts_channel' 发布消息,接收者数量: {count2}")
    
    time.sleep(1)
    
    # 发送停止信号
    count3 = r.publish('news_channel', 'stop')
    print(f"向 'news_channel' 发布停止信号,接收者数量: {count3}")
    
    pool.disconnect()
    

    运行 Pub/Sub: 先运行 subscriber.py,它会阻塞并等待消息。然后运行 publisher.py,你将在 subscriber.py 的控制台中看到接收到的消息。

    7. 最佳实践与提示

  • 使用连接池: 对于生产环境或需要频繁交互的应用,务必使用连接池。
  • decode_responses=True: 大多数情况下设置此选项能简化代码,避免手动解码 bytes
  • 键名设计 (Key Naming): 使用有意义且结构化的键名,常用 object-type:id:field 格式(如 user:1000:profile)。保持一致性。
  • 错误处理: 使用 try...except 捕获可能的 redis.exceptions.ConnectionErrorredis.exceptions.TimeoutError 等异常。
  • 大 Value 处理: 避免在 Redis 中存储过大的单个 Value(几 MB 或更大),这可能影响性能和内存使用。考虑拆分或使用其他存储。
  • 原子性: Redis 的单个命令是原子的。对于需要多个命令组合的原子操作,考虑使用 Lua 脚本 (通过 r.eval()r.register_script()) 或 Redis Transactions (通过 Pipeline 的 multi()exec())。
  • 资源管理: 确保在适当的时候(如应用退出时)断开连接或关闭连接池 (pool.disconnect()),尤其是在非长时间运行的脚本中。
  • 这个教程涵盖了 Python 操作 Redis 的大部分常用功能。根据你的具体需求,可以进一步探索 Redis 的事务、Lua 脚本、Streams 等更高级的特性。记得查阅 redis-py 的官方文档和 Redis 官方文档以获取最全面和最新的信息。

    作者:hiquant

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python操作Redis基础教程

    发表回复