Redis 缓存使用的实践

2025-12-21 18:13:48

Redis缓存最佳实践指南

一、缓存更新策略

1. Cache Aside Pattern(旁路缓存)

这是最常用的缓存策略,适合读多写少的场景。

class UserService:

def get_user(self, user_id):

# 1. 先查询缓存

user = redis_client.get(f"user:{user_id}")

if user:

return json.loads(user)

# 2. 缓存未命中,查询数据库

user = db.query(f"SELECT * FROM users WHERE id = {user_id}")

# 3. 写入缓存

if user:

redis_client.setex(

f"user:{user_id}",

3600, # 过期时间1小时

json.dumps(user)

)

return user

def update_user(self, user):

# 1. 更新数据库

db.execute(

"UPDATE users SET name = %s WHERE id = %s",

(user.name, user.id)

)

# 2. 删除缓存

redis_client.delete(f"user:{user.id}")

2. Write Through(读写穿透)

适合写多的场景,确保缓存与数据库的一致性。

def write_through_update(user):

# 1. 更新数据库

db.execute(

"UPDATE users SET name = %s WHERE id = %s",

(user.name, user.id)

)

# 2. 更新缓存

redis_client.setex(

f"user:{user.id}",

3600,

json.dumps(user)

)

二、缓存击穿防护

1. 分布式锁实现

class RedisLock:

def __init__(self, key, expire=60):

self.key = f"lock:{key}"

self.expire = expire

def acquire(self):

# SET NX = set if not exists

return redis_client.set(

self.key,

'locked',

ex=self.expire,

nx=True

)

def release(self):

redis_client.delete(self.key)

def get_hot_data(key):

# 1. 查询缓存

data = redis_client.get(key)

if

return json.loads(data)

# 2. 缓存未命中,使用分布式锁防止击穿

lock = RedisLock(key)

if lock.acquire():

try:

# 双重检查

data = redis_client.get(key)

if

return json.loads(data)

# 查询数据库

data = db.query_hot_data(key)

# 写入缓存

redis_client.setex(key, 3600, json.dumps(data))

return data

finally:

lock.release()

else:

# 等待100ms后重试

time.sleep(0.1)

return get_hot_data(key)

三、大key处理方案

1. 大key识别

def scan_big_keys(pattern="*", min_size=1024*1024):

"""扫描大于指定大小的key"""

cursor = 0

big_keys = []

while True:

cursor, keys = redis_client.scan(

cursor,

match=pattern,

count=1000

)

for key in keys:

key_type = redis_client.type(key)

key_size = get_key_size(key, key_type)

if key_size > min_size:

big_keys.append({

'key': key,

'type': key_type,

'size': key_size

})

if cursor == 0:

break

return big_keys

def get_key_size(key, key_type):

if key_type == 'string':

return len(redis_client.get(key))

elif key_type == 'hash':

return len(redis_client.hgetall(key))

elif key_type == 'list':

return redis_client.llen(key)

elif key_type == 'set':

return redis_client.scard(key)

elif key_type == 'zset':

return redis_client.zcard(key)

2. 大key拆分

class UserFavorites:

def __init__(self, user_id):

self.base_key = f"user:{user_id}:favorites"

def add(self, item_id):

# 使用取模方式拆分大key

shard = item_id % 10

key = f"{self.base_key}:{shard}"

return redis_client.sadd(key, item_id)

def remove(self, item_id):

shard = item_id % 10

key = f"{self.base_key}:{shard}"

return redis_client.srem(key, item_id)

def is_favorite(self, item_id):

shard = item_id % 10

key = f"{self.base_key}:{shard}"

return redis_client.sismember(key, item_id)

def get_all(self):

# 合并所有分片的结果

result = set()

for i in range(10):

key = f"{self.base_key}:{i}"

result.update(redis_client.smembers(key))

return result

四、性能优化建议

批量操作:使用pipeline减少网络往返

def batch_get_users(user_ids):

pipe = redis_client.pipeline()

# 批量获取

for user_id in user_ids:

pipe.get(f"user:{user_id}")

# 一次性执行

results = pipe.execute()

# 处理结果

users = []

for result in results:

if result:

users.append(json.loads(result))

return users

序列化优化:对于频繁访问的数据,考虑使用protobuf等二进制序列化

内存优化:合理设置过期时间,避免内存泄漏

def cache_with_variable_ttl(key, value):

"""根据数据热度设置不同的过期时间"""

access_count = redis_client.incr(f"access:{key}")

if access_count > 1000:

ttl = 7200 # 热点数据2小时过期

else:

ttl = 3600 # 普通数据1小时过期

redis_client.setex(key, ttl, value)

通过以上实践指南,您应该能更好地在项目中使用Redis缓存。记住:缓存设计没有银弹,需要根据具体场景选择合适的策略。

HTC Desire 610
博世安防监控设备评价:技术先进、功能全面