java quasar协程池
爪哇盘古 人气:0业务场景:golang与swoole都拥抱了协程,在同任务并发数量下,协程可比线程多几倍。所以最近在查询java时了解java本身是没有协程的,但是某牛自行实现了协程,也就是本文的主角quasar(纤程)!在csdn中基本都是对它的基本使用,用法和线程差不多。不过没看到谁公开一下手写协程池的骚操作(谁会直接new它用?那是没挨过社会的毒打呀~)
一个线程可以多个协程,一个进程也可以单独拥有多个协程。
线程进程都是同步机制,而协程则是异步。
协程能保留上一次调用时的状态,每次过程重入时,就相当于进入上一次调用的状态。
线程是抢占式,而协程是非抢占式的,所以需要用户自己释放使用权来切换到其他协程,因此同一时间其实只有一个协程拥有运行权,相当于单线程的能力。
协程并不是取代线程, 而且抽象于线程之上, 线程是被分割的CPU资源, 协程是组织好的代码流程, 协程需要线程来承载运行, 线程是协程的资源, 但协程不会直接使用线程, 协程直接利用的是执行器(Interceptor), 执行器可以关联任意线程或线程池, 可以使当前线程, UI线程, 或新建新程.。
线程是协程的资源。协程通过Interceptor来间接使用线程这个资源。
废话不多说,直接上代码:
导入包:
<dependency> <groupId>co.paralleluniverse</groupId> <artifactId>quasar-core</artifactId> <version>0.7.9</version> <classifier>jdk8</classifier> </dependency>
WorkTools工具类:
package com.example.ai; import co.paralleluniverse.fibers.Fiber; import co.paralleluniverse.fibers.SuspendExecution; import co.paralleluniverse.strands.SuspendableRunnable; import java.util.concurrent.ArrayBlockingQueue; public class WorkTools { //协程池中默认协程的个数为5 private static int WORK_NUM = 5; //队列默认任务为100 private static int TASK_COUNT = 100; //工做协程数组 private Fiber[] workThreads; //等待队列 private final ArrayBlockingQueue<SuspendableRunnable> taskQueue; //用户在构造这个协程池时,但愿启动的协程数 private final int workerNum; //构造方法:建立具备默认协程个数的协程池 public WorkTools() { this(WORK_NUM,TASK_COUNT); } //建立协程池,workNum为协程池中工做协程的个数 public WorkTools(int workerNum, int taskCount) { if (workerNum <= 0) { workerNum = WORK_NUM; } if (taskCount <= 0) { taskCount = TASK_COUNT; } this.workerNum = workerNum; taskQueue = new ArrayBlockingQueue(taskCount); workThreads = new Fiber[workerNum]; for (int i = 0; i < workerNum; i++) { int finalI = i; workThreads[i] = new Fiber<>(new SuspendableRunnable() { @Override public void run() throws SuspendExecution, InterruptedException { SuspendableRunnable runnable = null; while (true){ try{ //取任务,没有则阻塞。 runnable = taskQueue.take(); }catch (Exception e){ System.out.println(e.getMessage()); } //存在任务则运行。 if(runnable != null){ runnable.run(); } runnable = null; } } }); //new一个工做协程 workThreads[i].start(); //启动工做协程 } Runtime.getRuntime().availableProcessors(); } //执行任务,其实就是把任务加入任务队列,何时执行由协程池管理器决定 public void execute(SuspendableRunnable task) { try { taskQueue.put(task); //put:阻塞接口的插入 } catch (Exception e) { // TODO: handle exception System.out.println("阻塞"); } } //销毁协程池,该方法保证全部任务都完成的状况下才销毁全部协程,不然等待任务完成再销毁 public void destory() { //工做协程中止工做,且置为null System.out.println("ready close thread..."); for (int i = 0; i < workerNum; i++) { workThreads[i] = null; //help gc } taskQueue.clear(); //清空等待队列 } //覆盖toString方法,返回协程信息:工做协程个数和已完成任务个数 @Override public String toString() { return "WorkThread number:" + workerNum + " ==分割线== wait task number:" + taskQueue.size(); } }
测试代码:
package com.example.ai; import co.paralleluniverse.strands.SuspendableRunnable; import lombok.SneakyThrows; import org.springframework.boot.autoconfigure.SpringBootApplication; import java.util.concurrent.CountDownLatch; @SpringBootApplication public class AiApplication { @SneakyThrows public static void main(String[] args) { //等待协程任务完毕后再结束主线程 CountDownLatch cdl = new CountDownLatch(50); //开启5个协程,50个任务列队。 WorkTools myThreadPool = new WorkTools(5, 50); for (int i = 0; i< 50; i++){ int finalI = i; myThreadPool.execute(new SuspendableRunnable() { @Override public void run() { System.out.println(finalI); try { //延迟1秒 Thread.sleep(1000); cdl.countDown(); } catch (InterruptedException e) { System.out.println("阻塞中"); } } }); } //阻塞 cdl.await(); } }
具体代码都有注释了,自行了解。我也是以线程池写法实现。
当前为解决问题:在协程阻塞过程中Fiber类会报阻塞警告,满脸懵逼啊,看着很讨厌。暂时没有办法处理,看各位大神谁有招下方评论提供给下思路。万分感谢~
加载全部内容