Dubbo线程池
悠然予夏 人气:01、Dubbo已有线程池
dubbo在使用时,都是通过创建真实的业务线程池进行操作的。目前已知的线程池模型有两个和java中的相互对应:
- fix: 表示创建固定大小的线程池。也是Dubbo默认的使用方式,默认创建的执行线程数为200,并且是没有任何等待队列的。所以在极端的情况下可能会存在问题,比如某个操作大量执行时,可能存在堵塞的情况。后面也会讲相关的处理办法。
- cache: 创建非固定大小的线程池,当线程不足时,会自动创建新的线程。但是使用这种的时候需要注意,如果突然有高TPS的请求过来,方法没有及时完成,则会造成大量的线程创建,对系统的CPU和负载都是压力,执行越多反而会拖慢整个系统。
2、自定义线程池
在真实的使用过程中可能会因为使用fix模式的线程池,导致具体某些业务场景因为线程池中的线程数量不足而产生错误,而很多业务研发是对这些无感知的,只有当出现错误的时候才会去查看告警或者通过客户反馈出现严重的问题才去查看,结果发现是线程池满了。所以可以在创建线程池的时,通过某些手段对这个线程池进行监控,这样就可以进行及时的扩缩容机器或者告警。下面的这个程序就是这样子的,会在创建线程池后进行对其监控,并且及时作出相应处理。
(1)线程池实现, 这里主要是基于对FixedThreadPool 中的实现做扩展出线程监控的部分
package com.lagou.threadpool; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; import java.util.concurrent.*; public class WachingThreadPool extends FixedThreadPool implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(WachingThreadPool.class); // 定义线程池使用的阀值 private static final double ALARM_PERCENT = 0.90; private final Map<URL, ThreadPoolExecutor> THREAD_POOLS = new ConcurrentHashMap<>(); public WachingThreadPool() { // 每隔3秒打印线程使用情况 Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this, 1, 3, TimeUnit.SECONDS); } // 通过父类创建线程池 @Override public Executor getExecutor(URL url) { final Executor executor = super.getExecutor(url); if (executor instanceof ThreadPoolExecutor) { THREAD_POOLS.put(url, (ThreadPoolExecutor) executor); } return executor; } @Override public void run() { // 遍历线程池 for (Map.Entry<URL, ThreadPoolExecutor> entry : THREAD_POOLS.entrySet()) { final URL url = entry.getKey(); final ThreadPoolExecutor executor = entry.getValue(); // 计算相关指标 final int activeCount = executor.getActiveCount(); final int poolSize = executor.getCorePoolSize(); double usedPercent = activeCount / (poolSize * 1.0); LOGGER.info("线程池执行状态:[{}/{}:{}%]", activeCount, poolSize, usedPercent * 100); if (usedPercent > ALARM_PERCENT) { LOGGER.error("超出警戒线! host:{} 当前使用率是:{},URL:{}", url.getIp(), usedPercent * 100, url); } } } }
(2)SPI声明,创建文件(固定的)
META-INF/dubbo/org.apache.dubbo.common.threadpool.ThreadPool
watching=包名.线程池名
(3)在服务提供方项目引入该依赖
(4)在服务提供方项目中设置使用该线程池生成器
dubbo.provider.threadpool=watching
(5)接下来需要做的就是模拟整个流程,因为该线程当前是每1秒抓一次数据,所以我们需要对该方法的提供者超过1秒的时间(比如这里用休眠Thread.sleep ),消费者则需要启动多个线程来并行执行,来模拟整个并发情况。
(6)在调用方则尝试简单通过for循环启动多个线程来执行 查看服务提供方的监控情况
package com.lagou; import com.lagou.bean.ConsumerComponent; import org.apache.dubbo.config.spring.context.annotation.EnableDubbo; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import java.io.IOException; public class AnnotationConsumerMain { public static void main(String[] args) throws IOException, InterruptedException { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ConsumerConfiguration.class); context.start(); ConsumerComponent service = context.getBean(ConsumerComponent.class); while (true) { for (int i = 0; i < 1000; i++) { Thread.sleep(5); new Thread(new Runnable() { @Override public void run() { String msg = service.sayHello("world", 0); System.out.println(msg); } }).start(); } } } @Configuration @PropertySource("classpath:/dubbo-consumer.properties") //@EnableDubbo(scanBasePackages = "com.lagou.bean") @ComponentScan("com.lagou.bean") @EnableDubbo static class ConsumerConfiguration { } }
加载全部内容