spring reactor基本介绍和案例-kb88凯时官网登录

来自:
时间:2024-07-04
阅读:

1.1 reactor 线程模型

reactor 线程模型就是通过 单个线程 使用 java nio 包中的 selector 的 select()方法,进行监听。当获取到事件(如 accept、read 等)后,就会分配(dispatch)事件进行相应的事件处理(handle)。

如果要给 reactor 线程模型 下一个更明确的定义,应该是:

reactor线程模式 = reactor(i/o多路复用)  线程池

spring reactor基本介绍和案例

netty、redis 使用了此模型,主要是解决 c10k 问题

c10k 问题:如何支持 10k 个并发连接

1.2 spring reactor

reactor 是 jvm 完全非阻塞的响应式编程基础,响应式编程是一种关注数据流和变化传播的异步编程范式。这意味着可以通过所采用的编程语言轻松地表达静态(例如数组)或动态(例如事件发射器)数据流。

mono> cartinfomono = mono.just( "songjiyang" )
        .map( userservice::finduserbyname )
        .map( userservice::findusershoppingcart );
string user = userservice.finduserbyname( "songjiyang" );
list usershoppingcart = userservice.findusershoppingcart( user );

1.3 区别和联系

联系:

  • 两者都是使用异步的手段来提高系统的性能

区别:

  • reactor 模型主要异步的处理新连接、连接和读写,而 spring reactor 在更高的代码级别提供了异步框架

或者反过来说,新连接、连接和读写等事件触发了 netty reactor 的某些管道处理器流程,某些事件触发了 spring reactor 的执行流程,这也是 reactor(反应器)名称的由来

上面我们一直在讲异步,异步其实是针对调用者的,也就是调用者调用完方法之后就可以做的别的事情了,java 中实现异步就两种方式:

  1. 回调
  2. 多线程

2.1 回调

回调其实就是把当前的事情完成之后,后面需要做的事当成函数传进行,等完成之后调用就行

    public static void main( string[] args ){
       doa( ( next ) -> {
          log.info( "dob" );
          next.run();
       }, () -> log.info( "doc" ) );
    }
    public static void doa( consumer next, runnable nextnext ){
       log.info( "doa" );
       next.accept( nextnext );
    }
    
// output
15:06:52.818 [main] info concurrent.completetest - doa
15:06:52.820 [main] info concurrent.completetest - dob
15:06:52.820 [main] info concurrent.completetest - doc

回调是在一个线程中来完成的,很容易理解,但问题是回调太多代码就变的很复杂,有回调地域的问题

回调只是一种异步的编程方式,本身实现异步其实还是需要多线程,例如单独起一个监听线程来执行回调函数,例如 eventlistener

如果执行的任务不考虑线程安全问题的话,可以使用 completablefuture 来解决,会更加易于阅读

completablefuture
       .runasync( ()-> log.info("doa") )
       .thenrunasync( ()-> log.info("dob") )
       .thenrunasync( ()->log.info("doc") )
       .get();
// output
15:08:04.407 [forkjoinpool.commonpool-worker-1] info concurrent.completetest - doa
15:08:04.410 [forkjoinpool.commonpool-worker-1] info concurrent.completetest - dob
15:08:04.410 [forkjoinpool.commonpool-worker-1] info concurrent.completetest - doc

completablefuture 的 thenrunasync 也是基于回调,每个任务 class 会有一个 next, 多个任务组成一个回调链

mono.just("")
       .doonnext( (x)-> log.info("doa") )
       .doonnext( (x)-> log.info("dob") )
       .doonnext( (x)-> log.info("doc") )
       .block();
15:12:56.160 [main] info concurrent.completetest - doa
15:12:56.160 [main] info concurrent.completetest - dob
15:12:56.161 [main] info concurrent.completetest - doc

2.2 多线程

多线程的方式,大家应该都很熟悉

  1. thread
  2. executorservice 线程池
  3. completionservice 带结果队列的线程池
  4. completablefuture 用于任务编排
  5. runable、callable、future、completablefuture

从上面可以看到一些使用 reactor 的代码中,都可以在原生 jdk 中找到替换,那我们为什么还需要它呢?

  • 可组合和可读性
  • 丰富的操作
  • 订阅之前什么都不会发生
  • 背压

下面是 java9 中 flow 类的类图,springreactor 也是使用这四个类,在 java9 中已经成了规范

spring reactor基本介绍和案例

3.1 publisher

  • mono,提供 0 到 1 个 item

spring reactor基本介绍和案例

  • flux,提供 0 到 n 个 item

spring reactor基本介绍和案例

发布者提供 n 个 item, 经过一些 operator(数据处理操作),完成或者异常中止

核心方法:

  • subscribe

3.1.1 创建

mono nodata = mono.empty(); 
mono data = mono.just("foo");
flux numbersfromfivetoseven = flux.range(5, 3); 
mono.fromsupplier( ()->1 );
mono.fromfuture( completablefuture.runasync( ()-> {} ) );
flux.create((sink)->{
    for( int i = 0; i < 5; i   ){
       sink.next( i ) ;
    }
    sink.complete();
});

