springboot @Async异步多线程入库
jiuchengi 人气:0前言
在开发过程中,我们会遇到很多使用线程池的业务场景,例如定时任务使用的就是ScheduledThreadPoolExecutor。而有些时候使用线程池的场景就是会将一些可以进行异步操作的业务放在线程池中去完成,例如在生成订单的时候给用户发送短信,生成订单的结果不应该被发送短信的成功与否所左右,也就是说生成订单这个主操作是不依赖于发送短信这个操作,所以我们就可以把发送短信这个操作置为异步操作。而要想完成异步操作,一般使用的一个是消息服务器MQ,一个就是线程池。今天我们就来看看在Java中常用的Spring框架中如何去使用线程池来完成异步操作,以及分析背后的原理。
在Spring4中,Spring中引入了一个新的注解@Async,这个注解让我们在使用Spring完成异步操作变得非常方便。
在SpringBoot环境中,要使用@Async注解,我们需要先在启动类上加上@EnableAsync注解。这个与在SpringBoot中使用@Scheduled注解需要在启动类中加上@EnableScheduling是一样的道理(当然你使用古老的XML配置也是可以的,但是在SpringBoot环境中,建议的是全注解开发),具体原理下面会分析。加上@EnableAsync注解后,如果我们想在调用一个方法的时候开启一个新的线程开始异步操作,我们只需要在这个方法上加上@Async注解,当然前提是,这个方法所在的类必须在Spring环境中。
项目实况介绍
项目中,我需要将700w条数据,定时任务加入到mysql表中,去掉日志打印和一些其他因素的影响,入库时间还是需要8个小时以上,严重影响后续的一系列操作,所以我才用@Async注解,来实现异步入库,开了7个线程,入库时间缩短为1.5个小时,大大提高效率,以下是详细介绍,一级一些需要注意的坑.
需要写个配置文件两种方式
第一种方式
@Configuration @EnableAsync //启用异步任务 public class ThreadConfig { @Bean public ThreadPoolTaskExecutor executor(){ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(15); //配置最大线程数 executor.setMaxPoolSize(30); //配置队列大小 executor.setQueueCapacity(1000); //线程的名称前缀 executor.setThreadNamePrefix("Executor-"); //线程活跃时间(秒) //executor.setKeepAliveSeconds(60); //等待所有任务结束后再关闭线程池 executor.setWaitForTasksToCompleteOnShutdown(true); //设置拒绝策略 //executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化 executor.initialize(); return executor; } }
第二种方式
@Configuration @EnableAsync public class ExecutorConfig { @Value("${thread.maxPoolSize}") private Integer maxPoolSize; @Value("${thread.corePoolSize}") private Integer corePoolSize; @Value("${thread.keepAliveSeconds}") private Integer keepAliveSeconds; @Value("${thread.queueCapacity}") private Integer queueCapacity; @Bean public ThreadPoolTaskExecutor asyncExecutor(){ ThreadPoolTaskExecutor taskExecutor=new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(corePoolSize);//核心数量 taskExecutor.setMaxPoolSize(maxPoolSize);//最大数量 taskExecutor.setQueueCapacity(queueCapacity);//队列 taskExecutor.setKeepAliveSeconds(keepAliveSeconds);//存活时间 taskExecutor.setWaitForTasksToCompleteOnShutdown(true);//设置等待任务完成后线程池再关闭 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//设置拒绝策略 taskExecutor.initialize();//初始化 return taskExecutor; } }
配置文件
#线程池 thread: corePoolSize: 5 maxPoolSize: 10 queueCapacity: 100 keepAliveSeconds: 3000
springboot默认是不开启异步注解功能的,所以,要让springboot中识别@Async,则必须在入口文件中,开启异步注解功能
package com.demo; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableAsync; //开启异步注解功能 @EnableAsync @SpringBootApplication public class SpringbootTaskApplication { public static void main(String[] args) { SpringApplication.run(SpringbootTaskApplication.class, args); } }
这里有个坑!
如果遇到报错:需要加上 proxyTargetClass = true
The bean 'xxxService' could not be injected as a'com.xxxx.xxx.xxxService' because it is a JDK dynamic proxy that implements:
xxxxxx
Action:
Consider injecting the bean as one of its interfaces orforcing the use of CGLib-based proxiesby setting proxyTargetClass=true on @EnableAsync and/or @EnableCaching.
当我service层处理完逻辑,吧list分成7个小list然后调用异步方法(异步方法的参数不用管,没影响,只截取核心代码)
List<List<DistributedPredictDTO>> partition = Lists.partition(userList, userList.size() / 7); for (List<DistributedPredictDTO> distributedPredictDTOS : partition) { //调用异步方法 threadService.getI(beginDate, endDate, tableName, distributedPredictDTOS, hMap, i); }
@Slf4j @Service public class ThreadServiceImpl { @Resource ResourcePoolUrlProperties properties; @Resource private MonitorDao monitorDao; @Async Integer getI(String beginDate, String endDate, String tableName, List<DistributedPredictDTO> userList, Map<String, String> hMap, int i) { log.info("我开始执行"); for (DistributedPredictDTO e : userList) { String responseStr; HashMap<String, String> pMap = Maps.newHashMap(); pMap.put("scheduleId", e.getScheduleId()); pMap.put("scheduleName", e.getScheduleName()); pMap.put("distribsunStationId", e.getLabel()); pMap.put("distribsunStationName", e.getValue()); pMap.put("beginTime", beginDate); pMap.put("endTime", endDate); try { if ("180".equals(properties.getNewPowerSys().getDistributedPredictUrl().substring(17, 20))) { pMap = null; } responseStr = HttpClientUtil.doPost(properties.getNewPowerSys().getDistributedPredictUrl(), hMap, pMap); } catch (Exception exception) { throw new RuntimeException(e.getValue() + "的功率预测接口异常" + hMap + pMap); } if (org.springframework.util.StringUtils.isEmpty(responseStr)) { log.info(e + "数据为空"); continue; } JSONObject resJson = JSONObject.parseObject(responseStr); JSONObject obj = (JSONObject) resJson.get("obj"); JSONArray tableData = (JSONArray) obj.get("tabledata"); final List<DistributedUserPower> userPowers = Lists.newArrayList(); for (Object o : tableData) { final DistributedUserPower distributedUserPower = new DistributedUserPower(); distributedUserPower.setData(((JSONObject) o).get("data").toString()); distributedUserPower.setData2(((JSONObject) o).get("data2").toString()); distributedUserPower.setDataTime(((JSONObject) o).get("time").toString()); distributedUserPower.setUserId(e.getLabel()); distributedUserPower.setUserName(e.getValue()); distributedUserPower.setAreaName(e.getScheduleName()); distributedUserPower.setCreateTime(DateUtils.getDate()); userPowers.add(distributedUserPower); } monitorDao.saveBatch(userPowers, tableName); i++; } return i; }
这里有两个坑!
第一个坑:
我调用的异步方法在当前类中,则直接导致
@Async注解失效
正确操作,异步方法不要和同步调用方法写在同一个类中,应该重新调用其他类
第二个坑:
如果出现这个报错:
Null return value from advice does not mat
问题分析
代码中采用异步调用,AOP 做来一层切面处理,底层是通过 JDK 动态代理实现
不管采用 JDK 还是 CGLIB 代理,返回值必须是包装类型,所以才会导致上诉的报错信息
处理方案
将异步方法的返回值修改为基本类型的对应包装类型即可,如 int -> Integer
5分钟测试效果图:
最后一张是7线程:
总结
加载全部内容