原文出处:老钱的 Redis 深度历险:核心原理与应用实践
# coding: utf8
import time
class Funnel(object):
def __init__(self, capacity, leaking_rate):
self.capacity = capacity # 漏斗容量
self.leaking_rate = leaking_rate # 漏嘴流水速率
self.left_quota = capacity # 漏斗剩余空间
self.leaking_ts = time.time() # 上一次漏水时间
def make_space(self):
now_ts = time.time()
delta_ts = now_ts - self.leaking_ts # 距离上一次漏水过去了多久
delta_quota = delta_ts * self.leaking_rate # 又可以腾出不少空间了
if delta_quota < 1: # 腾的空间太少,那就等下次吧
return
self.left_quota += delta_quota # 增加剩余空间
self.leaking_ts = now_ts # 记录漏水时间
if self.left_quota > self.capacity: # 剩余空间不得高于容量
self.left_quota = self.capacity
def watering(self, quota):
self.make_space()
if self.left_quota >= quota: # 判断剩余空间是否足够
self.left_quota -= quota
return True
return False
funnels = {} # 所有的漏斗
# capacity 漏斗容量
# leaking_rate 漏嘴流水速率 quota/s
def is_action_allowed(
user_id, action_key, capacity, leaking_rate):
key = '%s:%s' % (user_id, action_key)
funnel = funnels.get(key)
if not funnel:
funnel = Funnel(capacity, leaking_rate)
funnels[key] = funnel
return funnel.watering(1)
for i in range(20):
print is_action_allowed('laoqian', 'reply', 15, 0.5)
- 关于思路:
Funnel 对象的 make_space 方法是漏斗算法的核心,其在每次灌水前都会被调用以触发漏水,给漏斗腾出空间来。能腾出多少空间取决于- 过去了多久,以及
- 流水的速率。
Funnel 对象占据的空间大小不再和行为的频率成正比,它的空间占用是一个常量。
- 关于代码:
将 Funnel 对象的内容按字段存储到一个 hash 结构中,灌水的时候将 hash 结构的字段取出来进行逻辑运算后,再将新值回填到 hash 结构中就完成了一次行为频度的检测。 - 存在的问题:
但是有个问题,我们无法保证整个过程的原子性。从 hash 结构中取值,然后在内存里运算,再回填到 hash 结构,这三个过程无法原子化,意味着需要进行适当的加锁控制。而一旦加锁,就意味着会有加锁失败,加锁失败就需要选择重试或者放弃。
如果重试的话,就会导致性能下降。如果放弃的话,就会影响用户体验。同时,代码的复杂度也跟着升高很多。这真是个艰难的选择,我们该如何解决这个问题呢?Redis-Cell 救星来了! - Redis-Cell
Redis 4.0 提供了一个限流 Redis 模块,它叫 redis-cell。该模块也使用了漏斗算法,并提供了原子的限流指令。有了这个模块,限流问题就非常简单了。
该模块只有1条指令cl.throttle,它的参数和返回值都略显复杂,接下来让我们来看看这个指令具体该如何使用。
> cl.throttle laoqian:reply 15 30 60 1
▲ ▲ ▲ ▲ ▲
| | | | └───── need 1 quota (可选参数,默认值也是1)
| | └──┴─────── 30 operations / 60 seconds 这是漏水速率
| └───────────── 15 capacity 这是漏斗容量
└─────────────────── key laoqian
在执行限流指令时,如果被拒绝了,就需要丢弃或重试。cl.throttle 指令考虑的非常周到,连重试时间都帮你算好了,直接取返回结果数组的第四个值进行 sleep 即可,如果不想阻塞线程,也可以异步定时任务来重试。