Python操作Redis基础教程
这是一份详细的 Python 操作 Redis 教程,涵盖了从安装、连接到操作各种 Redis 数据类型以及一些进阶用法。(本人量化交易的数据库主要是redis+本地文件parquet结合)
1. Redis 简介
Redis (Remote Dictionary Server) 是一个开源的、基于内存的键值对(Key-Value)存储系统。它通常用作数据库、缓存和消息代理。由于数据存储在内存中,Redis 的读写速度非常快,适用于需要高性能数据访问的场景。
支持的数据类型:
2. 前提条件
apt install redis-server
, brew install redis
)。docker run --name my-redis -p 6379:6379 -d redis
6379
。你可以使用 redis-cli ping
命令测试服务器是否响应(应返回 PONG
)。3. 安装 Python Redis 客户端库
与 Redis 服务器交互最常用的 Python 库是 redis-py
。
pip install redis
4. 连接到 Redis
方法一:直接连接
这是最简单的方式,适用于快速测试或简单脚本。
import redis
try:
# 连接到本地默认端口的 Redis
# decode_responses=True: 让 get() 等方法返回字符串而不是 bytes
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 如果 Redis 设置了密码
# r = redis.Redis(host='your_redis_host', port=6379, db=0, password='your_password', decode_responses=True)
# 测试连接
response = r.ping()
print(f"Redis PING response: {response}") # 应该打印 True
print("成功连接到 Redis!")
except redis.exceptions.ConnectionError as e:
print(f"无法连接到 Redis: {e}")
# 注意:使用完后,理论上应该关闭连接 r.close(),但在简单脚本中不总是必需。
# 对于长时间运行的应用,强烈推荐使用连接池。
方法二:使用连接池 (Connection Pool) – 推荐
对于需要频繁与 Redis 交互的应用程序(如 Web 服务),每次操作都建立和断开连接会非常低效。连接池可以管理一组预先建立的连接,供应用程序复用。
import redis
try:
# 创建连接池
pool = redis.ConnectionPool(host='localhost', port=6379, db=0, decode_responses=True)
# 如果有密码:
# pool = redis.ConnectionPool(host='your_redis_host', port=6379, db=0, password='your_password', decode_responses=True)
# 从连接池获取一个连接
r = redis.Redis(connection_pool=pool)
# 测试连接
response = r.ping()
print(f"Redis PING response (from pool): {response}")
print("成功通过连接池连接到 Redis!")
# 使用连接池时,不需要手动关闭单个连接 r.close()
# 连接会在使用完毕后自动返回池中
except redis.exceptions.ConnectionError as e:
print(f"无法创建 Redis 连接池或连接失败: {e}")
# 当你的应用程序退出时,可以考虑关闭整个连接池(虽然不总是必需)
# pool.disconnect()
decode_responses=True
的重要性:
decode_responses=True
(默认为 False
),get()
、lrange()
、smembers()
等方法返回的是字节串 (bytes),例如 b'value'
。decode_responses=True
后,这些方法会尝试使用指定的编码(默认为 utf-8
)将字节串解码为字符串 (str),例如 'value'
,这通常更方便处理。5. 操作 Redis 数据类型
假设我们已经通过连接池获取了 r
对象 (r = redis.Redis(connection_pool=pool)
)。
5.1 String (字符串)
# --- String 操作 ---
print("\n--- String 操作 ---")
# 设置值 (SET key value)
# 如果键已存在,会覆盖旧值
r.set('user:name', 'Alice')
print(f"设置 user:name: {r.get('user:name')}")
# 获取值 (GET key)
name = r.get('user:name')
print(f"获取 user:name: {name}")
# 设置带过期时间的值 (SET key value EX seconds)
# 'counter' 将在 60 秒后自动删除
r.set('counter', 100, ex=60)
print(f"设置带过期时间的 counter: {r.get('counter')}")
# 设置值,仅当键不存在时 (SET key value NX)
# 如果 'user:name' 已存在,setnx 返回 False,不设置
result_nx = r.setnx('user:name', 'Bob')
print(f"尝试设置已存在的 user:name (setnx): {result_nx} (当前值: {r.get('user:name')})")
result_nx_new = r.setnx('user:city', 'New York')
print(f"尝试设置不存在的 user:city (setnx): {result_nx_new} (当前值: {r.get('user:city')})")
# 递增 (INCR key) - 对字符串表示的数字进行加 1
r.set('visit_count', '0')
r.incr('visit_count')
r.incr('visit_count')
print(f"递增 visit_count: {r.get('visit_count')}") # 输出 '2'
# 按指定步长递增 (INCRBY key increment)
r.incrby('visit_count', 10)
print(f"递增 visit_count by 10: {r.get('visit_count')}") # 输出 '12'
# 递减 (DECR key / DECRBY key decrement)
r.decr('visit_count')
print(f"递减 visit_count: {r.get('visit_count')}") # 输出 '11'
# 检查键是否存在 (EXISTS key)
exists = r.exists('user:name')
print(f"user:name 是否存在: {exists}") # 输出 1 (表示存在)
exists_non = r.exists('nonexistent_key')
print(f"nonexistent_key 是否存在: {exists_non}") # 输出 0 (表示不存在)
# 设置键的过期时间 (EXPIRE key seconds)
r.set('temp_data', 'will disappear')
r.expire('temp_data', 5) # 5 秒后过期
print(f"设置 temp_data 过期时间为 5 秒")
# time.sleep(6) # 可以取消注释这行来验证过期
# print(f"5 秒后 temp_data 的值: {r.get('temp_data')}") # 如果 sleep 了,这里会输出 None
# 删除键 (DEL key [key ...])
deleted_count = r.delete('user:city', 'temp_data') # 可以同时删除多个
print(f"删除了 {deleted_count} 个键 ('user:city', 'temp_data')")
print(f"删除后 user:city 的值: {r.get('user:city')}") # 输出 None
5.2 List (列表)
Redis 列表是有序的字符串序列,类似 Python 列表,但操作主要在两端进行。
# --- List 操作 ---
print("\n--- List 操作 ---")
list_key = 'tasks'
# 清理旧数据(如果存在)
r.delete(list_key)
# 从左侧推入元素 (LPUSH key element [element ...])
r.lpush(list_key, 'task3', 'task2', 'task1') # 插入顺序: task1, task2, task3
print(f"从左侧推入 'task1', 'task2', 'task3'")
# 从右侧推入元素 (RPUSH key element [element ...])
r.rpush(list_key, 'task4', 'task5') # 插入顺序: task4, task5
print(f"从右侧推入 'task4', 'task5'")
# 获取列表范围 (LRANGE key start stop) - 获取所有元素
# 0 是第一个元素,-1 是最后一个元素
all_tasks = r.lrange(list_key, 0, -1)
print(f"当前列表 ({list_key}): {all_tasks}") # 输出: ['task1', 'task2', 'task3', 'task4', 'task5'] (注意 LPUSH/RPUSH 的顺序)
# 获取列表长度 (LLEN key)
length = r.llen(list_key)
print(f"列表长度: {length}") # 输出: 5
# 从左侧弹出元素 (LPOP key) - 获取并移除第一个元素
first_task = r.lpop(list_key)
print(f"从左侧弹出: {first_task}") # 输出: 'task1'
print(f"弹出后列表: {r.lrange(list_key, 0, -1)}") # 输出: ['task2', 'task3', 'task4', 'task5']
# 从右侧弹出元素 (RPOP key) - 获取并移除最后一个元素
last_task = r.rpop(list_key)
print(f"从右侧弹出: {last_task}") # 输出: 'task5'
print(f"弹出后列表: {r.lrange(list_key, 0, -1)}") # 输出: ['task2', 'task3', 'task4']
# 获取指定索引的元素 (LINDEX key index)
task_at_index_1 = r.lindex(list_key, 1)
print(f"索引 1 处的元素: {task_at_index_1}") # 输出: 'task3'
5.3 Set (集合)
无序、唯一的字符串集合。
# --- Set 操作 ---
print("\n--- Set 操作 ---")
set_key = 'unique_users'
# 清理旧数据
r.delete(set_key)
# 添加成员 (SADD key member [member ...]) - 返回成功添加的新成员数量
added_count = r.sadd(set_key, 'user1', 'user2', 'user3')
print(f"向集合添加了 {added_count} 个成员")
added_again = r.sadd(set_key, 'user2', 'user4') # 'user2' 已存在,不会重复添加
print(f"再次尝试添加 'user2', 'user4',新增 {added_again} 个") # 输出 1
# 获取所有成员 (SMEMBERS key)
members = r.smembers(set_key)
print(f"集合所有成员: {members}") # 输出: {'user1', 'user2', 'user3', 'user4'} (注意是无序的)
# 检查成员是否存在 (SISMEMBER key member)
is_member = r.sismember(set_key, 'user3')
print(f"'user3' 是否是集合成员: {is_member}") # 输出: True
is_member_no = r.sismember(set_key, 'user5')
print(f"'user5' 是否是集合成员: {is_member_no}") # 输出: False
# 获取集合成员数量 (SCARD key)
count = r.scard(set_key)
print(f"集合成员数量: {count}") # 输出: 4
# 移除成员 (SREM key member [member ...]) - 返回成功移除的数量
removed_count = r.srem(set_key, 'user1', 'user5') # 'user5' 不存在,只移除 'user1'
print(f"尝试移除 'user1', 'user5',成功移除 {removed_count} 个") # 输出: 1
print(f"移除后集合成员: {r.smembers(set_key)}") # 输出: {'user2', 'user3', 'user4'}
5.4 Hash (哈希/散列)
存储键值对的集合,适合表示对象。
# --- Hash 操作 ---
print("\n--- Hash 操作 ---")
hash_key = 'user:1000' # 通常用 : 分隔表示层级
# 清理旧数据
r.delete(hash_key)
# 设置单个字段值 (HSET key field value)
r.hset(hash_key, 'name', 'Bob')
print(f"设置 Hash 字段 'name'")
# 同时设置多个字段值 (HSET key mapping)
user_data = {'email': 'bob@example.com', 'city': 'London'}
r.hset(hash_key, mapping=user_data)
print(f"同时设置 Hash 字段 'email', 'city'")
# 获取单个字段值 (HGET key field)
name = r.hget(hash_key, 'name')
email = r.hget(hash_key, 'email')
print(f"获取 Hash 字段 'name': {name}") # 输出: 'Bob'
print(f"获取 Hash 字段 'email': {email}") # 输出: 'bob@example.com'
# 获取所有字段和值 (HGETALL key) - 返回字典
all_fields = r.hgetall(hash_key)
print(f"获取 Hash 所有字段和值: {all_fields}") # 输出: {'name': 'Bob', 'email': 'bob@example.com', 'city': 'London'}
# 获取所有字段名 (HKEYS key)
keys = r.hkeys(hash_key)
print(f"获取 Hash 所有字段名: {keys}") # 输出: ['name', 'email', 'city']
# 获取所有值 (HVALS key)
values = r.hvals(hash_key)
print(f"获取 Hash 所有值: {values}") # 输出: ['Bob', 'bob@example.com', 'London']
# 检查字段是否存在 (HEXISTS key field)
exists = r.hexists(hash_key, 'city')
print(f"Hash 字段 'city' 是否存在: {exists}") # 输出: True
# 删除字段 (HDEL key field [field ...]) - 返回成功删除的字段数量
deleted_fields = r.hdel(hash_key, 'city', 'nonexistent_field')
print(f"尝试删除 Hash 字段 'city', 'nonexistent_field',成功删除 {deleted_fields} 个") # 输出: 1
print(f"删除字段后 Hash: {r.hgetall(hash_key)}") # 输出: {'name': 'Bob', 'email': 'bob@example.com'}
5.5 Sorted Set (有序集合)
与 Set 类似,但每个成员都有一个分数(score),Redis 会根据分数排序。
# --- Sorted Set 操作 ---
print("\n--- Sorted Set 操作 ---")
zset_key = 'leaderboard'
# 清理旧数据
r.delete(zset_key)
# 添加成员和分数 (ZADD key {member1: score1, member2: score2 ...})
# 如果成员已存在,会更新其分数
r.zadd(zset_key, {'player1': 1500, 'player2': 2100, 'player3': 1800})
print(f"添加成员到有序集合")
r.zadd(zset_key, {'player1': 1650}) # 更新 player1 的分数
print(f"更新 player1 的分数")
# 按分数范围获取成员 (ZRANGEBYSCORE key min max [WITHSCORES=True])
# (1000, 2000] 表示分数 > 1000 且 <= 2000
players_in_range = r.zrangebyscore(zset_key, '(1000', 2000) # 默认不包含分数
print(f"分数在 (1000, 2000] 之间的玩家: {players_in_range}") # 输出: ['player1', 'player3']
players_with_scores = r.zrangebyscore(zset_key, 1000, 2000, withscores=True)
print(f"分数在 [1000, 2000] 之间的玩家 (带分数): {players_with_scores}") # 输出: [('player1', 1650.0), ('player3', 1800.0)]
# 按排名范围获取成员 (ZRANGE key start stop [WITHSCORES=True])
# 0 是排名第一(分数最低),-1 是排名最后(分数最高)
top_players = r.zrange(zset_key, 0, -1, desc=True) # desc=True 按分数从高到低排
print(f"所有玩家按分数降序排列: {top_players}") # 输出: ['player2', 'player3', 'player1']
top_2_with_scores = r.zrange(zset_key, 0, 1, desc=True, withscores=True)
print(f"排名前 2 的玩家 (带分数): {top_2_with_scores}") # 输出: [('player2', 2100.0), ('player3', 1800.0)]
# 获取成员的分数 (ZSCORE key member)
score_player2 = r.zscore(zset_key, 'player2')
print(f"'player2' 的分数: {score_player2}") # 输出: 2100.0
# 获取成员数量 (ZCARD key)
num_players = r.zcard(zset_key)
print(f"有序集合成员数量: {num_players}") # 输出: 3
# 移除成员 (ZREM key member [member ...]) - 返回成功移除的数量
removed_count = r.zrem(zset_key, 'player1', 'nonexistent')
print(f"尝试移除 'player1', 'nonexistent',成功移除 {removed_count} 个") # 输出: 1
print(f"移除后有序集合 (降序): {r.zrange(zset_key, 0, -1, desc=True, withscores=True)}")
6. 进阶用法
6.1 Pipeline (管道)
当你需要连续执行多个 Redis 命令时,每次命令都需要一次网络往返 (Client <-> Server)。使用 Pipeline 可以将多个命令打包一次性发送给服务器,服务器执行完所有命令后再将结果一次性返回,从而显著减少网络延迟,提高性能。
# --- Pipeline 操作 ---
print("\n--- Pipeline 操作 ---")
# 使用连接池获取连接
r_pipe = redis.Redis(connection_pool=pool)
# 创建 Pipeline 对象
pipe = r_pipe.pipeline()
# 在 Pipeline 中缓存命令 (不会立即执行)
pipe.set('pipe_test:name', 'Pipeline User')
pipe.incr('pipe_test:counter', 1)
pipe.expire('pipe_test:name', 30)
pipe.get('pipe_test:name')
pipe.get('pipe_test:counter')
# 一次性执行所有缓存的命令
# results 是一个列表,包含每个命令的执行结果,顺序与添加顺序一致
results = pipe.execute()
print(f"Pipeline 执行结果: {results}")
# 可能输出: [True, 1, True, 'Pipeline User', '1'] (具体值取决于 counter 之前的值)
# 清理
r_pipe.delete('pipe_test:name', 'pipe_test:counter')
6.2 发布/订阅 (Pub/Sub)
Redis 提供简单的发布/订阅消息模式。
订阅者代码 (subscriber.py):
import redis
import time
pool = redis.ConnectionPool(host='localhost', port=6379, db=0, decode_responses=True)
r = redis.Redis(connection_pool=pool)
# 创建 PubSub 对象
p = r.pubsub()
# 订阅频道 'news_channel' 和 'alerts_channel'
p.subscribe('news_channel', 'alerts_channel')
print("开始监听频道 'news_channel' 和 'alerts_channel'...")
# 持续监听消息 (这是一个阻塞操作)
# p.listen() 返回一个生成器
try:
for message in p.listen():
print(f"收到消息: {message}")
# 消息格式通常是:
# {'type': 'subscribe', 'pattern': None, 'channel': 'news_channel', 'data': 1}
# {'type': 'message', 'pattern': None, 'channel': 'news_channel', 'data': 'Hello World!'}
# 这里可以根据 message['type'] 和 message['channel'] 处理不同消息
if message['type'] == 'message':
print(f" 频道 [{message['channel']}] 收到数据: {message['data']}")
if message['data'] == 'stop':
print("收到停止信号,退出监听。")
break # 可以根据特定消息退出循环
except KeyboardInterrupt:
print("手动中断监听。")
finally:
# 取消订阅并关闭连接
p.unsubscribe()
p.close()
pool.disconnect() # 关闭整个池
print("监听已停止,连接已关闭。")
发布者代码 (publisher.py):
import redis
import time
pool = redis.ConnectionPool(host='localhost', port=6379, db=0, decode_responses=True)
r = redis.Redis(connection_pool=pool)
# 向 'news_channel' 发布消息
count1 = r.publish('news_channel', 'Breaking News: Redis is awesome!')
print(f"向 'news_channel' 发布消息,接收者数量: {count1}")
time.sleep(1)
# 向 'alerts_channel' 发布消息
count2 = r.publish('alerts_channel', 'System Alert: High CPU usage detected!')
print(f"向 'alerts_channel' 发布消息,接收者数量: {count2}")
time.sleep(1)
# 发送停止信号
count3 = r.publish('news_channel', 'stop')
print(f"向 'news_channel' 发布停止信号,接收者数量: {count3}")
pool.disconnect()
运行 Pub/Sub: 先运行 subscriber.py
,它会阻塞并等待消息。然后运行 publisher.py
,你将在 subscriber.py
的控制台中看到接收到的消息。
7. 最佳实践与提示
decode_responses=True
: 大多数情况下设置此选项能简化代码,避免手动解码 bytes
。object-type:id:field
格式(如 user:1000:profile
)。保持一致性。try...except
捕获可能的 redis.exceptions.ConnectionError
、redis.exceptions.TimeoutError
等异常。r.eval()
或 r.register_script()
) 或 Redis Transactions (通过 Pipeline 的 multi()
和 exec()
)。pool.disconnect()
),尤其是在非长时间运行的脚本中。这个教程涵盖了 Python 操作 Redis 的大部分常用功能。根据你的具体需求,可以进一步探索 Redis 的事务、Lua 脚本、Streams 等更高级的特性。记得查阅 redis-py
的官方文档和 Redis 官方文档以获取最全面和最新的信息。
作者:hiquant