当前位置: 首页 > news >正文

【python】python进阶——Redis模块 - 教程

目录

引言      

一、Redis与Python的初次邂逅

1.1 为什么选择Redis?

1.2 安装redis-py

二、连接Redis:基础与高级配置

2.1 基础连接

2.2 带参数连接

2.3 使用连接池提升性能

三、Redis数据类型操作详解

3.1 字符串(Strings)

3.2 哈希(Hashes)

3.3 列表(Lists)

3.4 集合(Sets)

3.5 有序集合(Sorted Sets)

四、高级功能与实战技巧

4.1 数据过期与自动清理

4.2 管道(Pipeline)批量操作

4.3 事务(Transactions)保证原子性

4.4 发布/订阅模式

五、实战应用场景

5.1 缓存实现

5.2 会话存储

5.3 限流器

六、性能优化


引言

        Redis作为一种高性能的键值存储系统,在缓存、会话存储、消息队列等场景中发挥着重要作用。结合Python使Redis的强大功能更易利用,本文将详细介绍如何使用Python操作Redis。

一、Redis与Python的初次邂逅

1.1 为什么选择Redis?

Redis是一款开源的内存数据结构存储,可用作数据库、缓存和消息代理。它支持多种数据结构(字符串、哈希、列表、集合等),并提供持久化功能。

1.2 安装redis-py

redis-py是Python与Redis交互的官方推荐库,安装简单:

pip install redis

二、连接Redis:基础与高级配置

2.1 基础连接

建立与Redis的基本连接:

import redis
​
# 最简单连接(默认localhost:6379,数据库0)
r = redis.Redis()
print("成功连接到Redis!" if r.ping() else "连接失败")

2.2 带参数连接

实际生产环境中通常需要更多参数:

