简单概述
spring cloud stream是一个用于构建与共享消息传递系统连接的高度可扩展的事件驱动型微服务的框架。
应用程序通过inputs或outputs来与spring cloud stream中binder对象交互,binder对象负责与消息中间件交互。也就是说:spring cloud stream能够屏蔽底层消息中间件【rabbitmq,kafka等】的差异,降低切换成本,统一消息的编程模型。
相关概念
channel(通道):channel是消息的传输管道,用于在生产者和消费者之间传递消息。生产者通过输出通道将消息发送到destination,消费者通过输入通道从destination接收消息。
在spring cloud stream中,有两种类型的通道:输入(input)和输出(output)。这两种通道分别用于消费者接收消息和生产者发送消息。
- input(输入):input通道用于消费者从消息代理接收消息。消费者可以通过监听input通道来实时接收传入的消息
- output(输出):output通道用于生产者向消息代理发送消息。生产者可以通过向output通道发送消息来发布新的消息
destination(目标):destination是消息的目的地,通常对应于消息代理中的topic或queue。生产者将消息发送到特定的destination,消费者从其中接收消息。
binder(绑定器):binder是spring cloud stream的核心组件之一。它作为消息代理与外部消息中间件进行交互,并负责将消息发送到消息总线或从消息总线接收消息。binder负责处理消息传递、序列化、反序列化、消息路由等底层细节,使得开发者能够以统一的方式与不同的消息中间件进行交互。spring cloud stream提供了多个可用的binder实现,包括rabbitmq、kafka等。
**消费者组:**在spring cloud stream中,消费组(consumer group)是一组具有相同功能的消费者实例。当多个消费者实例属于同一个消费组时,消息代理会将消息均匀地分发给消费者实例,以实现负载均衡。如果其中一个消费者实例失效,消息代理会自动将消息重新分配给其他可用的消费者实例,以实现高可用性。(对于一个消息来说,每个消费者组只会有一个消费者消费消息)
分区:spring cloud stream支持在多个消费者实例之间创建分区,这样我们通过某些特征量做消息分发,保证相同标识的消息总是能被同一个消费者处理
spring message
spring message是spring framework的一个模块,其作用就是统一消息的编程模型。
package org.springframework.messaging; public interface message{ t getpayload(); messageheaders getheaders(); }
消息通道 messagechannel 用于接收消息,调用send方法可以将消息发送至该消息通道中:
@functionalinterface public interface messagechannel { long indefinite_timeout = -1; default boolean send(message message) { return send(message, indefinite_timeout); } boolean send(message message, long timeout); }
消息通道里的消息由消息通道的子接口可订阅的消息通道subscribablechannel
实现,被messagehandler
消息处理器所订阅
public interface subscribablechannel extends messagechannel { boolean subscribe(messagehandler handler); boolean unsubscribe(messagehandler handler); }
由messagehandler
真正地消费/处理消息
@functionalinterface public interface messagehandler { void handlemessage(message message) throws messagingexception; }
spring integration
spring integration 提供了 spring 编程模型的扩展用来支持企业集成模式(enterprise integration patterns),是对 spring messaging 的扩展。
它提出了不少新的概念,包括消息路由messageroute、消息分发messagedispatcher、消息过滤filter、消息转换transformer、消息聚合aggregator、消息分割splitter等等。同时还提供了messagechannel和messagehandler的实现,分别包括 directchannel、executorchannel、publishsubscribechannel和messagefilter、serviceactivatinghandler、methodinvokingsplitter 等内容。
spring-cloud-stream的架构
快速入门
引入依赖
org.springframework.cloud spring-cloud-starter-stream-rabbit
增加配置文件
spring: cloud: stream: # 定义消息中间件 binders: myrabbit: type: rabbit environment: spring: rabbitmq: host: localhost port: 5672 username: root password: root vhost: / bindings: # 生产者中定义,定义发布对象 myinput: destination: mystreamexchange group: mystreamgroup binder: myrabbit # 消费者中定义,定义订阅的对象 myoutput-in-0: destination: mystreamexchange group: mystreamgroup binder: myrabbit # 消费者中定义,定义输出的函数 function: definition: myoutput
生产者
@resource private streambridge streambridge; public void sendnormal() { streambridge.send("myinput", "hello world"); }
消费者
@bean("myoutput") public consumer> myoutput() { return (message) -> { messageheaders headers = message.getheaders(); system.out.println("myoutput head is : " headers); string payload = message.getpayload(); system.out.println("myoutput payload is : " payload); }; }
如何自定义binder
- 添加spring-cloud-stream依赖
- 提供
provisioningprovider
的实现提供 messageproducer
的实现提供messagehandler
的实现提供binder
的实现创建binder的配置- 在
meta-inf/spring.binders
中定义绑定器
添加spring-cloud-stream依赖
org.springframework.cloud spring-cloud-stream ${spring.cloud.stream.version}
提供provisioningprovider的实现
provisioningprovider
负责提供消费者和生产者目的地,并需要将 application.yml 或 application.properties 文件中包含的逻辑目的地转换为物理目的地引用。
public class fileprovisioningprovider implements provisioningprovider< extendedconsumerproperties, extendedproducerproperties > { public fileprovisioningprovider() { super(); } @override public producerdestination provisionproducerdestination(string name, extendedproducerproperties properties) throws provisioningexception { return new filemessagedestination(name); } @override public consumerdestination provisionconsumerdestination(string name, string group, extendedconsumerproperties properties) throws provisioningexception { return new filemessagedestination(name); } private static class filemessagedestination implements producerdestination, consumerdestination { private final string destination; private filemessagedestination(final string destination) { this.destination = destination; } @override public string getname() { return destination.trim(); } @override public string getnameforpartition(int partition) { throw new unsupportedoperationexception("partitioning is not implemented for file messaging."); } } }
提供messageproducer的实现
messageproducer负责使用事件并将其作为消息处理,发送给配置为使用此类事件的客户端应用程序。
super.oninit(); executorservice = executors.newscheduledthreadpool(1); } @override public void dostart() { executorservice.schedulewithfixeddelay(() -> { string payload = getpayload(); if (payload != null) { messagereceivedmessage = messagebuilder.withpayload(payload).build(); sendmessage(receivedmessage); } }, 0, 50, timeunit.milliseconds); } @override protected void dostop() { executorservice.shutdownnow(); } private string getpayload() { try { list alllines = files.readalllines(paths.get(fileextendedbindingproperties.getpath() file.separator destination.getname() ".txt")); string currentpayload = alllines.get(alllines.size() - 1); if (!currentpayload.equals(previouspayload)) { previouspayload = currentpayload; return currentpayload; } } catch (ioexception e) { fileutil.touch(new file(fileextendedbindingproperties.getpath() file.separator destination.getname() ".txt")); } return null; } }
提供messagehandler的实现
messagehandler
提供产生事件所需的逻辑。
public class filemessagehandler extends abstractmessagehandler { fileextendedbindingproperties fileextendedbindingproperties; producerdestination destination; public filemessagehandler(producerdestination destination, fileextendedbindingproperties fileextendedbindingproperties) { this.destination = destination; this.fileextendedbindingproperties = fileextendedbindingproperties; } @override protected void handlemessageinternal(message message) { try { if (message.getpayload() instanceof byte[]) { files.write(paths.get(fileextendedbindingproperties.getpath() file.separator destination.getname() ".txt"), (byte[]) message.getpayload()); } else { throw new runtimeexception("处理消息失败"); } } catch (ioexception e) { throw new runtimeexception(e); } } }
提供binder的实现
提供自己的binder
抽象实现:
- 扩展
abstractmessagechannelbinder
类 - 将自定义的 provisioningprovider 指定为 abstractmessagechannelbinder 的通用参数
- 重写
createproducermessagehandler
和createconsumerendpoint
方法
public class filemessagechannelbinder extends abstractmessagechannelbinder, extendedproducerproperties , fileprovisioningprovider> implements extendedpropertiesbinder { fileextendedbindingproperties fileextendedbindingproperties; public filemessagechannelbinder(string[] headerstoembed, fileprovisioningprovider provisioningprovider, fileextendedbindingproperties fileextendedbindingproperties) { super(headerstoembed, provisioningprovider); this.fileextendedbindingproperties = fileextendedbindingproperties; } @override protected messagehandler createproducermessagehandler(producerdestination destination, extendedproducerproperties producerproperties, messagechannel errorchannel) throws exception { filemessagehandler filemessagehandler = new filemessagehandler(destination, fileextendedbindingproperties); return filemessagehandler; } @override protected messageproducer createconsumerendpoint(consumerdestination destination, string group, extendedconsumerproperties properties) throws exception { filemessageproduceradapter filemessageproduceradapter = new filemessageproduceradapter(destination, fileextendedbindingproperties); return filemessageproduceradapter; } @override public fileconsumerproperties getextendedconsumerproperties(string channelname) { return fileextendedbindingproperties.getextendedconsumerproperties(channelname); } @override public fileproducerproperties getextendedproducerproperties(string channelname) { return fileextendedbindingproperties.getextendedproducerproperties(channelname); } @override public string getdefaultsprefix() { return fileextendedbindingproperties.getdefaultsprefix(); } @override public class getextendedpropertiesentryclass() { return fileextendedbindingproperties.getextendedpropertiesentryclass(); } }
创建binder的配置
严格要求创建一个 spring 配置来初始化你的绑定器实现的 bean
@enableconfigurationproperties(fileextendedbindingproperties.class) @configuration public class filemessagebinderconfiguration { @bean @conditionalonmissingbean public fileprovisioningprovider filemessagebinderprovisioner() { return new fileprovisioningprovider(); } @bean @conditionalonmissingbean public filemessagechannelbinder filemessagebinder(fileprovisioningprovider filemessagebinderprovisioner, fileextendedbindingproperties fileextendedbindingproperties) { return new filemessagechannelbinder(null, filemessagebinderprovisioner, fileextendedbindingproperties); } @bean public fileproducerproperties fileconsumerproperties() { return new fileproducerproperties(); } }
详细的代码见