Celery 重复执行同一个任务 使用 Celery Once 来防止 Celery 重复执行同一个任务
Step_Top 人气:0想了解使用 Celery Once 来防止 Celery 重复执行同一个任务的相关内容吗,Step_Top在本文为您仔细讲解Celery 重复执行同一个任务的相关知识和一些Code实例,欢迎阅读和指正,我们先划重点:Celery,重复执行同一个任务,Celery,重复执行,下面大家一起来学习吧。
在使用 Celery 的时候发现有的时候 Celery 会将同一个任务执行两遍,我遇到的情况是相同的任务在不同的 worker 中被分别执行,并且时间只相差几毫秒。这问题我一直以为是自己哪里处理的逻辑有问题,后来发现其他人 也有类似的问题,然后基本上出问题的都是使用 Redis 作为 Broker 的,而我这边一方面不想将 Redis 替换掉,就只能在 task 执行的时候加分布式锁了。
不过在 Celery 的 issue 中搜索了一下,有人使用 Redis 实现了分布式锁,然后也有人使用了 Celery Once。 大致看了一下 Celery Once ,发现非常符合现在的情况,就用了下。
Celery Once 也是利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类,该类提供了任务去重的功能,所以在使用时,我们自己实现的方法需要将 QueueOnce 设置为 base
@task(base=QueueOnce, once={'graceful': True})
后面的 once 参数表示,在遇到重复方法时的处理方式,默认 graceful 为 False,那样 Celery 会抛出 AlreadyQueued 异常,手动设置为 True,则静默处理。
另外如果要手动设置任务的 key,可以指定 keys 参数
@celery.task(base=QueueOnce, once={'keys': ['a']}) def slow_add(a, b): sleep(30) return a + b
总得来说,分为几步
第一步,安装
pip install -U celery_once
第二步,增加配置
from celery import Celery from celery_once import QueueOnce from time import sleep celery = Celery('tasks', broker='amqp://guest@localhost//') celery.conf.ONCE = { 'backend': 'celery_once.backends.Redis', 'settings': { 'url': 'redis://localhost:6379/0', 'default_timeout': 60 * 60 } }
第三步,修改 delay 方法
example.delay(10) # 修改为 result = example.apply_async(args=(10))
第四步,修改 task 参数
@celery.task(base=QueueOnce, once={'graceful': True, keys': ['a']}) def slow_add(a, b): sleep(30) return a + b
参考链接 https://github.com/cameronmaske/celery-once
加载全部内容