import redis
​
try:   r = redis.Redis(       host='your_redis_host',  # Redis服务器地址       port=6379,               # 端口,默认6379       password='your_password', # 认证密码       db=0,                    # 数据库索引(0-15)       decode_responses=True,   # 自动将字节数据解码为字符串       socket_connect_timeout=5, # 连接超时时间(秒)       socket_timeout=5         # 操作超时时间(秒)   )   # 测试连接   r.ping()   print("成功连接到Redis!")
except redis.exceptions.ConnectionError as e:   print(f"连接失败: {e}")

关键参数解析

  • decode_responses=True:让redis-py自动将返回的字节数据解码为字符串,省去手动解码步骤

  • socket_connect_timeout:控制连接建立的超时时间,避免程序长时间阻塞

  • socket_timeout:控制命令执行超时时间,提升应用稳定性

2.3 使用连接池提升性能

在高并发场景下,使用连接池可以显著提升性能:

import redis
​
# 创建连接池
pool = redis.ConnectionPool(   host='localhost',   port=6379,   password='your_password',   db=0,   decode_responses=True,   max_connections=10  # 连接池最大连接数
)
​
# 从连接池获取连接
r = redis.Redis(connection_pool=pool)
​
# 使用连接
r.set('pool_test', '连接池工作正常')
value = r.get('pool_test')
print(value)  # 输出:连接池工作正常

连接池的优势在于复用连接,避免频繁创建和销毁连接的开销,特别适合Web应用等高并发场景。

三、Redis数据类型操作详解

3.1 字符串(Strings)

字符串是Redis最基本的数据类型:

# 基本设置和获取
r.set('username', 'john_doe')
r.setex('temp_session', 3600, 'session_data')  # 带过期时间(秒)
​
# 批量操作
r.mset({'key1': 'value1', 'key2': 'value2', 'key3': 'value3'})
values = r.mget('key1', 'key2', 'key3')
​
# 数字操作
r.set('counter', 10)
r.incr('counter')  # 增加1
r.incrby('counter', 5)  # 增加5
r.decr('counter')  # 减少1
​
print(r.get('counter'))  # 输出:15

3.2 哈希(Hashes)

哈希适合存储对象:

# 设置哈希值
r.hset('user:100', 'name', 'Alice')
r.hset('user:100', 'age', 30)
# 或使用mapping参数批量设置
r.hset('user:101', mapping={'name': 'Bob', 'age': 25, 'city': 'New York'})
​
# 获取数据
name = r.hget('user:100', 'name')
all_data = r.hgetall('user:101')  # 获取所有字段
​
print(f"用户100姓名: {name}")
print(f"用户101全部数据: {all_data}")

3.3 列表(Lists)

Redis列表是基于链表实现的,适合做队列、栈等结构:

# 列表操作
r.lpush('tasks', 'task1', 'task2', 'task3')  # 左侧插入
r.rpush('tasks', 'task4')  # 右侧插入
​
# 获取元素
first_task = r.lpop('tasks')  # 左侧弹出
last_task = r.rpop('tasks')  # 右侧弹出
​
# 获取范围元素
all_tasks = r.lrange('tasks', 0, -1)  # 获取所有元素
​
print(f"第一个任务: {first_task}")
print(f"所有任务: {all_tasks}")

3.4 集合(Sets)

# 集合 - 无序且唯一
r.sadd('tags', 'python', 'redis', 'database', 'python')  # 重复元素自动去重
​
tags = r.smembers('tags')
print(f"标签集合: {tags}")  # 输出: {'python', 'redis', 'database'}
​

3.5 有序集合(Sorted Sets)

# 有序集合 - 带分数排序
r.zadd('leaderboard', {'Alice': 100, 'Bob': 90, 'Charlie': 120})
top_players = r.zrevrange('leaderboard', 0, 1, withscores=True)  # 降序排列
​
print(f"排行榜前两名: {top_players}")

四、高级功能与实战技巧

4.1 数据过期与自动清理

Redis可以设置键的生存时间,实现数据自动过期:

# 设置键的过期时间
r.setex('session_token', 3600, 'encrypted_token_data')  # 1小时后过期
r.expire('existing_key', 600)  # 设置已有键的过期时间(10分钟)
# 检查剩余时间
ttl = r.ttl('session_token')  # 获取剩余生存时间(秒)
print(f"会话令牌剩余生存时间: {ttl}秒")
# 监控数据生命周期
import time
r.set('temp_data', '重要但会消失的数据', ex=60)  # 60秒后自动删除
while True:remaining = r.ttl('temp_data')if remaining > 0:print(f"数据剩余寿命:{remaining}秒")time.sleep(10)else:print("数据已自然消亡")break

4.2 管道(Pipeline)批量操作

使用管道可以显著提升批量操作的性能:

import redis
r = redis.Redis(decode_responses=True)
# 创建管道
pipe = r.pipeline()
# 添加多个命令到管道
for i in range(1000):pipe.set(f'key:{i}', f'value:{i}')
# 一次性执行所有命令
results = pipe.execute()
print(f"批量插入了{len(results)}条数据")

管道通过减少网络往返次数来提升性能,特别适合批量数据插入场景。

4.3 事务(Transactions)保证原子性

Redis事务可以确保一系列命令的原子性执行:

# 基本事务
pipe = r.pipeline(transaction=True)  # 开启事务模式
try:pipe.multi()  # 开始事务pipe.set('account:A', 100)pipe.set('account:B', 100)pipe.execute()  # 执行事务print("事务执行成功")
except redis.exceptions.RedisError as e:print(f"事务执行失败: {e}")
# 使用WATCH实现乐观锁
def transfer_funds(source, destination, amount):with r.pipeline() as pipe:try:# 监视源账户键pipe.watch(source)# 检查余额是否足够current_balance = int(r.get(source) or 0)if current_balance < amount:pipe.unwatch()print("余额不足")return False# 开始事务pipe.multi()pipe.decrby(source, amount)pipe.incrby(destination, amount)pipe.execute()print("转账成功")return Trueexcept redis.WatchError:print("账户余额已被其他操作修改,转账失败")return False
# 测试转账
r.set('account:A', 500)
r.set('account:B', 500)
transfer_funds('account:A', 'account:B', 100)

4.4 发布/订阅模式

Redis支持发布/订阅消息模式:

# 订阅者
import threading
def subscriber(channel_name):pubsub = r.pubsub()pubsub.subscribe(channel_name)for message in pubsub.listen():if message['type'] == 'message':print(f"收到消息: {message['data']}")
# 启动订阅线程
thread = threading.Thread(target=subscriber, args=('news',))
thread.daemon = True
thread.start()
# 发布者
r.publish('news', 'Hello, subscribers!')
r.publish('news', '这是第二条消息')

五、实战应用场景

5.1 缓存实现

def get_user_profile(user_id, cache_timeout=3600):   # 尝试从缓存获取   cache_key = f"user_profile:{user_id}"   cached_data = r.get(cache_key)   if cached_data:       print("从缓存获取数据")       return cached_data   # 缓存不存在,从数据库获取   print("从数据库获取数据")   user_data = f"用户{user_id的详细信息}"  # 模拟数据库查询   # 存入缓存   r.setex(cache_key, cache_timeout, user_data)   return user_data

5.2 会话存储

def create_user_session(user_id):   session_id = f"session:{user_id}"   session_data = {       'user_id': user_id,       'username': 'john_doe',       'last_login': '2023-10-01 12:00:00'   }   # 会话保存2小时   r.hset(session_id, mapping=session_data)   r.expire(session_id, 7200)   return session_id
​
def get_session(session_id):   return r.hgetall(session_id)

5.3 限流器

def rate_limiter(user_id, action, limit=10, window=60):   key = f"rate_limit:{user_id}:{action}"   current = r.get(key)   if current and int(current) >= limit:       return False  # 超过限制   pipe = r.pipeline()   pipe.incr(key)   pipe.expire(key, window)  # 设置过期时间   pipe.execute()   return True  # 在限制内

六、性能优化

  • 连接管理:始终使用连接池,避免频繁创建和销毁连接

  • 管道批量操作:批量数据操作使用管道提升性能

  • 合理设置超时:避免因Redis服务不可用导致应用阻塞

  • 键命名规范:使用冒号分隔的命名空间(如user:100:profile

  • 避免大键:单个键的值不宜过大,会影响性能

  • 监控与日志:记录Redis操作日志,监控性能指标

http://www.hskmm.com/?act=detail&tid=29205

相关文章:

  • 2025 年 10 月桥架厂家最新推荐:专业制造与品牌保障口碑之选!
  • 语文_作文_开头结尾
  • 后端缓存好?缓存实用的方案实例直接用就是前端缓存好还
  • 2025年通风天窗厂家最新权威推荐榜:专业性能与高效通风口碑
  • 解决scoop安装的anaconda无法在商店版powershell使用的问题
  • 2025智能吉他厂家最新权威推荐榜:创新科技与卓越音质完美融
  • 2025景区售票系统厂家最新权威推荐榜:智慧票务与高效管理口
  • 数学邪修手册
  • 实用指南:光谱相机在护眼灯领域的应用
  • 2025校平机厂家最新权威推荐榜:精准矫平与高效生产首选
  • idea 激活
  • 海曼HTPA80X64红外热成像 测温采集记录仪 多点实时温度分析
  • 位与字节
  • 2025年氢氧化镁厂家最新权威推荐榜:环保阻燃与工业应用深度
  • 2025年苹果仓民宿最新权威推荐榜:工业遗存与人文情怀完美融
  • 2025二手掘进机厂家最新权威推荐榜:高性价比与可靠性能深度
  • 对我学过的算法的一些总结
  • 鲜花 10.12
  • 数组01
  • 实验一:逆向及BOF基础实践-20232301郑好
  • 排除通过IP访问MySQL时出现的连接错误问题
  • SciTech-Mathmatics-Proba. Stats.: 统计量: 增长速度、同比 与 环比 的概念、用途、示例、计算公式、注意事项
  • Java二维数组
  • 在Ubuntu系统上设置syslog日志轮替与大小限制
  • 从 “有人值守” 到 “少人运维”:智能巡检机器人重塑配电室管理模式 - 实践
  • 2025年10月最新推荐卫星电话品牌发布,涵防爆对讲卫星电话,卫星电话应急指挥系统,卫星电话防爆对讲终端,防爆手持卫星电话!
  • 很早就想注册博客园了
  • [KaibaMath]1006 关于∀ε0, |a-b|λε(λ0) = a=b的证明
  • dataset类
  • 【PolarCTF】nc