Springboot线程池
zhangbeizhen18 人气:0版本:Spring Boot 2.6.3
一、案例场景
1>web端接收restful请求生成任务A,并把任务放入队列Queue_A。
2>线程池A的任务线程从队列Queue_A取出任务,处理完成后放入Queue_B。
3>线程池B的任务线程从Queue_B取出任务,处理完成后入库。
本例就使用两个任务步骤,按需扩展延长任务链。
二、使用类
java.util.LinkedHashMap,双向链表。
java.util.concurrent.BlockingQueue,阻塞队列接口。
java.util.concurrent.LinkedBlockingQueue,阻塞队列实现类。
java.util.concurrent.CountDownLatch,线程计数器。
java.util.concurrent.locks.ReentrantLock,可重入锁。
三、本例说明
1.接收web请求
OrderController接收web请求,业务数据封装成任务对象,并写入队列QUEUE_A。Web请求结束,立即返回。
2.后台任务处理
FlowStarter流程启动器
管理FlowManager,创建流程管理器和启动流程管理器。创建线程池容器StepContainer,指定队列、线程池线程数量,以及业务处理Handler。
FlowManager流程管理器
管理线程池容器StepContainer。创建线程池容器,启动线程池容器,关闭线程池容器,线程池容器之间数据传递。使用LinkedHashMap维护一个流程中的多个线程池容器。
StepContainer线程池容器
创建线程池,启动线程执行器(Executor),初始化业务处理Handler,读写队列。使用LinkedHashMap维护一个流程中的多个StepExecutor。
StepExecutor线程执行器
执行抽象公用业务逻辑。实现线程Runnable接口。调用StepHandler的实现类的execute执行具体业务逻辑。
StepHandler业务处理器handler
具体业务在StepHandler的实现类的execute中实现。
任务模型对象StepModel和执行结果对象StepResult
每个具体业务数据必须包装成任务模型对象StepModel,执行结果包装成执行结果对象StepResult,才能在线程池和队列中流转。
3.关系说明
一个FlowStarter可以启动一个或者多个FlowManager。支持一对多和一对一,按需扩展。
一个FlowManager对应一个业务流程。一个业务流程可以拆分为多个步骤。一个步骤对应一个线程池容器StepContainer。一个线程池容器StepContainer,启动多个线程执行器StepExecutor。效果就是并发执行任务。
一个业务流程拆分成若干个步骤,每个步骤之间数据流转,使用任务模型StepModel中的状态标识isFinished,isPutInQueueAgain,isPutInQueueNext 字段来分析任务流向。使用StepModel的StepResult的 nextStepName字段来识别具体流向的线程池容器。
四、代码
1.OrderController
OrderController,接收请求、封装任务、写队列。
@Slf4j @RestController @RequestMapping("/order") public class OrderController { @PostMapping("/f1") public Object f1(@RequestBody Object obj) { log.info("OrderController->f1,接收参数,obj = " + obj.toString()); Map objMap = (Map) obj; OrderInfo orderInfo = new OrderInfo(); orderInfo.setUserName((String) objMap.get("userName")); orderInfo.setTradeName((String) objMap.get("tradeName")); orderInfo.setOrderTime(System.currentTimeMillis()); LinkedBlockingQueue<StepModel> queueA = FlowQueue.getBlockingQueue("QUEUE_A"); QueueUtils.putStepPutInQueue(queueA,orderInfo); log.info("OrderController->f1,返回." ); return ResultObj.builder().code("200").message("成功").build(); } }
2.FlowStarter流程启动器
FlowStarter,后台任务线程池和线程启动。实现InitializingBean了接口。那么在spring初始化化bean完成后,就能触发启动线程池和线程。
@Slf4j @Service public class FlowStarter implements InitializingBean { @Override public void afterPropertiesSet() throws Exception { log.info("FlowWorker创建流程."); FlowManager flowManager = new FlowManager(); flowManager.buildContainer(ConstantUtils.STEP_01,5, FlowQueue.getBlockingQueue("QUEUE_A"), Step01Handler.class ); flowManager.buildContainer(ConstantUtils.STEP_02,5, FlowQueue.getBlockingQueue("QUEUE_B"), Step02Handler.class ); flowManager.startContainers(); log.info("FlowWorker启动流程完成."); } }
3.FlowManager流程管理器
一个FlowManager流程管理器,维护多个线程池容器StepContainer,共同完成一个流程的多个步骤。
public class FlowManager { // 管理器名称 private String name; // 管理线程池容器 private Map<String, StepContainer> stepContainerMap = new LinkedHashMap<>(); public FlowManager() {} // 创建线程池容器 public void buildContainer(String name, int poolSize, BlockingQueue<StepModel> queue, Class<? extends StepHandler> handlerClazz) { StepContainer stepWorker = new StepContainer(); stepWorker.createThreadPool(poolSize, queue, handlerClazz); stepWorker.setName(name); stepWorker.setFlowManager(this); this.stepContainerMap.put(name, stepWorker); } // 启动线程池容器 public void startContainers() { for (StepContainer stepContainer : this.stepContainerMap.values()) { stepContainer.startRunExecutor(); } } // 关闭线程池容器 public void stopContainers() { for (StepContainer stepContainer : this.stepContainerMap.values()) { stepContainer.stopRunExecutor(); } this.stepContainerMap.clear(); } // 任务放入下一个线程池 public boolean sendToNextContainer(String nextStepName, Object obj) { if (nextStepName != null && !StringUtils.equals(nextStepName, "")) { if (this.stepContainerMap.containsKey(nextStepName)) { this.stepContainerMap.get(nextStepName).putStepInQueue(obj); return true; } else { return false; } } else { return false; } } public String getName() { return name; } }
4.StepContainer线程池容器
StepContainer线程池容器,维护多个线程执行器StepExecutor,实现多线程异步完成每个独立任务。
@Slf4j public class StepContainer { // 线程池名称 private String name; // 线程池 private ExecutorService threadPool; // 线程数目 private int nThreads = 0; // 线程处理业务handler类 private Class handlerClazz; // 线程处理业务队列 private BlockingQueue<StepModel> queue = null; // 线程池内线程管理 private Map<String, StepExecutor> stepExecutorMap = new LinkedHashMap<>(); // 线程池运行状态 private boolean isRun = false; // 线程池管理器 private FlowManager flowManager = null; // 构造函数 public StepContainer() {} // 创建线程池 public boolean createThreadPool(int nThreads, BlockingQueue<StepModel> queue, Class<? extends StepHandler> handlerClazz) { try { this.nThreads = nThreads; this.queue = queue; this.handlerClazz = handlerClazz; this.threadPool = Executors.newFixedThreadPool(this.nThreads, new ThreadFactory() { @Override public Thread newThread(Runnable runnable) { return new Thread(runnable); } }); } catch (Exception e) { e.printStackTrace(); return false; } return true; } // 启动线程 public void startRunExecutor() { if (!this.isRun) { if (this.handlerClazz != null) { log.info("线程池: " + this.name + ",启动,加载线程Executor."); StepExecutor stepExecutor; String executorName = ""; for (int num = 0; num < this.nThreads; num++) { try { executorName = this.name + "_" + (num + 1); StepHandler stepHandler = (StepHandler) createStepHandler(this.handlerClazz); stepExecutor = new StepExecutor(executorName, this.queue, stepHandler, this); this.threadPool.execute(stepExecutor); this.stepExecutorMap.put(executorName, stepExecutor); } catch (Exception e) { e.printStackTrace(); } } this.isRun = true; } } } // 关闭线程 public void stopRunExecutor() { if (isRun) { Iterator iterator = this.stepExecutorMap.values().iterator(); while (iterator.hasNext()) { StepExecutor stepExecutor = (StepExecutor) iterator.next(); stepExecutor.stop(); } this.stepExecutorMap.clear(); this.isRun = false; } } // 从队列获取任务 public StepModel getStepFromQueue() { StepModel stepModel = null; synchronized (this.queue) { try { if (this.queue.size() > 0) { stepModel = this.queue.take(); } } catch (Exception e) { log.info("从队列获取任务异常."); e.printStackTrace(); } } return stepModel; } // 任务放入队列 public void putStepInQueue(Object obj) { try { StepModel stepModel = new StepModel(obj); stepModel.setPutInQueueTime(System.currentTimeMillis()); this.queue.put(stepModel); } catch (InterruptedException e) { log.info("任务放入队列异常."); e.printStackTrace(); } } // 重新放入 public void putStepInQueueAgain(StepModel stepModel) { stepModel.setFinished(false); stepModel.setPutInQueueNext(false); stepModel.setPutInQueueAgain(false); try { this.queue.put(stepModel); } catch (InterruptedException e) { log.info("任务重新放入队列异常."); e.printStackTrace(); } } // 清空队列 public void clearQueue() { if (this.queue != null) { this.queue.clear(); } } // 初始化实例对象 public Object createStepHandler(Class clazz) throws InstantiationException, IllegalAccessException { Object object = clazz.newInstance(); return object; } public String getName() { return name; } public void setName(String name) { this.name = name; } public FlowManager getFlowManager() { return flowManager; } public void setFlowManager(FlowManager flowManager) { this.flowManager = flowManager; } }
5.StepExecutor线程执行器
StepExecutor线程执行器,实现Runnable接口。线程执行单元通用逻辑,具体业务逻辑通过调用StepHandler的execute方法实现。
@Slf4j public class StepExecutor implements Runnable { // 执行器名称 private String name; // 线程执行的任务 private StepModel stepModel; // 线程执行的队列 private BlockingQueue<StepModel> queue; // 线程执行的业务处理逻辑 private Object stepHandler; // 线程运行状态 private volatile boolean isRun = false; // 线程开启(True)和关闭(False) private volatile boolean isClose = false; // 线程隶属容器 private StepContainer stepContainer; // 线程计数器(关闭线程使用) private CountDownLatch countDownLatch = null; public StepExecutor() {} public StepExecutor(String name, BlockingQueue<StepModel> queue, StepHandler stepHandler, StepContainer stepContainer) { this.name = name; this.queue = queue; this.stepHandler = stepHandler; this.stepContainer = stepContainer; } @Override public void run() { this.isRun = true; this.countDownLatch = new CountDownLatch(1); // 没收到关闭信号,则循环运行 while (!this.isClose) { this.stepModel = null; String threadName = "【线程池:" + this.stepContainer.getName() + ",线程:" + Thread.currentThread().getName() + "】"; // 循环运行,为防止中断和卡主,需捕获异常 try { StepHandler stepHandler = (StepHandler) this.stepHandler; this.stepModel = this.stepContainer.getStepFromQueue(); if (this.stepModel != null) { log.info(threadName + ",处理任务."); this.stepModel.getStepResultList().clear(); stepHandler.execute(this.stepModel); // 执行完成后结果数据 List<StepResult> stepResultList = this.stepModel.getStepResultList(); boolean isFinished = this.stepModel.isFinished(); boolean isPutInQueueAgain = this.stepModel.isPutInQueueAgain(); boolean isPutInQueueNext = this.stepModel.isPutInQueueNext(); if (isFinished && !isPutInQueueAgain && !isPutInQueueNext) { log.info(threadName + ",任务结束."); } if (!isFinished && isPutInQueueAgain && !isPutInQueueNext) { log.info(threadName + ",任务在本步骤未完成,重新放队列."); this.stepContainer.putStepInQueueAgain(this.stepModel); } if (!isFinished && !isPutInQueueAgain && isPutInQueueNext) { int resultNum = stepResultList.size(); if (resultNum > 0) { for (StepResult stepResult : stepResultList) { log.info(threadName + ",任务在本步骤已经完成,发送给下一个线程池: " + stepResult.getNextStepName() + ",执行."); this.stepContainer.getFlowManager().sendToNextContainer( stepResult.getNextStepName(), stepResult.getResult()); } } } } else { threadToSleep(1000 * 3L); } } catch (Exception e) { log.info("执行器异常."); e.printStackTrace(); this.stepContainer.putStepInQueueAgain(this.stepModel); } } // 跳出循环后,线程计数减1 this.countDownLatch.countDown(); this.isRun = false; } public void stop() { this.isClose = true; if (this.countDownLatch != null) { while (this.countDownLatch.getCount() > 0L) { try { this.countDownLatch.await(); } catch (InterruptedException e) { log.info("线程关闭异常."); e.printStackTrace(); } } } this.isClose = false; } public void threadToSleep(long time) { try { Thread.sleep(time); } catch (Exception e) { log.info("线程休眠异常."); e.printStackTrace(); } } }
6.StepHandler业务处理handler
StepHandler是StepExecutor线程执行器,具体执行业务逻辑的入口。
StepHandler抽象类
每个具体的实现类都继承抽象的StepHandler。
public abstract class StepHandler { public StepHandler() {} public abstract void execute(StepModel stepModel); }
Step01Handler
Step01Handler是StepHandler实现类,从队列中取任务执行,执行完成后放入下一个业务处理器Step02Handler。
@Slf4j public class Step01Handler extends StepHandler { @Override public void execute(StepModel stepModel) { log.info("Step01Handler执行开始,stepModel: " + stepModel.toString()); OrderInfo orderInfo = (OrderInfo) stepModel.getObj(); List<StepResult> stepResultList = stepModel.getStepResultList(); try { log.info("Step01Handler执行,处理订单."); String orderNo = UUID.randomUUID().toString() .replace("-", "").toUpperCase(); orderInfo.setOrderNo(orderNo); orderInfo.setPlatformType("线上"); orderInfo.setOrderSource("Web"); stepModel.setFinished(false); stepModel.setPutInQueueNext(true); stepModel.setPutInQueueAgain(false); stepResultList.add(new StepResult(ConstantUtils.STEP_02, orderInfo)); } catch (Exception e) { stepModel.setFinished(false); stepModel.setPutInQueueNext(false); stepModel.setPutInQueueAgain(true); stepResultList.add(new StepResult(ConstantUtils.STEP_01, orderInfo)); } log.info("Step01Handler执行完成,stepModel: " + stepModel.toString()); } }
Step02Handler
Step02Handler是StepHandler实现类,从队列中取任务执行。
@Slf4j public class Step02Handler extends StepHandler{ @Override public void execute(StepModel stepModel) { log.info("Step02Handler执行开始,stepModel: " + stepModel.toString()); OrderInfo orderInfo = (OrderInfo) stepModel.getObj(); List<StepResult> stepResultList = stepModel.getStepResultList(); try { orderInfo.setEndTime(System.currentTimeMillis()); stepModel.setFinished(true); stepModel.setPutInQueueNext(false); stepModel.setPutInQueueAgain(false); log.info("Step02Handler执行,入库."); } catch (Exception e) { stepModel.setFinished(true); stepModel.setPutInQueueNext(false); stepModel.setPutInQueueAgain(false); } log.info("Step02Handler执行完成,stepModel: " + stepModel.toString()); } }
7.阻塞队列
BlockingQueue是线程安全的阻塞队列。
7.1 FlowQueue
FlowQueue,管理本例使用的两个阻塞队列。
public class FlowQueue { private static final LinkedBlockingQueue<StepModel> queueA = new LinkedBlockingQueue<StepModel>(); private static final LinkedBlockingQueue<StepModel> queueB = new LinkedBlockingQueue<StepModel>(); public static LinkedBlockingQueue<StepModel> getBlockingQueue(String queueName) { LinkedBlockingQueue<StepModel> queue = null; switch (queueName) { case "QUEUE_A": queue = queueA; break; case "QUEUE_B": queue = queueB; break; } return queue; } }
7.2 QueueUtils
QueueUtils,队列简易工具。
@Slf4j public class QueueUtils { public static StepModel getStepFromQueue( LinkedBlockingQueue<StepModel> queue) { StepModel stepModel = null; try { if (queue.size() > 0) { stepModel = queue.take(); } } catch (Exception e) { log.info("读队列异常."); e.printStackTrace(); } return stepModel; } public static void putStepPutInQueue( LinkedBlockingQueue<StepModel> queue, Object obj) { try { StepModel stepModel = new StepModel(obj); stepModel.setPutInQueueTime(System.currentTimeMillis()); queue.put(stepModel); } catch (Exception e) { log.info("写队列异常."); e.printStackTrace(); } } public static int getQueueSize( LinkedBlockingQueue<StepModel> queue) { int size = 0; try { size = queue.size(); } catch (Exception e) { log.info("获取队列Size异常."); e.printStackTrace(); } return size; } }
7.3 ConstantUtils
ConstantUtils,管理常量,即线程池名称。
public class ConstantUtils { public static final String STEP_01 = "STEP_01_THREAD_POOL"; public static final String STEP_02 = "STEP_02_THREAD_POOL"; }
8.任务模型
任务模型,即具体需要处理对象,封装成线程使用的任务模型,这样可以把业务和流程框架解耦。
8.1 StepModel
StepModel,任务模型封装。
@Data public class StepModel { // 任务对象 private Object obj; // 任务执行结果 private List<StepResult> stepResultList; // 任务接收时间 private long putInQueueTime; // 任务完成标识 private boolean isFinished = false; // 任务重新放入队列标识 private boolean isPutInQueueAgain = false; // 任务放入下一个队列标识 private boolean isPutInQueueNext = false; public StepModel(Object object) { this.obj = object; this.stepResultList = new ArrayList<>(); } }
8.2 StepResult
StepResult,执行结果模型封装。
@Data public class StepResult { // 目标线程池名 private String nextStepName; // 执行结果 private Object result; public StepResult(String nextStepName,Object result){ this.nextStepName = nextStepName; this.result = result; } }
9.业务数据模型
业务数据模型,即生成具体需要处理的数据,在传入给线程池的线程执行前,需要封装成任务模型。
9.1 OrderInfo
OrderInfo,本例要处理的业务数据模型。
@Data @NoArgsConstructor public class OrderInfo { private String userName; private String orderNo; private String tradeName; private String platformType; private String orderSource; private long orderTime; private long endTime; }
9.2 ResultObj
ResultObj,web请求返回的统一封装对象。
@Data @NoArgsConstructor @AllArgsConstructor @Builder public class ResultObj { private String code; private String message; }
10.测试
包括web请求和后台任务
10.1 web请求
请求URL: http://127.0.0.1:8080/server/order/f1
入参:
{
"userName": "HangZhou0614",
"tradeName": "Vue进阶教程"
}
返回值:
{
"code": "200",
"message": "成功"
}
10.2 后台任务日志
日志输出:
加载全部内容