SpringBoot动态定时功能实现方案详解
smileNicky 人气:0业务场景
基于上篇程序,做了一版动态定时程序,然后发现这个定时程序需要在下次执行的时候会加载新的时间,所以如果改了定时程序不能马上触发,所以想到一种方法,在保存定时程序的时候将cron表达式传过去,然后触发定时程序,下面看看怎么实现
环境准备
开发环境
- JDK 1.8
- SpringBoot2.2.1
- Maven 3.2+
开发工具
- IntelliJ IDEA
- smartGit
- Navicat15
实现方案
基于上一版进行改进:
- 先根据选择的星期生成cron表达式,保存到数据库里,同时更改cron表达式手动触发定时程序加载最新的cron表达式
- 根据保存的cron表达式规则执行定时程序
- 通过CommandLineRunner设置启动加载
- 加上线程池,提高线程复用率和程序性能
加上ThreadPoolTaskScheduler,支持同步和异步两种方式:
import lombok.extern.slf4j.Slf4j; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.config.ScheduledTaskRegistrar; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; @Configuration @Slf4j public class ScheduleConfig implements SchedulingConfigurer , AsyncConfigurer { @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { taskRegistrar.setScheduler(taskScheduler()); } @Bean(destroyMethod="shutdown" , name = "taskScheduler") public ThreadPoolTaskScheduler taskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(10); scheduler.setThreadNamePrefix("itemTask-"); scheduler.setAwaitTerminationSeconds(600); scheduler.setWaitForTasksToCompleteOnShutdown(true); return scheduler; } @Bean(name = "asyncExecutor") public ThreadPoolTaskExecutor asyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setQueueCapacity(1000); executor.setKeepAliveSeconds(600); executor.setMaxPoolSize(20); executor.setThreadNamePrefix("itemAsyncTask-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } @Override public Executor getAsyncExecutor() { return asyncExecutor(); } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (throwable, method, objects) -> { log.error("异步任务异常,message {} , method {} , params" , throwable , method , objects); }; } }
加上一个SchedulerTaskJob
接口:
public interface SchedulerTaskJob{ void executeTask(); }
AbstractScheduler 抽象类,提供基本的功能
import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.Trigger; import org.springframework.scheduling.TriggerContext; import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.scheduling.support.CronTrigger; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Date; @Slf4j @Component @Data public abstract class AbstractScheduler implements SchedulerTaskJob{ @Resource(name = "taskScheduler") private ThreadPoolTaskScheduler threadPoolTaskScheduler; @Override public void executeTask() { String cron = getCronString(); Runnable task = () -> { // 执行业务 doBusiness(); }; Trigger trigger = new Trigger() { @Override public Date nextExecutionTime(TriggerContext triggerContext) { CronTrigger trigger; try { trigger = new CronTrigger(cron); return trigger.nextExecutionTime(triggerContext); } catch (Exception e) { log.error("cron表达式异常,已经启用默认配置"); // 配置cron表达式异常,执行默认的表达式 trigger = new CronTrigger(getDefaultCron()); return trigger.nextExecutionTime(triggerContext); } } }; threadPoolTaskScheduler.schedule(task , trigger); } protected abstract String getCronString(); protected abstract void doBusiness(); protected abstract String getDefaultCron(); }
实现类,基于自己的业务实现,然后事项抽象类,通过模板模式进行编程
import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.time.LocalDateTime; @Service @Slf4j @Data public class ItemSyncScheduler extends AbstractScheduler { @Value("${configtask.default.itemsync}") private String defaultCron ; private String cronString ; @Override protected String getCronString() { if (StrUtil.isNotBlank(cronString)) return cronString; SyncConfigModel configModel = syncConfigService.getOne(Wrappers.<SyncConfigModel>lambdaQuery() .eq(SyncConfigModel::getBizType, 1) .last("limit 1")); if (configModel == null) return defaultCron; return configModel.getCronStr(); } @Override protected void doBusiness() { log.info("执行业务..."); log.info("执行时间:{}" , LocalDateTime.now()); // 执行业务 } @Override protected String getDefaultCron() { return defaultCron; } }
如果更改了cron表达式,程序不会马上触发,所以直接开放一个接口出来,调用的时候,设置最新的表达式,然后重新调用定时程序
import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController @Slf4j public class ItemSchedulerController { private ItemSyncScheduler itemSyncScheduler; @Autowired public ItemSchedulerController(ItemSyncScheduler itemSyncScheduler) { this.itemSyncScheduler= itemSyncScheduler; } @GetMapping(value = "/updateItemCron") @ApiOperation(value = "更新cron表达式") public void updateItemCron(@RequestParam("cronString") String cronString) { log.info("更新cron表达式..."); log.info("cronString:{}" , cronString); itemSyncScheduler.setCronString(cronString); itemSyncScheduler.executeTask(); } }
实现CommandLineRunner ,实现Springboot启动加载
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import javax.annotation.Resource; @Component @Order(1) public class SchedulerTaskRunner implements CommandLineRunner { private ItemSyncScheduler itemSyncScheduler; @Autowired public SchedulerTaskRunner(ItemSyncScheduler itemSyncScheduler) { this.itemSyncScheduler= itemSyncScheduler; } @Override public void run(String... args) throws Exception { itemSyncScheduler.executeTask(); } }
归纳总结
基于上一版定时程序的问题,做了改进,加上了线程池和做到了动态触发,网上的资料很多都是直接写明使用SchedulingConfigurer
来实现动态定时程序,不过很多都写明场景,本文通过实际,写明实现方法,本文是在保存定时程序的时候,设置最新的cron表达式,调一下接口重新加载,还可以使用canal等中间件监听数据表,如果改了就再设置cron表达式,然后触发程序
加载全部内容