Java Disruptor入门 Java多线程之Disruptor入门
EileenChang 人气:0想了解Java多线程之Disruptor入门的相关内容吗,EileenChang在本文为您仔细讲解Java Disruptor入门的相关知识和一些Code实例,欢迎阅读和指正,我们先划重点:Java,Disruptor入门,Java,Disruptor,下面大家一起来学习吧。
一、Disruptor简介
Disruptor目前是世界上最快的单机消息队列,由英国外汇交易公司LMAX开发,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。
二、浅聊Disruptor的核心
Disruptor维护了一个环形队列RingBuffer,这个队列本质上是一个首位相连的数组。相比于LinkedBlockdingQueue,RingBuffer的数组结构在查找方面效率更高。此外,LinkedBlockingQueue需要维护一个头节点指针head和一个尾节点指针tail,而RingBuffer只需要维护一个sequence指向下一个可用的位置即可。所以从这两点来说,RingBuffer比LinkedBlockingQueue要快。
三、Disruptor使用
3.1 pom.xml
<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.3</version> </dependency>
3.2 事件Event
Disruptor是基于事件的生产者消费者模型。其RingBuffer中存放的其实是将消息封装成的事件。这里定义了一个LongEvent,表示消息队列中存放的是long类型的数据。
public class LongEvent { private long value; public void set(long value) { this.value = value; } @Override public String toString() { return "LongEvent{" + "value=" + value + '}'; } }
3.3 EventFactory
实现EventFactory接口,定义Event工厂,用于填充队列。Event工厂其实是为了提高Disruptor的效率,初始化的时候,会调用Event工厂,对RingBuffer进行内存的提前分配,GC的频率会降低。
import com.lmax.disruptor.EventFactory; public class LongEventFactory implements EventFactory<LongEvent> { public LongEvent newInstance() { return new LongEvent(); } }
3.4 EventHandler
实现EventHandler接口,定义EventHandler(消费者),处理容器中的元素。
import com.lmax.disruptor.EventHandler; public class LongEventHandler implements EventHandler<LongEvent> { public void onEvent(LongEvent event, long sequence, boolean endOfBatch) { System.out.println("Event: " + event + ", sequence: " + sequence); } }
3.5 使用Disruptor原始API发布消息
import cn.flying.space.disruptor.demo.LongEvent; import com.lmax.disruptor.RingBuffer; import java.nio.ByteBuffer; /** * 定义一个生产者,往Disruptor中投递消息 */ public class LongEventProducer { private RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(ByteBuffer byteBuffer) { // 定位到下一个可存放的位置 long sequence = ringBuffer.next(); try { // 拿到该位置的event LongEvent event = ringBuffer.get(sequence); // 设置event的值 event.set(byteBuffer.getLong(0)); } finally { // 发布 ringBuffer.publish(sequence); } } } import cn.flying.space.disruptor.demo.LongEvent; import cn.flying.space.disruptor.demo.LongEventFactory; import cn.flying.space.disruptor.demo.LongEventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import java.nio.ByteBuffer; import java.util.concurrent.Executors; public class TestMain { public static void main(String[] args) throws InterruptedException { // 定义event工厂 LongEventFactory factory = new LongEventFactory(); // ringBuffer长度 int bufferSize = 1024; // 构造一个Disruptor Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory()); // 绑定handler disruptor.handleEventsWith(new LongEventHandler()); // 启动Disruptor disruptor.start(); RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); LongEventProducer producer = new LongEventProducer(ringBuffer); ByteBuffer byteBuffer = ByteBuffer.allocate(8); for (long i = 0; true; i++) { byteBuffer.clear(); byteBuffer.putLong(i); // 投递消息 producer.onData(byteBuffer); Thread.sleep(1000); } } }
3.6 使用Translators发布消息
import cn.flying.space.disruptor.demo.LongEvent; import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.RingBuffer; import java.nio.ByteBuffer; public class LongEventProducerUsingTranslator { private RingBuffer<LongEvent> ringBuffer; public LongEventProducerUsingTranslator(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() { @Override public void translateTo(LongEvent longEvent, long l, ByteBuffer byteBuffer) { longEvent.set(byteBuffer.getLong(0)); } }; public void onData(ByteBuffer byteBuffer) { ringBuffer.publishEvent(TRANSLATOR, byteBuffer); } } import cn.flying.space.disruptor.demo.LongEvent; import cn.flying.space.disruptor.demo.LongEventFactory; import cn.flying.space.disruptor.demo.LongEventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.util.DaemonThreadFactory; import java.nio.ByteBuffer; /** * @author ZhangSheng * @date 2021-4-26 14:23 */ public class TestMain { public static void main(String[] args) throws InterruptedException { LongEventFactory factory = new LongEventFactory(); int bufferSize = 1024; Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE); disruptor.handleEventsWith(new LongEventHandler()); disruptor.start(); RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); LongEventProducerUsingTranslator producer = new LongEventProducerUsingTranslator(ringBuffer); ByteBuffer byteBuffer = ByteBuffer.allocate(8); for (long i = 0L; true; i++) { byteBuffer.putLong(0, i); // 发布 producer.onData(byteBuffer); Thread.sleep(1000); } } }
加载全部内容