3.1.2 处理

下面这些都称为 operator,可以很灵活处理其中的 item

  • 转化 map、flatmap、
  • 消费 doonnext、donexterror、dooncancel
  • 过滤 filter、distinct、take
  • 错误处理 onerrorreturn、onerrorcomplete、onerrorresume、dofinally
  • 时间相关 timeout、interval、delay
  • 分隔 window、buffer
  • 转同步 block、tostream

3.1.3 订阅

订阅然后消费发布者的内容

subscribe(); 
subscribe(consumer consumer); 

订阅之后的返回值是disposable****,可以使用这个对象来取消订阅,会告诉发布者停止生产对象,但不保证会立即终止

  • 当然可以给 subscribe 传递参数,自定义 complete 或者 error 时需要做的时
  • 同时可以使用 basesubscriber 类来实现订阅,可以控制消费的数量

3.2 subscriber

消费者一般不用手动创建,通过 subscribe 传进 consumer 函数后,会自动生成一个 lambdasubscriber,核心方法:

  • onsubscribe
  • onnext
  • onerror
  • oncomplete

3.3 processor

既是发布者,又是订阅者

3.4 subscription

订阅,消费者调用 subscribe 方法之后可以在 onsubscribe 回调中获取,可以请求下一个 item 或者取消订阅

  • request
  • cancel

3.5 thread 和 scheduler

没有指定的情况下:

  • 当前的 operator 使用上一个 operator 的线程,最先的 operator 使用调用 subscribe 的线程来执行

reactor 中使用 scheduler 来执行流程,类似 executorservice

  • subscribeon 可以指定订阅时使用的线程,这样可以不阻塞的订阅
  • publishon 指定发布时使用的线程

spring reactor基本介绍和案例

流程中可以优化的点:

  1. 准备数据可以异步,等需要用的时候在去阻塞获取,相当于一个 future
  2. 召回可以完成之后就去等正排数据,新的问题,如何去重?本来拿一次正排数据,现在拿 n 个召回次数据,请求量是不是会变大,耗时是不是也会增加
  3. 过滤的准备数据也可以异步,也就是说某个过滤策略的数据准备好了,就可以去执行过滤了,而且还存在很多不需要依赖数据的过滤策略也需要等
  4. 一般粗排只需要 1000 条数据,过滤时已经拿够了 1000 条就可以跳过了

我们上面所说的异步,其实就是说流程中某些节点是在同时执行的,不必等一个节点完成后再执行另外一个,这其实一个统筹学的问题

spring reactor基本介绍和案例

4.1 解决方法对比

问题 java 原生 reactor
准备数据异步 future,缺点:1. 需要调用方处理异常 2. 不能编排后续流程,eg: 拿完企业信息后继续拿企业治理信息,future 需要 get 阻塞 mono, 使用 onerrorresume 处理异常,使用 map 编排后续流程
召回完成拿正排 需要一个阻塞队列,召回把结果往里面 push,另外一个线程从队列里面拿同时去取正排数据,需要自己维护 map 来去重,需要循环等待到达批次后去取正排 flux,召回使用 sink.next 把结果放进去合并节点订阅,使用 distinct 来去重,使用 buffer 来实现批次数据
过滤准备数据异步 需要阻塞队列 flux, 在依赖任务中把准备好的过滤策略放进去,过滤节点订阅 flux 并过滤
粗排取 1000 条 异步执行过滤,把过滤结果放到一个容器中,粗排节点不断查看这个容器的结果是否够 1000 条,够了就可以执行粗排了 flux, 使用 take(1000)
for (strategyconfig filterconfig : filterconfiglist) {
    dostrategyfilter(filterchaincontext, recommendcontext, recrequest, filterconfig, allfilters, partitioncontext, partitiontrace);
}
readystrategyflux.publishon(executorserviceholder.scheduler).doonnext((readystrategyname) -> {
    try {
        list strategyconfigs = strategynametoconfigs.get(readystrategyname);
        for (strategyconfig strategyconfig : strategyconfigs) {
            dostrategyfilter(filterchaincontext, recommendcontext, recrequest, strategyconfig, allfilters, partitioncontext, partitiontrace);
        }
    } catch (exception e) {
        logger.error("doonnext filter error", e);
    }
}).blocklast();

这里的 blocklast 又回到了同步世界,可以很好的和已有的代码兼容

下面是 20240629 到 20240702 某个场景优化过滤阶段的耗时对比

  pv qps tp99 avg
实验组 4051865 46.90 369.00 230.88
对照组 4054074 46.92 397.00 251.55

业务指标对比

无明显波动

spring reactor 是一个响应式编程框架,非常适合类似 mxn 这样的流程编排系统,也是 java 中异步编程的一种补充,但也会有一些其他的问题,例如潜在的线程安全问题,已有框架的冲突 threadlocal 等

【1】开发者社区

【2】

【3】

返回顶部
顶部
网站地图