EVE Light on Life

实现一个基于rq的延迟队列

RQ是用Python实现的一个简单队列任务, 但是没有延迟执行任务的功能, 例如一些业务场景

  • 订单未支付需要30分钟后取消
  • 某项操作成功1分钟后发送通知

等。

像这些任务是需要延迟操作。

我扩展了RQ中的Queue类,新增一个名为DelayQueue的类,用于存放延迟任务的队列,依旧使用Redis存储任务队列信息。

另外使用一个Timer类不断轮询延迟任务的队列,将到期的任务取出,并放入相应的RQ队列,由原有的Worker执行。

即需要另外开一个进程运行Timer的轮询操作。

该项目已放到Github: https://github.com/cocoakekeyu/delay-rq

下面是实现细节:

DelayQueue

原有的Queue入队操作是这样:

q = Queue('low')
q.enqueue(some_func, bar='some value')

DelayQueue继承了Queue增加一个delay参数,代表该任务要延迟执行的秒数, 其余参数与Queue一样。如下操作:

q = DelayQueue('low')
q.enqueue(some_func, bar='some value', delay=30)

Queue类的enqueue方法原型为: def enqueue(self, f, *args, **kwargs)

enqueue方法只是将f函数的参数与队列的参数分离出来,并调用enqueue_call方法进行实际的入队操作。

enqueue_call方法有确定的keyword参数,再增加一个delay参数,并根据delay的值做相应的操作。

所以enqueue和enqueue_call方法都重写了。

# enqueue
def enqueue(self, f, *args, **kwargs):
    # 略...

    # 提取delay参数
    delay = kwargs.pop('delay', 0)

    # 传入enqueue_call方法
    return self.enqueue_call(func=f, args=args, kwargs=kwargs, delay=delay,
                             timeout=timeout, result_ttl=result_ttl, ttl=ttl,
                             description=description, depends_on=depends_on,
                             job_id=job_id, at_front=at_front, meta=meta)

# enqueue_call
def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
                 result_ttl=None, ttl=None, description=None, delay=0,
                 depends_on=None, job_id=None, at_front=False, meta=None):
    # 略...
    # 计算到期时间
    ts = time.time() + delay

    # 创建 job = self.job_class.create(...)

    # 这里判断delay大于0, 放入延迟队列而不是原有的RQ队列,并立即返回
    if delay > 0:
        return self.enqueue_delay_job(job, ts)
    # ...                          

新增的enqueue_delay_job方法将要延迟执行的任务放入延迟队列。

延迟队列由Redis的有序集合实现,存储job_id和它的到期时间,届时Timer轮询延迟队列的时候,每次判断有序集合中的第一个任务到期时间然后放入相应的RQ队列。

DelayQueue在__init__的时候设置一个delay_key的属性,值为延迟队列的键名,例如”rq:delay_queue:default”。

enqueue_delay_job实现:

def enqueue_delay_job(self, job, ts):
    conn = self.connection
    conn.zadd(self.delay_key, **{job.id: ts})
    job.save()
    return job

Timer

Timer继承RQ的Worker类,这样可以利用原有Worker类实现的一些异常处理。

它处理的队列应有delay_key属性代表Redis的有序集合。

用法类似Worker:

q = DelayQueue()
Timer(q).work()

主要的轮询队列功能写在work方法:

def work(self):
    # 略...
    try:
        while True:
            try:
                result = self.dequeue_delay_job()
                if not result:
                    time.sleep(.01)
                    continue
                queue, job = result
                self.process_enqueue(queue, job)
            except StopRequested:
                break
    finally:
        self.register_death()

dequeue_delay_job方法尝试用于从Timer处理的所有延迟队列中取出到期的任务。

def dequeue_delay_job(self):
    conn = self.connection
    for q in self.queues:
        item = conn.zrange(q.delay_key, 0, 0, withscores=True)
        if item and item[0][1] < time.time():
            job_id = item[0][0].decode('utf-8')
            job = self.job_class.fetch(job_id)
            return q, job
    else:
        return None

取出任务后使用process_enqueue方法将任务放入RQ队列由Worker执行:

def process_enqueue(self, queue, job):
    conn = self.connection
    try:
        with SimpleLock(conn, job.id):
            if conn.zrem(queue.delay_key, job.id):
                queue.enqueue_job(job)
                self.log.info('Enqueue delay job: {}'.format(blue(job.id)))
    except NoLock:
        pass

这里用Redis的setnx方法实现一个简单的锁,防止多个Timer实例对同一个任务执行入队操作。

SimpleLock源码可以直接看这: https://github.com/cocoakekeyu/delay-rq/blob/master/delayrq/lock.py