目录
disruptor是一个高性能队列,常见的还有kafka、rabbitmq等,下面体验一下~
1、disruptor简介
disruptor 是英国外汇交易公司lmax开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与i/o操作处于同样的数量级)。基于 disruptor 开发的系统单线程能支撑每秒 600 万订单,2010 年在 qcon 演讲后,获得了业界关注。
其特点简单总结如下:
- 开源的java框架,用于生产者-消费者场景;
- 高吞吐量和低延迟;
- 有界队列;
disruptor在github网址为:
2、disruptor概念
- ring buffer:环形的缓冲区,环形数组中的元素采用覆盖方式,避免了jvm的gc;
- sequence disruptor:通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理;
- sequencer:sequencer 是 disruptor 的真正核心。此接口有两个实现类 singleproducersequencer、multiproducersequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法;
- sequence barrier:用于保持对ringbuffer的 main published sequence 和consumer依赖的其它consumer的 sequence 的引用;
- wait strategy:定义 consumer 如何进行等待下一个事件的策略;
- event:在 disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(event)。它不是一个被 disruptor 定义的特定类型,而是由 disruptor 的使用者定义并指定;
- eventprocessor:eventprocessor 持有特定消费者(consumer)的 sequence,并提供用于调用事件处理实现的事件循环(event loop);
- eventhandler:定义的事件处理接口,由用户实现,用于处理事件,是 consumer 的真正实现;
- producer:生产者,只是泛指调用 disruptor 发布事件的用户代码,disruptor 没有定义特定接口或类型;
3、springboot disruptor实例
在pom.xml文件中添加依赖
com.lmax disruptor 3.3.4
消息体model
@data public class messagemodel { private string message; }
构造eventfactory
public class helloeventfactory implements eventfactory{ @override public messagemodel newinstance() { return new messagemodel(); } }
构造消费者
@slf4j public class helloeventhandler implements eventhandler{ @override public void onevent(messagemodel event, long sequence, boolean endofbatch) { try { //这里停止1000ms是为了确定消费消息是异步的 thread.sleep(1000); log.info("消费者处理消息开始"); if (event != null) { log.info("消费者消费的信息是:{}",event); } } catch (exception e) { log.info("消费者处理消息失败"); } log.info("消费者处理消息结束"); } }
构造mqmanager
@configuration public class mqmanager { @bean("messagemodel") public ringbuffermessagemodelringbuffer() { //定义用于事件处理的线程池, disruptor通过java.util.concurrent.executorserivce提供的线程来触发consumer的事件处理 executorservice executor = executors.newfixedthreadpool(2); //指定事件工厂 helloeventfactory factory = new helloeventfactory(); //指定ringbuffer字节大小,必须为2的n次方(能将求模运算转为位运算提高效率),否则将影响效率 int buffersize = 1024 * 256; //单线程模式,获取额外的性能 disruptor disruptor = new disruptor<>(factory, buffersize, executor, producertype.single, new blockingwaitstrategy()); //设置事件业务处理器---消费者 disruptor.handleeventswith(new helloeventhandler()); //启动disruptor线程 disruptor.start(); //获取ringbuffer环,用于接取生产者生产的事件 ringbuffer ringbuffer = disruptor.getringbuffer(); return ringbuffer; } }
构造生产者
@configuration public class mqmanager { @bean("messagemodel") public ringbuffermessagemodelringbuffer() { //定义用于事件处理的线程池, disruptor通过java.util.concurrent.executorserivce提供的线程来触发consumer的事件处理 executorservice executor = executors.newfixedthreadpool(2); //指定事件工厂 helloeventfactory factory = new helloeventfactory(); //指定ringbuffer字节大小,必须为2的n次方(能将求模运算转为位运算提高效率),否则将影响效率 int buffersize = 1024 * 256; //单线程模式,获取额外的性能 disruptor disruptor = new disruptor<>(factory, buffersize, executor, producertype.single, new blockingwaitstrategy()); //设置事件业务处理器---消费者 disruptor.handleeventswith(new helloeventhandler()); //启动disruptor线程 disruptor.start(); //获取ringbuffer环,用于接取生产者生产的事件 ringbuffer ringbuffer = disruptor.getringbuffer(); return ringbuffer; } }
测试
/** * 项目内部使用disruptor做消息队列 * @throws exception */ @test public void sayhellomqtest() throws exception{ helloeventproducer.sayhellomq("hello world!"); log.info("消息队列已发送完毕"); //这里停止2000ms是为了确定是处理消息是异步的 thread.sleep(2000); }
运行结果如下
4、小结
引用disruptor作为内部的高性能队列,应用于生产者-消费者模式中还是非常nice的,后面若有工程需求可以尝试一下。