IT策士 10余年一线大厂经验,专注 IT 思维、架构、职场进阶。我会在各个平台持续发布最新文章,助你少走弯路。
Redis 做缓存性能卓越,但一旦缓存出了问题,所有请求直接打到数据库上,瞬间就能将数据库压垮。缓存层在实践中面临三大经典难题:穿透、击穿、雪崩。每个名词听起来都很可怕,但理解了它们的成因和应对之道,你就能轻松化解。
本文不仅讲原理,更用 Python 实现完整的防御方案——布隆过滤器、互斥锁、逻辑过期、随机 TTL 和缓存一致性策略。读完就能直接用到你的项目里。
1. 缓存穿透
1.1 什么是缓存穿透?
查询一个根本不存在的数据,缓存层和数据库层都没有。这类请求每次都穿过缓存直接打到数据库,当有大量这种恶意请求时,数据库压力骤增,甚至宕机。
请求 → 缓存(miss)→ 数据库(miss)→ 返回空因为缓存不命中,每次请求都击穿缓存直抵数据库。
1.2 解决方案一:缓存空对象
当数据库查询不到数据时,依然将一个空值写入缓存,并设置较短的过期时间。这样下次请求就直接命中缓存,不会访问数据库。
importredisimporttimer=redis.Redis(host='localhost',port=6379,decode_responses=True)def get_product(product_id): cache_key=f'product:{product_id}'# 1. 查缓存cached=r.get(cache_key)ifcached is not None:ifcached=='NULL':print('缓存命中:空对象')returnNone print('缓存命中:真实数据')returncached# 2. 查数据库(模拟)product=query_db(product_id)# 可能返回 None# 3. 写入缓存ifproduct is None: r.setex(cache_key,60,'NULL')# 空值缓存 60 秒print('写入空对象缓存')else: r.setex(cache_key,300, product)# 真实数据缓存 5 分钟print('写入真实数据缓存')returnproduct def query_db(product_id):"""模拟数据库查询"""# 假设只有 id 为 1 的产品存在ifproduct_id==1:return'iPhone 15'returnNone# 测试print(get_product(1))# 缓存未命中,查数据库,写入真实数据print(get_product(1))# 缓存命中:真实数据print(get_product(2))# 缓存未命中,查数据库,写入空对象print(get_product(2))# 缓存命中:空对象优点:简单直接。
缺点:若恶意攻击者不断用不同的不存在 ID 请求,缓存中会充满大量NULL键,浪费内存。此时需要布隆过滤器。
1.3 解决方案二:布隆过滤器
布隆过滤器(Bloom Filter)是一个概率型数据结构,用来判断“一个元素一定不在集合中”或“可能在集合中”。
它由
bitmap和多个哈希函数组成。添加元素时,通过多个哈希函数计算位置,将 bitmap 相应位置 1。
查询时,同样计算多个位置,如果任何一个位置为 0,则元素肯定不存在;如果全为 1,则元素可能存在(有一定误判率,但不会漏判)。
布隆过滤器的误判率与 bit 数组长度和哈希函数个数有关,可以通过参数控制。
布隆过滤器原理示意图
添加"apple":hash1("apple")=2→ setbit2hash2("apple")=5→ setbit5hash3("apple")=7→ setbit7查询"orange":hash1("orange")=2→1hash2("orange")=5→1hash3("orange")=8→0→ 一定不存在!Python 实现基于 Redis 位图的布隆过滤器
我们使用多个哈希函数(通过hashlib模拟)在 Redis 的 String 位图上操作。
importhashlibimportmathimportredis class BloomFilter:"""基于 Redis 位图的布隆过滤器""" def __init__(self, redis_client, key,expected_items=100000,false_positive_rate=0.01):""" expected_items: 预期元素数量 false_positive_rate: 可接受的误判率""" self.redis=redis_client self.key=key# 根据预期元素和误判率计算位图大小和哈希函数个数self.bit_size=int(-expected_items * math.log(false_positive_rate)/(math.log(2)**2))self.hash_count=int(self.bit_size / expected_items * math.log(2))def _hash(self, item, seed):"""使用不同 seed 生成多个哈希值""" h=hashlib.md5((str(seed)+str(item)).encode())returnint(h.hexdigest(),16)% self.bit_size def add(self, item):"""添加元素到布隆过滤器"""forseedinrange(self.hash_count): offset=self._hash(item, seed)self.redis.setbit(self.key, offset,1)returnTrue def exists(self, item):"""检查元素是否可能存在"""forseedinrange(self.hash_count): offset=self._hash(item, seed)ifself.redis.getbit(self.key, offset)==0:returnFalsereturnTrue# 使用示例r=redis.Redis(host='localhost',port=6379,decode_responses=True)bf=BloomFilter(r,'bloom:products',expected_items=1000000,false_positive_rate=0.01)# 预加载所有存在的商品 ID 到布隆过滤器existing_product_ids=[1,2,3,4,5]# 模拟数据库中存在的IDforpidinexisting_product_ids: bf.add(pid)# 查询时先经过布隆过滤器def get_product_with_bloom(product_id):ifnot bf.exists(product_id): print(f'布隆过滤器判定 {product_id} 不存在,直接返回')returnNone# 可能存在于缓存或数据库returnget_product(product_id)# 使用前面定义的函数print(get_product_with_bloom(999))# 直接返回 None,不会查库print(get_product_with_bloom(1))# 可能命中输出示例:
布隆过滤器判定999不存在,直接返回 None 缓存未命中,查询数据库... 写入真实数据缓存 iPhone15这样,绝大多数不存在的 ID 被拦截在布隆过滤器层,大大减轻数据库压力。
💡 生产环境可以使用
redis-py官方提供的布隆过滤器模块RedisBloom,或者使用pybloom库。核心原理完全相同。
2. 缓存击穿
2.1 什么是缓存击穿?
一个热点数据 key 在缓存过期的瞬间,大量请求同时涌向数据库去重建该缓存。因为重建缓存通常需要一定时间(比如复杂的 SQL 查询),这会导致数据库瞬间负载飙升,甚至崩溃。
大量并发请求 → 缓存过期(miss)→ 同时打到数据库与穿透不同,击穿的数据是存在的,只是因为热点 key 过期。
2.2 解决方案一:互斥锁
让第一个请求去查数据库并重建缓存,其他请求等待结果,而不是都去查数据库。
使用 Redis 的SETNX实现分布式互斥锁。
importuuidimporttimeimportthreading def get_hotspot_with_lock(product_id): cache_key=f'product:{product_id}'lock_key=f'lock:product:{product_id}'lock_value=str(uuid.uuid4())# 1. 查缓存cached=r.get(cache_key)ifcached:returncached# 2. 尝试获取锁ifr.set(lock_key, lock_value,nx=True,ex=5):# 锁过期5秒,防死锁try:# 双重检查cached=r.get(cache_key)ifcached:returncached# 3. 查询数据库product=query_db(product_id)# 可能耗时ifproduct: r.setex(cache_key,300, product)returnproduct finally:# 4. 释放锁(Lua 保证原子性)unlock_script="""ifredis.call("get", KEYS[1])==ARGV[1]thenreturnredis.call("del", KEYS[1])elsereturn0end""" r.eval(unlock_script,1, lock_key, lock_value)else:# 获取锁失败,等待并重试time.sleep(0.05)returnget_hotspot_with_lock(product_id)# 递归重试# 模拟并发请求def test_concurrent(): def task(): result=get_hotspot_with_lock(1)print(f'线程 {threading.current_thread().name}: {result}')threads=[threading.Thread(target=task)for_inrange(10)]fortinthreads: t.start()fortinthreads: t.join()test_concurrent()优点:简单可靠,保证同一时刻只有一个线程查询数据库。
缺点:线程等待可能影响响应时间,锁的粒度需要控制。
2.3 解决方案二:逻辑过期
除了物理 TTL,再给缓存值加一个逻辑过期时间字段。当物理过期后,并不立即删除缓存,而是由后台线程异步更新缓存,请求在此期间仍然返回旧值。
importjsonimportthreading from datetimeimportdatetime, timedelta LOGICAL_TTL=300# 逻辑过期时间(秒)def set_with_logical_expiry(key, value,logical_ttl=LOGICAL_TTL):"""存储带逻辑过期时间的值""" data={'data':value,'expire_at':(datetime.now()+ timedelta(seconds=logical_ttl)).timestamp()}r.set(key, json.dumps(data))def get_with_logical_expiry(key): cached=r.get(key)ifnot cached:returnNone item=json.loads(cached)data=item['data']expire_at=item['expire_at']# 如果逻辑时间已过期,启动异步刷新ifdatetime.now().timestamp()>expire_at:# 使用互斥锁避免大量并发刷新lock_key=f'lock:logical:{key}'ifr.set(lock_key,'1',nx=True,ex=5): threading.Thread(target=refresh_cache,args=(key,)).start()# 仍然返回旧值returndatareturndata def refresh_cache(key):"""异步刷新缓存(实际应查数据库)""" time.sleep(0.2)# 模拟数据库查询new_value=f'new_value_for_{key}'# 这里应该是 query_db() 的结果set_with_logical_expiry(key, new_value)print(f'异步刷新缓存: {key}')优点:用户请求不会阻塞,始终返回缓存数据(可能是旧值)。
缺点:数据可能短暂不一致,适合允许最终一致性的场景。
3. 缓存雪崩
3.1 什么是缓存雪崩?
大量缓存 key 在同一时间过期,或者 Redis 服务宕机,导致所有请求瞬间打到数据库,就像雪崩一样,压垮数据库。
常见诱因:
设置了相同的过期时间,大量 key 同时失效。
Redis 集群大面积故障。
3.2 解决方案一:随机 TTL
给缓存过期时间加上一个随机偏移量,避免集体失效。
importrandom def set_with_random_ttl(key, value,base_ttl=300,random_range=60):"""基础 TTL ± 随机偏移""" ttl=base_ttl + random.randint(-random_range, random_range)ttl=max(ttl,60)# 至少 60 秒r.setex(key, ttl, value)print(f'{key} 过期时间: {ttl}s')# 设置一批缓存,观察过期时间foriinrange(10): set_with_random_ttl(f'hot:key:{i}', f'value_{i}')输出示例:
hot:key:0 过期时间: 332s hot:key:1 过期时间: 268s hot:key:2 过期时间: 319s...3.3 解决方案二:多级缓存与降级
结合本地缓存(如 Python 字典或cachetools),当 Redis 不可用时使用本地缓存兜底;或者直接降级返回默认值,避免请求直达数据库。
from cachetoolsimportTTLCache local_cache=TTLCache(maxsize=1000,ttl=60)def get_with_fallback(key): try: value=r.get(key)ifvalue: local_cache[key]=value# 更新本地缓存returnvalue except Exception as e: print(f'Redis 异常: {e}')# Redis 不可用,走本地缓存returnlocal_cache.get(key,'默认值')4. 缓存一致性策略
使用缓存不可避免地会面临数据一致性问题:数据库数据更新了,缓存怎么同步?
4.1 Cache Aside 模式(旁路缓存)
这是最经典的缓存策略:
读:先读缓存,缓存没有则读数据库,再写入缓存。
写:先更新数据库,再删除缓存。
为什么是删除缓存而不是更新缓存?因为更新缓存可能涉及复杂计算,而且若并发写可能产生脏数据。删除缓存是更轻量和安全的选择。
def update_product(product_id, new_data):# 1. 更新数据库save_to_db(product_id, new_data)# 2. 删除缓存r.delete(f'product:{product_id}')print('缓存已删除')4.2 延迟双删
为了防止数据库主从延迟导致的不一致,可以在写入数据库后延迟一段时间再次删除缓存。
def update_product_with_delay(product_id, new_data,delay=0.5):# 1. 删除缓存r.delete(f'product:{product_id}')# 2. 更新数据库save_to_db(product_id, new_data)# 3. 延迟后再次删除缓存threading.Timer(delay, lambda: r.delete(f'product:{product_id}')).start()4.3 最终一致性方案
对于一致性要求极高的场景,可以基于 binlog 异步更新缓存(如使用 Canal + MQ)。这里展示一个简化的消息通知方案:
# 发布方(数据库更新后)def notify_cache_evict(channel, key): r.publish(channel, key)# 订阅方def cache_evict_subscriber(): pubsub=r.pubsub()pubsub.subscribe('cache:evict')formsginpubsub.listen():ifmsg['type']=='message':r.delete(msg['data'])print(f'删除缓存: {msg["data"]}')5. 动手试试
穿透实验:停掉布隆过滤器,用脚本循环请求 1000 个不存在的商品 ID,观察数据库压力;然后开启布隆过滤器对比。
击穿模拟:在 Redis 中设置一个热点键过期时间为 2 秒,同时启动 20 个并发线程读取该键,统计未命中次数。分别用互斥锁和逻辑过期方案对比。
雪崩模拟:设置 100 个键过期时间集中在 5 秒内,观察过期瞬间的数据库请求量;然后改为随机 TTL,观察平滑程度。
一致性验证:模拟并发读写,验证 Cache Aside 模式下删除缓存后可能存在的短期不一致,观察最终结果。
预期效果:布隆过滤器拦截绝大部分无效请求;互斥锁让数据库只查询一次;随机 TTL 使缓存过期平缓分布;Cache Aside 在读多写少下表现优异。
6. 总结
缓存三大难题是面试和生产的常客,理解原理后,用代码将它们一一化解并不复杂。下一篇我们将深入 Redis 的内存管理与淘汰策略,了解当内存满时 Redis 如何优雅地“断舍离”。
想了解更多还可以去各个平台搜索「IT策士」,一起升级 IT 思维 !