Spring动态管理定时任务之ThreadPoolTaskScheduler解读
作者:DayDayUp丶 人气:0
Spring动态管理定时任务ThreadPoolTaskScheduler
Spring任务调度核心类ThreadPoolTaskScheduler,API文档解释如下:
Implementation of Spring's TaskScheduler interface, wrapping a native java.util.concurrent.ScheduledThreadPoolExecutor.
即:Spring的TaskScheduler接口的一个实现,内部包装了JDK原生的java.util.concurrent.ScheduledThreadPoolExecutor。
实现思路
注入调度器bean(ThreadPoolTaskScheduler),并初始化一个ConcurrentHashMap容器,以任务id为key保存每个定时任务对应的ScheduledFuture;每个任务的运行状态都封装在ScheduledFuture中,调用它的cancel方法即可取消对应的定时任务。
import java.time.LocalDateTime; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import javax.annotation.Resource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.support.CronTrigger; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.cjia.spidercommon.model.SpiderJob; import com.cjia.spiderjob.mapper.SpiderJobMapper; import lombok.Data; import lombok.extern.slf4j.Slf4j; /** * 用来管理(启动、停止、新增、删除、更新编辑、查看运行状态)定时任务(增量任务) */ @Slf4j @RestController @RequestMapping("spiderJob/cron") public class CronJobController extends SpiderJobController { @Autowired private ThreadPoolTaskScheduler threadPoolTaskScheduler; private Map<Integer, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>(); @Resource private SpiderJobMapper spiderJobMapper; @Bean public ThreadPoolTaskScheduler threadPoolTaskScheduler() { return new ThreadPoolTaskScheduler(); } /** * 启动单个定时任务 */ @RequestMapping("/start/{jobId}") public String start(@PathVariable Integer jobId) { SpiderJob job = spiderJobMapper.selectById(jobId); if (job == null) { log.warn("任务[{}]已不存在,无法启动!", jobId); return "任务[" + jobId + "]已不存在,无法启动!"; } int enable = job.getEnable(); if (enable == 0) { log.warn("任务[{}]已被禁用,无法启动!", jobId); return "任务[" + jobId + "]已被禁用,无法启动!"; } // 检测该任务是否已在运行调度中 if (futureMap.get(jobId) != null) { log.warn("任务[{}]已在调度运行,无法重复启动!", jobId); return "任务[" + jobId + "]已在调度运行,无法重复启动!"; } String cron = job.getCron(); // TODO check cron ScheduledFuture<?> future = threadPoolTaskScheduler.schedule(new MyRunnable(job), new CronTrigger(cron)); log.info("任务[{}]已被启动!", jobId); futureMap.put(jobId, future); return 
"任务[" + jobId + "]已被启动!"; } /** * 批量启动定时任务 */ @RequestMapping("/startBatch/{jobIds}") public String startBatch(@PathVariable String jobIds) { // TODO jobIds valid String[] jobIdsArr = jobIds.split(","); StringBuffer sb = new StringBuffer(); for (String jobId : jobIdsArr) { String result = start(Integer.valueOf(jobId)); sb.append(result).append("<br>"); } return sb.toString(); } /** * 停止单个定时任务 */ @RequestMapping("/stop/{jobId}") public String stop(@PathVariable Integer jobId) { // 检测该任务是否已在运行调度中 ScheduledFuture<?> future = futureMap.get(jobId); if (future == null) { log.warn("任务[{}]已不在调度中,无法停止!", jobId); return "任务[" + jobId + "]已不在调度中,无法停止!"; } else { future.cancel(true); futureMap.remove(jobId); log.info("任务[{}]已被停止!", jobId); return "任务[" + jobId + "]已被停止!"; } } /** * 批量停止定时任务 */ @RequestMapping("/stopBatch/{jobIds}") public String stopBatch(@PathVariable String jobIds) { // TODO jobIds valid String[] jobIdsArr = jobIds.split(","); StringBuffer sb = new StringBuffer(); for (String jobId : jobIdsArr) { String result = stop(Integer.valueOf(jobId)); sb.append(result).append("<br>"); } return sb.toString(); } /** * 查看当前时刻调度中的定时任务 */ @RequestMapping("/status") public String getAllStatus() { Set<Integer> runningKeys = futureMap.keySet(); return "当前正在调度的任务列表:" + runningKeys.toString(); } @Data private class MyRunnable implements Runnable { private SpiderJob job; public MyRunnable(SpiderJob job) { this.job = job; } @Override public void run() { log.info("运行定时任务[{}: {}] at {}!", job.getId(), job.getBizName(), LocalDateTime.now()); executeIncrementJob(job.getBizName()); } } }
ThreadPoolTaskScheduler 定时任务实现
org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler定时任务调度线程池
-- Table holding the definitions of the dynamically managed scheduled jobs.
-- Each row carries the job name, a class path (bean_class), a cron expression,
-- a status value (see the JobStatusEnum reference in the column comment),
-- a soft-delete flag and create/update timestamps. Primary key: id.
CREATE TABLE `sys_job` ( `id` bigint(20) NOT NULL COMMENT '任务key', `job_name` varchar(64) NOT NULL COMMENT '任务名称', `bean_class` varchar(128) NOT NULL COMMENT '类路径', `cron_expression` varchar(64) NOT NULL COMMENT 'cron表达式', `status` tinyint(1) NOT NULL COMMENT '状态值 @JobStatusEnum 详见具体枚举类', `is_deleted` tinyint(1) DEFAULT '0' COMMENT '删除标识 1是 0否', `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
/**
 * Spring configuration that exposes the {@link ThreadPoolTaskScheduler}
 * used to run the scheduled jobs.
 */
@Configuration
@Slf4j
public class SchedulingConfigure {

    /**
     * Builds the scheduler bean: 20 threads named {@code schedule-task-*}; on
     * shutdown it is told to wait for running tasks, for up to 60 seconds.
     *
     * @return the configured scheduler
     */
    @Bean
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        log.info("开始创建定时任务调度线程池");

        final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        // Thread identity and pool sizing.
        scheduler.setThreadNamePrefix("schedule-task-");
        scheduler.setPoolSize(20);
        // Shutdown behaviour.
        scheduler.setAwaitTerminationSeconds(60);
        scheduler.setWaitForTasksToCompleteOnShutdown(true);

        log.info("创建定时任务调度线程池完成!");
        return scheduler;
    }
}
/**
 * Scheduling states of a job, each with a numeric status code and a description.
 */
public enum JobStatusEnum {

    /** The job has not been added to the scheduler. */
    NOT_SCHEDULE(0, "未加入调度器"),

    /** The job has been added to the scheduler but is not running. */
    SCHEDULED_BUT_NOT_RUNNING(1, "加入调度器,但未运行"),

    /** The job has been removed from the scheduler. */
    DELETED(2, "从调度器中已删除");

    /** Numeric status code. */
    private Integer status;

    /** Human-readable description of the state. */
    private String detail;

    JobStatusEnum(Integer code, String description) {
        this.detail = description;
        this.status = code;
    }

    /** @return the numeric status code of this constant */
    public Integer getStatus() {
        return this.status;
    }

    /** @return the description of this constant */
    public String getDetail() {
        return this.detail;
    }

    /**
     * Replaces the status code held by this constant. Enum constants are shared
     * singletons, so the new value is seen by every user of the constant.
     *
     * @param status the new status code
     */
    public void setStatus(Integer status) {
        this.status = status;
    }

    /**
     * Replaces the description held by this constant. Enum constants are shared
     * singletons, so the new value is seen by every user of the constant.
     *
     * @param detail the new description
     */
    public void setDetail(String detail) {
        this.detail = detail;
    }
}
@Component @Slf4j public class ScheduledJobService { private final ReentrantLock lock = new ReentrantLock(); @Autowired private ThreadPoolTaskScheduler threadPoolTaskScheduler; @Autowired private SysJobService jobService; @Autowired private SpringBeanUtils springBeanUtils; /** * 已经加入调度器的任务map */ private final ConcurrentHashMap<Long, ScheduledFuture<?>> scheduledFutureMap = new ConcurrentHashMap<>(); /** * 初始化启动任务 * * @param sysJobs 数据库任务集合 */ public void initAllJob(List<SysJob> sysJobs) { if (CollectionUtils.isEmpty(sysJobs)) { return; } for (SysJob sysJob : sysJobs) { if (JobStatusEnum.NOT_SCHEDULE.getStatus().equals(sysJob.getStatus()) || JobStatusEnum.DELETED.getStatus().equals(sysJob.getStatus()) || this.isScheduled(sysJob.getId())) { // 任务初始化状态或已删除或已加载到调度器中 continue; } // 将任务加入调度器 this.doScheduleJob(sysJob); } } /** * 启动任务 * * @param jobId job主键id */ public void start(Long jobId) { log.info("启动任务:-> jobId_{}", jobId); // 加入调度器 schedule(jobId); log.info("启动任务结束:-> jobId_{}", jobId); // 更新任务状态 jobService.updateJobStatus(jobId, JobStatusEnum.SCHEDULED_BUT_NOT_RUNNING.getStatus()); } /** * 停止任务 * * @param jobId job主键id */ public void stop(Long jobId) { log.info("停止任务:-> jobId_{}", jobId); // 取消任务 cancel(jobId); log.info("停止任务结束:-> jobId_{}", jobId); // 更新表中任务状态为已停止 jobService.updateJobStatus(jobId, JobStatusEnum.NOT_SCHEDULE.getStatus()); } /** * 移除任务 * * @param jobId job主键id */ public void remove(Long jobId) { log.info("移除任务:-> jobId_{}", jobId); // 取消任务 cancel(jobId); log.info("移除任务结束:-> jobId_{}", jobId); // 更新表中任务状态为已删除 jobService.updateJobStatus(jobId, JobStatusEnum.DELETED.getStatus()); } /** * 取消 * * @param jobId 工作id */ private void cancel(Long jobId) { // 任务是否存在 if (scheduledFutureMap.containsKey(jobId)) { ScheduledFuture<?> scheduledFuture = scheduledFutureMap.get(jobId); if (!scheduledFuture.isCancelled()) { // 取消调度 scheduledFuture.cancel(true); } } } private void schedule(Long jobId) { // 添加锁,只允许单个线程访问,防止任务启动多次 lock.lock(); try { if 
(isScheduled(jobId)) { log.error("任务jobId_{}已经加入调度器,无需重复操作", jobId); return; } // 通过jobKey查询jobBean对象 SysJob sysJob = jobService.getById(jobId); // 启动定时任务 doScheduleJob(sysJob); } finally { // 释放锁资源 lock.unlock(); } } /** * 执行启动任务 * * @param sysJob 任务实体类对象 */ private void doScheduleJob(SysJob sysJob) { Long jobId = sysJob.getId(); String beanClass = sysJob.getBeanClass(); String jobName = sysJob.getJobName(); String cron = sysJob.getCronExpression(); // 从Spring中获取目标的job业务实现类 ScheduledJob scheduledJob = parseFrom(beanClass); if (scheduledJob == null) { return; } scheduledJob.setJobId(jobId); scheduledJob.setJobName(jobName); ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(scheduledJob, triggerContext -> { CronTrigger cronTrigger = new CronTrigger(cron); return cronTrigger.nextExecutionTime(triggerContext); }); log.info("任务加入调度器 -> jobId:{},jobName:{}", jobId, jobName); // 将启动的任务放入map assert scheduledFuture != null; scheduledFutureMap.put(jobId, scheduledFuture); } /** * 任务是否已经进入调度器 * * @param jobId 任务主键key * @return {@link Boolean} */ private Boolean isScheduled(Long jobId) { if (scheduledFutureMap.containsKey(jobId)) { return !scheduledFutureMap.get(jobId).isCancelled(); } return false; } private ScheduledJob parseFrom(String beanClass) { try { Class<?> clazz = Class.forName(beanClass); return (ScheduledJob) springBeanUtils.getBean(clazz); } catch (ClassNotFoundException e) { e.printStackTrace(); } return null; } }
/**
 * Keeps a static reference to the {@link ApplicationContext} handed over by
 * Spring and offers bean lookups by name, by type, or by both.
 */
@Component
public class SpringBeanUtils implements ApplicationContextAware {

    /** The context received in {@link #setApplicationContext(ApplicationContext)}. */
    private static ApplicationContext applicationContext;

    /**
     * Returns the stored context.
     *
     * @return the application context, as last set
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /** Stores the given context in the static field. */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringBeanUtils.applicationContext = applicationContext;
    }

    /**
     * Looks up a bean by name.
     *
     * @param name bean name
     * @return the bean instance
     */
    public Object getBean(String name) {
        final ApplicationContext context = getApplicationContext();
        return context.getBean(name);
    }

    /**
     * Looks up a bean by type.
     *
     * @param clazz required bean type
     * @return the bean instance
     */
    public <T> T getBean(Class<T> clazz) {
        final ApplicationContext context = getApplicationContext();
        return context.getBean(clazz);
    }

    /**
     * Looks up a bean by name and type.
     *
     * @param name  bean name
     * @param clazz required bean type
     * @return the bean instance
     */
    public <T> T getBean(String name, Class<T> clazz) {
        final ApplicationContext context = getApplicationContext();
        return context.getBean(name, clazz);
    }
}
/**
 * Base class for schedulable jobs. It implements {@link Runnable} but does not
 * define {@code run()}, so each concrete job supplies its own logic there.
 * Carries the id and name of the job; accessors come from Lombok's {@code @Data}.
 */
@Data
public abstract class ScheduledJob implements Runnable {

    /**
     * Primary key of the job.
     */
    private Long jobId;

    /**
     * Name of the job.
     */
    private String jobName;
}
/**
 * Sample job: prints a fixed line, then its own class name with the current time.
 */
@Component
public class SchedulerTestDemo extends ScheduledJob {

    /** Writes the two demo lines to standard output. */
    @Override
    public void run() {
        System.out.println("我是定时任务要执行的类..");

        final String className = SchedulerTestDemo.class.getName();
        final LocalDateTime now = LocalDateTime.now();
        System.out.println(className + ":" + now);
    }
}
/** * 项目启动时,将数据库中job定时任务加载 */ @Component public class GrapeApplicationListener { private final ScheduledJobService scheduledJobService; private final ISysJobService sysJobService; public GrapeApplicationListener(ISysJobService sysJobService, ScheduledJobService scheduledJobService) { this.sysJobService = sysJobService; this.scheduledJobService = scheduledJobService; } @PostConstruct public void initStartJob() { // 初始化job scheduledJobService.initAllJob(sysJobService.list()); } }
/**
 * Application entry point. Scans {@code com.example.grape} for components and
 * {@code com.example.grape.dao.mapper} for mappers, and enables scheduling.
 */
@SpringBootApplication(scanBasePackages = {"com.example.grape"})
@MapperScan("com.example.grape.dao.mapper")
@EnableScheduling
public class GrapeApplication {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments, passed through to Spring
     */
    public static void main(String[] args) {
        final Class<?> primarySource = GrapeApplication.class;
        SpringApplication.run(primarySource, args);
    }
}
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持。
加载全部内容