为什么使用spring stream ?
spring stream 是用来做消息队列发送消息使用的。他隔离了各种消息队列的区别,使用统一的编程模型来发送消息。
目前支持:
- rabbitmq
- kafka
- rocketmq
启动rocketmq
rocketmq 支持windows
start mqnamesrv.cmd start mqbroker.cmd -n 127.0.0.1:9876 autocreatetopicenable=true
修改pom.xml
com.alibaba.cloud spring-cloud-stream-binder-rocketmq
增加发送接收java代码
public interface inputoutput { string mail_output = "mailoutput"; string mail_input = "mailinput"; string output = "output"; string input = "input"; @output(output) messagechannel output(); @input(input) subscribablechannel input(); @output(mail_output) messagechannel mailoutput(); @input(mail_input) subscribablechannel mailinput(); }
在应用上增加注解
@enablebinding({inputoutput.class})
增加yml配置
spring: cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: output: destination: bpmmessage group: bpmmessage-group input: destination: bpmmessage group: bpmmessage-group-consumer mailoutput: destination: mail group: mail-group mailinput: destination: mail group: mail-group-consumer
编写代码收发消息:
messagemodel messagemodel=new messagemodel(); messagemodel.setmsgtype("mail"); messagemodel.setcontent("helloworld"); inputoutput.mailoutput().send( messagebuilder.withpayload( "mail" ).build()); inputoutput.output().send( messagebuilder.withpayload( messagemodel ).build() );
这里发送的是两类消息。
接收消息:
@service public class messagelistener { @streamlistener(inputoutput.input) public void receive(messagemodel message) { system.err.println(message); system.err.println("ok"); } @streamlistener(inputoutput.mail_input) public void receive(string message) { system.err.println(message); system.err.println("ok"); } }
分别接收两类消息