celery异步消息队列的使用
吴先生不爱吃辣 人气:01、准备工作
1.1 流程图
2、环境安装
2.1、在Ubuntu中需要安装redis
安装redis $sudo apt-get update $sudo apt-get install redis-server 启动redis $redis-server 连接redis $redis-cli $redis-cli -h ip -a 6379 安装Python操作redis的包 pip install redis 重启redis sudo service reids restart
redis默认绑定的ip为127.0.0.1其他电脑无法访问Ubuntu的redis
重启redis服务 service redis restart
查看绑定端口
在wind上telnet ip 6379 成功说明成功
2.2、安装celery
pip install celery
2、开始使用celery
2、1基本应用
在/home/zbwu103/celery 文件中创建一个tasks.py的任务文件
#task.py from celery import Celery app = Celery('tasks', broker='redis://192.168.1.111', backend='redis://192.168.1.111' #redis://密码@ip ) @app.task def add(x,y): print("running...",x,y) return x+y
在home/zbwu103/celery的目录启动监听任务
#打印日志的模式运行 celery -A tasks worker --loglevel=info
在开一个终端,到/home/zbwu103/celery用Python进入命令行运行
from tasks import add t = add.delay(4,5) #t.result.ready() 查看任务是否完成,完成返回True,未完成返回False #t.get() 返回完成之后的结果 #t.task_id 返回任务的唯一ID号,可以通过ID查询到任务
上面任务都是在终端上运行,如果终端关闭tasks也会终止。
所以需要任务在后台运行
celery multi stop w1 停止 w1
2.2 、在项目中如何使用celery
from __future__ import absolute_import, unicode_literals from celery import Celery app = Celery('my_proj', broker='redis://192.168.1.111', backend='redis://192.168.1.111', include=['myp_roj.tasks']) app.conf.update( result_expires=3600, ) if __name__ == '__main__': app.start()
from __future__ import absolute_import, unicode_literals import subprocess from .celery import app @app.task def add(x,y): return x+y @app.task def run_cmd(cmd): obj = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) return obj.stdout.read().decode('utf-8')
在my_proj同级目录启动
查看任务启动情况
ps -ef |grep celery
2.3 、celery 定时任务
celery使用beat来执行celert beat 来实现定时任务
worker定时任务
from celery import Celery from celery.schedules import crontab app = Celery('task', broker='redis://192.168.1.111', backend='redis://192.168.1.111') @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): # Calls test('hello') every 10 seconds. sender.add_periodic_task(10.0, test.s('hello'), name='add every 10') # Calls test('world') every 30 seconds sender.add_periodic_task(30.0, test.s('world'), expires=10) # Executes every Monday morning at 7:30 a.m. sender.add_periodic_task( crontab(hour=21, minute=26, day_of_week='Sum'), test.s('Happy Mondays!'), ) @app.task def test(arg): print('runing test.....') print(arg)
启动定时任务
celery -A periodic_task worker
另外开一个任务调度区不断的检测你的任务计划
celery -A periodic_task beat
2.4、celery和django配置一起使用
在setting同级的目录中新建一个celery.py的文件配置celery基本的配置
from __future__ import absolute_import, unicode_literals import os from celery import Celery # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'CeleryTest.settings') app = Celery('CeleryTest') # Using a string here means the worker don't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django app configs. app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
在setting.py同级的目录配置__init__.py
from __future__ import absolute_import, unicode_literals # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app __all__ = ['celery_app']
在APP的目录里面新建一个tasks.py的任务来填写任务
#app01/tasks.py # Create your tasks here from __future__ import absolute_import, unicode_literals from celery import shared_task import time @shared_task def add(x, y): print("running task add,我是windows ") time.sleep(1) return x + y @shared_task def mul(x, y): return x * y @shared_task def xsum(numbers): return sum(numbers)
从views中调用任务
/app01/view.py from django.shortcuts import render,HttpResponse from app01 import tasks from celery.result import AsyncResult def index(request): res = tasks.add.delay(5,999) print("res:",res) print(res.status) # import pdb # pdb.set_trace() return HttpResponse(res.task_id) def task_res(request): #通过ID获取结果 result = AsyncResult(id="be4933c0-ed9b-4a04-ade8-79f4c57cfc74") #return HttpResponse(result.get()) return HttpResponse(result.status)
加载全部内容