springboot disruptor高性能队列使用-kb88凯时官网登录

来自:网络
时间:2023-09-07
阅读:
目录

disruptor是一个高性能队列,常见的还有kafka、rabbitmq等,下面体验一下~

1、disruptor简介

disruptor 是英国外汇交易公司lmax开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与i/o操作处于同样的数量级)。基于 disruptor 开发的系统单线程能支撑每秒 600 万订单,2010 年在 qcon 演讲后,获得了业界关注。

其特点简单总结如下:

  • 开源的java框架,用于生产者-消费者场景;
  • 高吞吐量和低延迟;
  • 有界队列;

disruptor在github网址为:

2、disruptor概念

  • ring buffer:环形的缓冲区,环形数组中的元素采用覆盖方式,避免了jvm的gc;
  • sequence disruptor:通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理;
  • sequencer:sequencer 是 disruptor 的真正核心。此接口有两个实现类 singleproducersequencer、multiproducersequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法;
  • sequence barrier:用于保持对ringbuffer的 main published sequence 和consumer依赖的其它consumer的 sequence 的引用;
  • wait strategy:定义 consumer 如何进行等待下一个事件的策略;
  • event:在 disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(event)。它不是一个被 disruptor 定义的特定类型,而是由 disruptor 的使用者定义并指定;
  • eventprocessor:eventprocessor 持有特定消费者(consumer)的 sequence,并提供用于调用事件处理实现的事件循环(event loop);
  • eventhandler:定义的事件处理接口,由用户实现,用于处理事件,是 consumer 的真正实现;
  • producer:生产者,只是泛指调用 disruptor 发布事件的用户代码,disruptor 没有定义特定接口或类型;

3、springboot disruptor实例

在pom.xml文件中添加依赖

		
            com.lmax
            disruptor
            3.3.4
        

消息体model

@data
public class messagemodel {
    private string message;
}

构造eventfactory

public class helloeventfactory implements eventfactory {
    @override
    public messagemodel newinstance() {
        return new messagemodel();
    }
}

构造消费者

@slf4j
public class helloeventhandler implements eventhandler {
    @override
    public void onevent(messagemodel event, long sequence, boolean endofbatch) {
        try {
            //这里停止1000ms是为了确定消费消息是异步的
            thread.sleep(1000);
            log.info("消费者处理消息开始");
            if (event != null) {
                log.info("消费者消费的信息是:{}",event);
            }
        } catch (exception e) {
            log.info("消费者处理消息失败");
        }
        log.info("消费者处理消息结束");
    }
}

构造mqmanager

@configuration
public class mqmanager {
    @bean("messagemodel")
    public ringbuffer messagemodelringbuffer() {
        //定义用于事件处理的线程池, disruptor通过java.util.concurrent.executorserivce提供的线程来触发consumer的事件处理
        executorservice executor = executors.newfixedthreadpool(2);
        //指定事件工厂
        helloeventfactory factory = new helloeventfactory();
        //指定ringbuffer字节大小,必须为2的n次方(能将求模运算转为位运算提高效率),否则将影响效率
        int buffersize = 1024 * 256;
        //单线程模式,获取额外的性能
        disruptor disruptor = new disruptor<>(factory, buffersize, executor, producertype.single, new blockingwaitstrategy());
        //设置事件业务处理器---消费者
        disruptor.handleeventswith(new helloeventhandler());
        //启动disruptor线程
        disruptor.start();
        //获取ringbuffer环,用于接取生产者生产的事件
        ringbuffer ringbuffer = disruptor.getringbuffer();
        return ringbuffer;
    }
}

构造生产者

@configuration
public class mqmanager {
    @bean("messagemodel")
    public ringbuffer messagemodelringbuffer() {
        //定义用于事件处理的线程池, disruptor通过java.util.concurrent.executorserivce提供的线程来触发consumer的事件处理
        executorservice executor = executors.newfixedthreadpool(2);
        //指定事件工厂
        helloeventfactory factory = new helloeventfactory();
        //指定ringbuffer字节大小,必须为2的n次方(能将求模运算转为位运算提高效率),否则将影响效率
        int buffersize = 1024 * 256;
        //单线程模式,获取额外的性能
        disruptor disruptor = new disruptor<>(factory, buffersize, executor, producertype.single, new blockingwaitstrategy());
        //设置事件业务处理器---消费者
        disruptor.handleeventswith(new helloeventhandler());
        //启动disruptor线程
        disruptor.start();
        //获取ringbuffer环,用于接取生产者生产的事件
        ringbuffer ringbuffer = disruptor.getringbuffer();
        return ringbuffer;
    }
}

测试

	/**
     * 项目内部使用disruptor做消息队列
     * @throws exception
     */
    @test
    public void sayhellomqtest() throws exception{
        helloeventproducer.sayhellomq("hello world!");
        log.info("消息队列已发送完毕");
        //这里停止2000ms是为了确定是处理消息是异步的
        thread.sleep(2000);
    }

运行结果如下

4、小结

引用disruptor作为内部的高性能队列,应用于生产者-消费者模式中还是非常nice的,后面若有工程需求可以尝试一下。

返回顶部
顶部
网站地图