Python定时任务APScheduler
Yunlord 人气:0前言
在日常工作中,如果想要简化工作流程实现办公自动化,那么几乎有大半的功能模块都需要使用定时任务,例如定时收发邮件,或者定时发微信或是检测垃圾邮件等等,而在python中常用实现定时任务的包含以下四种方法:
- while True: + sleep()
- threading.Timer定时器
- 调度模块schedule
- 任务框架APScheduler
但在实际测试中,可以发现:
循环+sleep方式可以用来做简单测试。
timer可以实现异步定时任务。
schedule可以定点定时执行,但是仍然需要while Ture配合,而且占用内存大。
APScheduler框架更加强大,可以直接在里面添加定点与定时任务,无可挑剔!!!
所以接下来讲介绍如何使用APscheduler实现python定时任务!!!
一、APscheduler简介
APscheduler全称Advanced Python Scheduler,作用为在指定的时间规则执行指定的作业,其是基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个python定时任务系统。
二、APscheduler安装
我们通过Anaconda管理虚拟环境,并进行APscheduler库的安装。
pip install apscheduler
三、APscheduler组成部分
触发器(trigger):包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了他们自己初始配置意外,触发器完全是无状态的。
作业存储(job store):存储被调度的作业,默认的作业存储是简单地把作业保存在内存中,其他的作业存储是将作业保存在数据库中。一个作业的数据讲在保存在持久化作业存储时被序列化,并在加载时被反序列化。调度器不能分享同一个作业存储。
执行器(executor):处理作业的运行,他们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。
调度器(scheduler):其他的组成部分。通常在应用只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。
1.Job 作业
作用:
Job作为APScheduler最小执行单位。
创建Job时指定执行的函数,函数中所需参数,Job执行时的一些设置信息。
构建说明:
id:指定作业的唯一ID
name:指定作业的名字
trigger:apscheduler定义的触发器,用于确定Job的执行时间,根据设置的trigger规则,计算得到下次执行此job的
时间, 满足时将会执行
executor:apscheduler定义的执行器,job创建时设置执行器的名字,根据字符串你名字到scheduler获取到执行此
job的 执行器,执行job指定的函数
max_instances:执行此job的最大实例数,executor执行job时,根据job的id来计算执行次数,根据设置的最大实例数
来确定是否可执行
next_run_time:Job下次的执行时间,创建Job时可以指定一个时间[datetime],不指定的话则默认根据trigger获取触
发时间
misfire_grace_time:Job的延迟执行时间,例如Job的计划执行时间是21:00:00,但因服务重启或其他原因导致
21:00:31才执行,如果设置此key为40,则该job会继续执行,否则将会丢弃此job
coalesce:Job是否合并执行,是一个bool值。例如scheduler停止20s后重启启动,而job的触发器设置为5s执行
一次,因此此job错过了4个执行时间,如果设置为是,则会合并到一次执行,否则会逐个执行
func:Job执行的函数
args:Job执行函数需要的位置参数
kwargs:Job执行函数需要的关键字参数
2.Trigger 触发器
包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了它们自己初始配置以外,触发器完全是无状态的。
APScheduler 有三种内建的 trigger:
- date: 特定的时间点触发
- interval: 固定时间间隔触发
- cron: 在特定时间周期性地触发
触发器就是根据你指定的触发方式,比如是按照时间间隔,还是按照 crontab触发,触发条件是什么等。每个任务都有自己的触发器。
3.Jobstore 作业存储
如果你的应用在每次启动的时候都会重新创建作业,那么使用默认的作业存储器(MemoryJobStore)即可,但是如果你需要在调度器重启或者应用程序奔溃的情况下任然保留作业,你应该根据你的应用环境来选择具体的作业存储器。例如:使用Mongo或者SQLAlchemy JobStore (用于支持大多数RDBMS)。
任务存储器是可以存储任务的地方,默认情况下任务保存在内存,也可将任务保存在各种数据库中。任务存储进去后,会进行序列化,然后也可以反序列化提取出来,继续执行。
4.Executor 执行器
Executor在scheduler中初始化,另外也可通过scheduler的add_executor动态添加Executor。
每个executor都会绑定一个alias,这个作为唯一标识绑定到Job,在实际执行时会根据Job绑定的executor。找到实际的执行器对象,然后根据执行器对象执行Job。
Executor的种类会根据不同的调度来选择,如果选择AsyncIO作为调度的库,那么选择AsyncIOExecutor,如果选择tornado作为调度的库,选择TornadoExecutor,如果选择启动进程作为调度,选择ThreadPoolExecutor或者ProcessPoolExecutor都可以。
Executor的选择需要根据实际的scheduler来选择不同的执行器。
处理作业的运行,它们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。
5.scheduler 调度器
Scheduler是APScheduler的核心,所有相关组件通过其定义。scheduler启动之后,将开始按照配置的任务进行调度。除了依据所有定义Job的trigger生成的将要调度时间唤醒调度之外。当发生Job信息变更时也会触发调度。
scheduler可根据自身的需求选择不同的组件,如果是使用AsyncIO则选择AsyncIOScheduler,使用tornado则选择TornadoScheduler。
任务调度器是属于整个调度的总指挥官。它会合理安排作业存储器、执行器、触发器进行工作,并进行添加和删除任务等。调度器通常是只有一个的。开发人员很少直接操作触发器、存储器、执行器等。因为这些都由调度器自动来实现了。
四、Scheduler工作流程图
1.Scheduler添加job流程
2.Scheduler调度流程
五、APscheduler使用
1.简单应用
import time from apscheduler.schedulers.blocking import BlockingScheduler def my_job(i): print (i) sched = BlockingScheduler() sched.add_job(my_job, 'interval', seconds=5,values=['学会了']) sched.start()
输出:
2.操作作业
2.1 date触发器
date 是最基本的一种调度,作业任务只会执行一次。它表示特定的时间点触发。它的参数如下:
参数 | 说明 |
run_date(datetime or str) | 任务运行的日期或者时间 |
timezone(datetime.tzinfo or str) | 指定时区 |
import datetime from apscheduler.schedulers.blocking import BlockingScheduler scheduler = BlockingScheduler() def my_job(text): print(text) # datetime类型(用于精确时间) scheduler.add_job(my_job, 'date', run_date=datetime(2022, 4, 25, 17, 30, 5), args=['测试任务']) scheduler.start()
注意:run_date参数可以是date类型、datetime类型或文本类型。
2.2 interval触发器
固定时间间隔触发。interval 间隔调度,参数如下:
参数 | 说明 |
weeks(int) | 间隔几周 |
days(int) | 间隔几天 |
hours(int) | 间隔几小时 |
minutes(int) | 间隔几分钟 |
seconds(int) | 间隔多少秒 |
start_date(datetime or str) | 开始日期 |
end_date(datetime or str) | 结束日期 |
timezone(datetime.tzinfo or str) | 时区 |
from datetime import datetime from apscheduler.schedulers.blocking import BlockingScheduler def job_func(): print("当前时间:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f") scheduler = BlockingScheduler() # 每2小时触发 scheduler.add_job(job_func, 'interval', hours=2) # 在 2019-04-15 17:00:00 ~ 2019-12-31 24:00:00 之间, 每隔两分钟执行一次 job_func 方法 scheduler .add_job(job_func, 'interval', minutes=2, start_date='2022-04-29 17:00:00' , end_date='2022-12-31 24:00:00') scheduler.start()
jitter振动参数,给每次触发添加一个随机浮动秒数,一般适用于多服务器,避免同时运行造成服务拥堵。
# 每小时(上下浮动120秒区间内)运行`job_function` scheduler.add_job(job_func, 'interval', hours=1, jitter=120)
2.3 cron触发器
在特定时间周期性地触发,和Linux crontab格式兼容。它是功能最强大的触发器。
cron 参数:
参数 | 说明 |
year(int or str) | 年,4位数字 |
month(int or str) | 月(范围1-12) |
day(int or str) | 日(范围1-31) |
week(int or str) | 周(范围1-53) |
day_of_week(int or str) | 周内第几天或者星期几(范围0-6或者mon,tue,wed,thu,fri,stat,sun) |
hour(int or str) | 时(0-23) |
minute(int or str) | 分(0-59) |
second(int or str) | 秒(0-59) |
start_date(datetime or str) | 最早开始日期(含) |
end_date(datetime or str) | 最晚结束日期(含) |
timezone(datetime.tzinfo or str) | 指定时区 |
表达式类型
表达式 | 参数类型 | 描述 |
---|---|---|
* | 所有 | 通配符。例:minutes=* 即每分钟触发 |
*/a | 所有 | 可被a整除的通配符。 |
a-b | 所有 | 范围a-b触发 |
a-b/c | 所有 | 范围a-b,且可被c整除时触发 |
xth y | 日 | 第几个星期几触发。x为第几个,y为星期几 |
last x | 日 | 一个月中,最后的星期几触发 |
last | 日 | 一个月最后一天触发 |
x,y,z | 所有 | 组合表达式,可以组合确定值或上方的表达式 |
import datetime from apscheduler.schedulers.background import BackgroundScheduler def job_func(text): print("当前时间:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) scheduler = BackgroundScheduler() # 在每年 1-3、7-9 月份中的每个星期一、二中的 00:00, 01:00, 02:00 和 03:00 执行 job_func 任务 scheduler .add_job(job_func, 'cron', month='1-3,7-9',day='0, tue', hour='0-3') scheduler.start()
参考:
加载全部内容