Celery Docker 处理Django定期任务 怎样使用Celery和Docker处理Django中的定期任务
mindg 人气:0在构建和扩展Django应用程序时,不可避免地需要定期在后台自动运行某些任务。
一些例子:
生成定期报告
清除缓存
发送批量电子邮件通知
执行每晚维护工作
这是构建和扩展不属于Django核心的Web应用程序所需的少数功能之一。幸运的是,Celery提供了一个强大的解决方案,该解决方案非常容易实现,称为Celery Beat。
在下面的文章中,我们将向您展示如何使用Docker设置Django,Celery和Redis,以便通过Celery Beat定期运行自定义Django Admin命令。
依存关系:
Django v3.0.5
Docker v19.03.8
Python v3.8.2
芹菜v4.4.1
Redis v5.0.8
Django + Celery系列:
Django和Celery的异步任务
使用Celery和Docker处理Django中的定期任务(本文!)
目标
在本教程结束时,您应该能够:
使用Docker容器化Django,Celery和Redis
将Celery集成到Django应用中并创建任务
编写自定义Django Admin命令
安排自定义Django Admin命令以通过Celery Beat定期运行
项目设置
从django-celery-beat
存储库中克隆基础项目,然后签出基础分支:
$ git clone
https://github.com/testdrivenio/django-celery-beat
--branch base --single-branch
$ cd django-celery-beat
由于我们总共需要管理四个流程(Django,Redis,worker和Scheduler),因此我们将使用Docker通过连接起来简化它们的工作流程,从而使它们都可以通过一个命令从一个终端窗口运行 。
从项目根目录创建映像,并启动Docker容器:
$ docker-compose up -d --build $ docker-compose exec web python manage.py migrate
构建完成后,导航至http:// localhost:1337以确保该应用程序能够按预期运行。 您应该看到以下文本:
Orders
No orders found!
项目结构:
├── .gitignore
├── docker-compose.yml
└── project
├── Dockerfile
├── core
│ ├── __init__.py
│ ├── asgi.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── entrypoint.sh
├── manage.py
├── orders
│ ├── __init__.py
│ ├── admin.py
│ ├── apps.py
│ ├── migrations
│ │ ├── 0001_initial.py
│ │ └── __init__.py
│ ├── models.py
│ ├── tests.py
│ ├── urls.py
│ └── views.py
├── requirements.txt
└── templates
└── orders
└── order_list.html
Celery和Redis
现在,我们需要为Celery,Celery Beat和Redis添加容器。
首先,将依赖项添加到requirements.txt文件中:
Django==3.0.5 celery==4.4.1 redis==3.4.1
docker-compose.yml
文件内容:
redis: image: redis:alpine celery: build: ./project command: celery -A core worker -l info volumes: - ./project/:/usr/src/app/ environment: - DEBUG=1 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1] depends_on: - redis celery-beat: build: ./project command: celery -A core beat -l info volumes: - ./project/:/usr/src/app/ environment: - DEBUG=1 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1] depends_on: - redis
我们还需要更新Web服务的depends_on部分:
web: build: ./project command: python manage.py runserver 0.0.0.0:8000 volumes: - ./project/:/usr/src/app/ ports: - 1337:8000 environment: - DEBUG=1 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1] depends_on: - redis # NEW
完整的docker-compose文件如下:
version: '3.7' services: web: build: ./project command: python manage.py runserver 0.0.0.0:8000 volumes: - ./project/:/usr/src/app/ ports: - 1337:8000 environment: - DEBUG=1 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1] depends_on: - redis redis: image: redis:alpine celery: build: ./project command: celery -A core worker -l info volumes: - ./project/:/usr/src/app/ environment: - DEBUG=1 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1] depends_on: - redis celery-beat: build: ./project command: celery -A core beat -l info volumes: - ./project/:/usr/src/app/ environment: - DEBUG=1 - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1] depends_on: - redis
在构建新容器之前,我们需要在Django应用中配置Celery。
芹菜配置
设定
在“核心”目录中,创建一个celery.py文件并添加以下代码:
import os from celery import Celery os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings") app = Celery("core") app.config_from_object("django.conf:settings", namespace="CELERY") app.autodiscover_tasks()
这里发生了什么事?
首先,我们为DJANGO_SETTINGS_MODULE环境变量设置一个默认值,以便Celery知道如何找到Django项目。
接下来,我们创建了一个名称为core的新Celery实例,并将该值分配给名为app的变量。
然后,我们从django.conf的settings对象中加载了celery配置值。 我们使用namespace =“ CELERY”来防止与其他Django设置发生冲突。 换句话说,Celery的所有配置设置必须以CELERY_为前缀。
最后,app.autodiscover_tasks()
告诉Celery从settings.INSTALLED_APPS中定义的应用程序中查找Celery任务。
将以下代码添加到core / __ init__.py:
from .celery import app as celery_app __all__ = ("celery_app",)
最后,使用以下Celery设置更新core / settings.py文件,使其可以连接到Redis:
CELERY_BROKER_URL = "redis://redis:6379" CELERY_RESULT_BACKEND = "redis://redis:6379"
build:
$ docker-compose up -d --build
查看日志:
$ docker-compose logs 'web' $ docker-compose logs 'celery' $ docker-compose logs 'celery-beat' $ docker-compose logs 'redis'
如果一切顺利,我们现在有四个容器,每个容器提供不同的服务。
现在,我们准备创建一个示例任务,以查看其是否可以正常工作。
创建一个任务
创建一个新文件core / tasks.py并为仅打印到控制台的示例任务添加以下代码:
from celery import shared_task @shared_task def sample_task(): print("The sample task just ran.")
安排任务
在settings.py文件的末尾,添加以下代码,以使用Celery Beat将sample_task安排为每分钟运行一次:
CELERY_BEAT_SCHEDULE = { "sample_task": { "task": "core.tasks.sample_task", "schedule": crontab(minute="*/1"), }, }
在这里,我们使用CELERY_BEAT_SCHEDULE设置定义了定期任务。 我们给任务命名了sample_task,然后声明了两个设置:
任务声明要运行的任务。
时间表设置任务应运行的时间间隔。 这可以是整数,时间增量或crontab。 我们在任务中使用了crontab模式,告诉它每分钟运行一次。 您可以在此处找到有关Celery日程安排的更多信息。
确保添加导入:
from celery.schedules import crontab import core.tasks
重启容器,应用变更:
$ docker-compose up -d --build
查看日志:
$ docker-compose logs -f 'celery' celery_1 | -------------- [queues] celery_1 | .> celery exchange=celery(direct) key=celery celery_1 | celery_1 | celery_1 | [tasks] celery_1 | . core.tasks.sample_task
我们可以看到Celery获得了示例任务core.tasks.sample_task。
每分钟,您应该在日志中看到一行以“示例任务刚刚运行”结尾的行:
celery_1 | [2020-04-15 22:49:00,003: INFO/MainProcess]
Received task: core.tasks.sample_task[8ee5a84f-c54b-4e41-945b-645765e7b20a]
celery_1 | [2020-04-15 22:49:00,007: WARNING/ForkPoolWorker-1] The sample task just ran.
自定义Django Admin命令
Django提供了许多内置的django-admin命令,例如:
迁移
启动项目
startapp
转储数据
移民
除了内置命令,Django还为我们提供了创建自己的自定义命令的选项:
自定义管理命令对于运行独立脚本或从UNIX crontab或Windows计划任务控制面板定期执行的脚本特别有用。
因此,我们将首先配置一个新命令,然后使用Celery Beat自动运行它。
首先创建一个名为orders / management / commands / my_custom_command.py的新文件。 然后,添加运行它所需的最少代码:
from django.core.management.base import BaseCommand, CommandError class Command(BaseCommand): help = "A description of the command" def handle(self, *args, **options): pass
BaseCommand有一些可以被覆盖的方法,但是唯一需要的方法是handle。 handle是自定义命令的入口点。 换句话说,当我们运行命令时,将调用此方法。
为了进行测试,我们通常只添加一个快速打印语句。 但是,建议根据Django文档使用stdout.write代替:
当您使用管理命令并希望提供控制台输出时,应该写入self.stdout和self.stderr,而不是直接打印到stdout和stderr。 通过使用这些代理,测试自定义命令变得更加容易。 另请注意,您无需以换行符结束消息,除非您指定结束参数,否则它将自动添加。
因此,添加一个self.stdout.write命令:
from django.core.management.base import BaseCommand, CommandError class Command(BaseCommand): help = "A description of the command" def handle(self, *args, **options): self.stdout.write("My sample command just ran.") # NEW
测试:
$ docker-compose exec web python manage.py my_custom_command My sample command just ran.
这样,让我们将所有内容捆绑在一起!
使用Celery Beat安排自定义命令
现在我们已经启动并运行了容器,已经过测试,可以安排任务定期运行,并编写了自定义的Django Admin示例命令,现在该进行设置以定期运行自定义命令了。
设定
在项目中,我们有一个非常基本的应用程序,称为订单。 它包含两个模型,产品和订单。 让我们创建一个自定义命令,该命令从当天发送确认订单的电子邮件报告。
首先,我们将通过此项目中包含的夹具将一些产品和订单添加到数据库中:
$ docker-compose exec web python manage.py loaddata products.json
创建超级用户:
$ docker-compose exec web python manage.py createsuperuser
出现提示时,请填写用户名,电子邮件和密码。 然后在您的Web浏览器中导航到http://127.0.0.1:1337/admin。 使用您刚创建的超级用户登录,并创建几个订单。 确保至少有一个日期为今天。
让我们为我们的电子邮件报告创建一个新的自定义命令。
创建一个名为orders / management / commands / email_report.py的文件:
from datetime import timedelta, time, datetime from django.core.mail import mail_admins from django.core.management import BaseCommand from django.utils import timezone from django.utils.timezone import make_aware from orders.models import Order today = timezone.now() tomorrow = today + timedelta(1) today_start = make_aware(datetime.combine(today, time())) today_end = make_aware(datetime.combine(tomorrow, time())) class Command(BaseCommand): help = "Send Today's Orders Report to Admins" def handle(self, *args, **options): orders = Order.objects.filter(confirmed_date__range=(today_start, today_end)) if orders: message = "" for order in orders: message += f"{order} \n" subject = ( f"Order Report for {today_start.strftime('%Y-%m-%d')} " f"to {today_end.strftime('%Y-%m-%d')}" ) mail_admins(subject=subject, message=message, html_message=None) self.stdout.write("E-mail Report was sent.") else: self.stdout.write("No orders confirmed today.")
在代码中,我们向数据库查询了日期为Confirmed_date的订单,将订单合并为电子邮件正文的单个消息,然后使用Django内置的mail_admins命令将电子邮件发送给管理员。
添加一个虚拟管理员电子邮件,并将EMAIL_BACKEND设置为使用控制台后端,以便将该电子邮件发送到设置文件中的stdout:
EMAIL_BACKEND = "django.core.mail.backends.console.EmailBackend" DEFAULT_FROM_EMAIL = "noreply@email.com" ADMINS = [("testuser", "test.user@email.com"), ]
运行:
$ docker-compose exec web python manage.py email_report
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: [Django] Order Report for 2020-04-15 to 2020-04-16
From: root@localhost
To: test.user@email.com
Date: Wed, 15 Apr 2020 23:10:45 -0000
Message-ID: <158699224565.85.8278261495663971825@5ce6313185d3>
Order: 337ef21c-5f53-4761-9f81-07945de385ae - product: Rice
-------------------------------------------------------------------------------
E-mail Report was sent.
Celery Beat
现在,我们需要创建一个定期任务来每天运行此命令。
向core / tasks.py添加一个新任务:
from celery import shared_task from django.core.management import call_command # NEW @shared_task def sample_task(): print("The sample task just ran.") # NEW @shared_task def send_email_report(): call_command("email_report", )
因此,首先我们添加了一个call_command导入,该导入用于以编程方式调用django-admin命令。 在新任务中,然后将call_command与自定义命令的名称一起用作参数。
要安排此任务,请打开core / settings.py文件,并更新CELERY_BEAT_SCHEDULE设置以包括新任务。
CELERY_BEAT_SCHEDULE = { "sample_task": { "task": "core.tasks.sample_task", "schedule": crontab(minute="*/1"), }, "send_email_report": { "task": "core.tasks.send_email_report", "schedule": crontab(hour="*/1"), }, }
在这里,我们向CELERY_BEAT_SCHEDULE添加了一个名为send_email_report的新条目。 正如我们对上一个任务所做的那样,我们声明了该任务应运行的任务-例如core.tasks.send_email_report-并使用crontab模式设置重复性。
重新启动容器,以确保新设置处于活动状态:
$ docker-compose up -d --build 看日志: $ docker-compose logs -f 'celery' celery_1 | -------------- [queues] celery_1 | .> celery exchange=celery(direct) key=celery celery_1 | celery_1 | celery_1 | [tasks] celery_1 | . core.tasks.sample_task celery_1 | . core.tasks.send_email_report
一分钟后邮件发出:
celery_1 | [2020-04-15 23:20:00,309: WARNING/ForkPoolWorker-1] Content-Type: text/plain; charset="utf-8"
celery_1 | MIME-Version: 1.0
celery_1 | Content-Transfer-Encoding: 7bit
celery_1 | Subject: [Django] Order Report for 2020-04-15 to 2020-04-16
celery_1 | From: root@localhost
celery_1 | To: test.user@email.com
celery_1 | Date: Wed, 15 Apr 2020 23:20:00 -0000
celery_1 | Message-ID: <158699280030.12.8934112422500683251@42481c198b77>
celery_1 |
celery_1 | Order: 337ef21c-5f53-4761-9f81-07945de385ae - product: Rice
celery_1 | [2020-04-15 23:20:00,310: WARNING/ForkPoolWorker-1] -------------------------------------------------------------------------------
celery_1 | [2020-04-15 23:20:00,312: WARNING/ForkPoolWorker-1] E-mail Report was sent.
结论
在本文中,我们指导您为Celery,Celery Beat和Redis设置Docker容器。 然后,我们展示了如何使用Celery Beat创建自定义Django Admin命令和定期任务以自动运行该命令。
原文:https://testdriven.io/blog/django-celery-periodic-tasks/
加载全部内容