Hello,欢迎来到程序员社区。 今天聊一聊 Spring Cloud Stream 简单使用,希望对大家有所帮助。
Java面试手册PDF下载:http://117.78.51.75/219-2
文章目录
-
- Spring Cloud Stream 简单使用
-
- 开启绑定功能
- 绑定消息通道
- 注入绑定接口
- 注入消息通道
- 消息生产与消费
-
- Spring Integration原生支持
- 消息反馈
- 参考
Spring Cloud Stream 简单使用
开启绑定功能
在Spring Cloud Stream中,我们需要通过@EnableBinding
注解来为应用启动消息驱动的功能,该注解我们在快速入门中已经有了基本的介绍,下面来详细看看它的定义:
@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Configuration
@Import({BindingBeansRegistrar.class, BinderFactoryAutoConfiguration.class})
@EnableIntegration
public @interface EnableBinding {
Class?>[] value() default {};
}
从该注解的定义中我们可以看到,它自身包含了
@configuration
注解,所以用它注解的类也会成为Spring的基本配置类。另外该注解还通过@Import
加载了Spring Cloud Stream运行需要的几个基础配置类。
@EnableBinding
注解只有一个唯一的属性:value。由于该注解@Import
了BindingBeansRegistrar
实现,所以在加载了基础配置内容之后,它会回调来读取value中的类,以创建消息通道的绑定。另外,由于value是一个Class类型的数组,所以我们可以通过value属性一次性指定多个关于消息通道的配置。
绑定消息通道
在Spring Cloud Steam中,我们可以在接口中通过
@Input
和@Output
注解来定义消息通道,而用于定义绑定消息通道的接口则可以被@EnableBinding
注解的value参数来指定,从而在应用启动的时候实现对定义消息通道的绑定。
Sink
接口是Spring cloud Steam 提供的一个默认实现,除此之外还有Source
和Processor
,可从它们的源码中学习它们的定义方式:
public interface Sink {
String INPUT = "input";
@Input("input")
SubscribableChannel input();
}
public interface Source {
String OUTPUT = "output";
@Output("output")
MessageChannel output();
}
public interface Processor extends Source, Sink {
}
从上面的源码中,我们可以看到,Sink和Source中分别通过
@Input
和@Output
注解定义了输入通道和输出通道,而Processor通过继承Source和sink的方式同时定义了一个输入通道和一个输出通道。
另外,
@Input
和@Output
注解都还有一个value属性,该属性可以用来设置消息通道的名称,这里Sink和Source中指定的消息通道名称分别为input和output。如果我们直接使用这两个注解而没有指定具体的value值,将默认使用方法名作为消息通道的名称。
最后,需要注意一点,当我们定义输出通道的时候,需要返回
Messagechannel
接口对象,该接口定义了向消息通道发送消息的方法;而定义输入通道时,需要返回SubscribableChannel
接口对象,该接口继承自MessageChannel
接口,它定义了维护消息通道订阅者的方法。
注入绑定接口
在完成了消息通道绑定的定义之后,Spring Cloud Stream会为其创建具体的实例,而开发者只需要通过注入的方式来获取这些实例并直接使用即可,下面可以通过注入的方式实现一个消息生成者,向input消息通道发送数据。
- 创建一个将Input消息通道作为输出通道的接口,具体如下:
package com.example.stream;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.MessageChannel;
public interface SinkSender {
@Output(Sink.INPUT)
MessageChannel output();
}
- 在
@EnableBinding
注解中增加对SinkSender
接口的指定,使Spring Cloud Stream能创建出对应的实例。
package com.example.stream;
import org.slf4j.LoJava面试手册gger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
@EnableBinding({Sink.class,SinkSender.class})
public class SinkReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(SinkReceiver.class);
@StreamListener(Sink.INPUT)
public void receive(Object payload) {
LOGGER.info("Received: {}", payload);
}
}
- 创建一个单元测试类,通过
@Autowired
注解注入SinkSender
的实例,并在测试用例中调用它的发送消息方法。
package com.example.stream;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.support.MessageBuilder;
@SpringBootTest
class SinkSenderTest {
@Autowired
private SinkSender sinkSender;
@Test
public void contextLoads(){
sinkSender.output().send(MessageBuilder.withPayload("From SinkSender").build());
}
}
- 运行该单元测试用例,如果可以在控制台中找到如下输出内容,表明我们的试验已经成功了,消息被正确地发送到了input通道中,并被相对应的消息消费编程电子书汇总者输出。
2020-07-28 09:37:32.007 INFO 17224 --- [ main] com.example.stream.SinkReceiver : Received: From SinkSender
源代码参考:https://gitee.com/cckevincyh/cloud-stream-demo/tree/demo01/
注入消息通道
由于Spring Cloud Stream会根据绑定接口中的@Input
和@Output
注解来创建消息通道实例,所以我们也可以通过直接注入的方式来使用消息通道对象。比如,我们可以通过下面的示例,注入上面例子中SinkSender
接口中定义的名为input的消息输入通道。
package com.example.stream;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
@SpringBootTest
class SinkSenderTest {
@Autowired
private MessageChannel input;
@Test
public void contextLoads(){
input.send(MessageBuilder.withPayload("From SinkSender").build());
}
}
源代码参考:https://gitee.com/cckevincyh/cloud-stream-demo/tree/demo2/
上面定义的内容,完成了与之前通过注入绑定接口sinksender方式实现的测试用例相同的操作。因为在通过注入绑定接口实现时,
sinkSender.output()
方法实际获得的就是Sinksender
接口中定义的Messagechannel
实例,只是在这里我们直接通过注入的方式来实现了而已。这种用法虽然很直接,但是也容易犯错,很多时候我们在一个微服务应用中可能会创建多个不同名的Messagechannel
实例,这样通过@Autowired
注入时,要注意参数命名需要与通道同名才能被正确注入,或者也可以使用@Qualifier
注解来特别指定具体实例的名称,该名称需要与定义Messagechannel
的@Output
中的value参数一致,这样才能被正确注入。比如下面的例子,在一个接口中定义了两个输出通道,分别命名为Output-1
和Output-2
,当要使用Output-1
的时候,可以通过
@Qualifier("Output-1")
来指定这个具体的实例来注入使用。
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface MySource {
String OUTPUT_1 = "Output-1";
String OUTPUT_2 = "Output-2";
@Output(MySource.OUTPUT_1)
MessageChannel output1();
@Output(MySource.OUTPUT_2)
MessageChannel output2();
}
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
@SpringBootTest
class StreamAppTest {
@Qualifier("Output-1")
@Autowired
private MessageChannel output1;
@Qualifier("Output-2")
@Autowired
private MessageChannel output2;
@Test
public void contextLoads(){
output1.send(MessageBuilder.withPayload("From Output-1").build());
output2.send(MessageBuilder.withPayload("From Output-2").build());
}
}
源代码参考:https://gitee.com/cckevincyh/cloud-stream-demo/tree/demo3/
消息生产与消费
由于Spring Cloud Stream是基于Spring Integration构建起来的,所以在使用Spring Cloud Stream构建消息驱动服务的时候,完全可以使用Spring Integration的原生注解来实现各种业务需求。同时,为了简化面向消息的编程模型,Spring Cloud Stream还提供了
@StreamListener
注解对输入通道的处理做了进一步优化。下面我们分别从这两方面来学习一下对消息的处理。
Spring Integration原生支持
通过之前的内容,我们已经能够通过注入绑定接口和消息通道的方式实现向名为input的消息通道发送信息。接下来,我们通过Spring Integration 原生的
@ServiceActivator
和@InboundChannelAdapter
注解来尝试实现相同的功能
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
@EnableBinding({Sink.class})
public class SinkReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(SinkReceiver.class);
@ServiceActivator(inputChannel = Sink.INPUT)
public void receive(Object payload) {
LOGGER.info("Received: {}", payload);
}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import java.util.Date;
@EnableBinding(value = {SinkSender.SinkOutPut.class})
public class SinkSender {
private static final Logger LOGGER = LoggerFactory.getLogger(SinkSender.class);
@Bean
@InboundChannelAdapter(value = SinkOutPut.OUTPUT, poller = @Poller(fixedDelay = "2000"))
public MessageSourceDate> timerMessageSource() {
return () -> new GenericMessage>(new Date());
}
public interface SinkOutPut {
String OUTPUT = Sink.INPUT;
@Output(SinkOutPut.OUTPUT)
MessageChannel output();
}
}
-
SinkReceiver
类属于消息消费者实现,与之前实现的类似,只是做了一些修改:
使用原生的@ServiceActivator
注解替换了@StreamListener
,实现对Sink.INPUT
通道的监听处理,而该通道绑定了名为input的主题。 -
SinkSender
类属于消息生产者实现,它在内部定义了SinkOutPut
接口来将输出通道绑定到名为input的主题中。由于SinkSender
和SinkReceiver
共用一个主题,所以它们构成了一组生产者与消费者。另外,在SinkSender中还创建了用于生产消息的timerMessageSource
方法,该方法会将当前时间作为消息返回。而@InboundChannelAdapter
注解定义了该方法是对SinkOutPut.OUTPUT
通道的输出绑定,同时使用poller参数将该方法设置为轮询执行,这里我们定义为2000毫秒,所以它会以2秒的频率向SinkOutPut.OUTPUT
通道输出当前时间。
源代码参考:https://gitee.com/cckevincyh/cloud-stream-demo/tree/demo4/
另外,还可以通过@Transformer
注解对指定通道的消息进行转换。
@Transformer(inputChannel = MySink.INPUT_1, outputChannel = MySource.OUTPUT_2)
public Object transformer(Date message) {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(message);
}
源代码参考:https://gitee.com/cckevincyh/cloud-stream-demo/tree/demo5/
注意:如果使用
@StreamListener
注释从同一channel中消费时,将使用pub-sub模型。带有注释的每种方法均@StreamListener接收其自己的消息副本,并且每种方法都有其自己的消费组。但是,但是如果使用Spring Integration的注解,比如@Transformer
或@ServiceActivator
,这些属于消费竞争模型。没有为每个订阅创建单独的消费者组。
演示代码如下:
- 使用
@StreamListener
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
@EnableBinding({Sink.class})
public class SinkReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(SinkReceiver.class);
@StreamListener(Sink.INPUT)
public void receive1(Object payload) {
LOGGER.info("Received1: {}", payload);
}
@StreamListener(Sink.INPUT)
public void receive2(Object payload) {
LJava面试手册OGGER.info("Received2: {}", payload);
}
}
- 使用
@ServiceActivator
import org.slf4j.Logg编程电子书汇总er;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
@EnableBinding({Sink.class})
public class SinkReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(SinkReceiver.class);
@ServiceActivator(inputChannel = Sink.INPUT)
public void receive1(Object payload) {
LOGGER.info("Received1: {}", payload);
}
@ServiceActivator(inputChannel = Sink.INPUT)
public void receive2(Object payload) {
LOGGER.info("Received2: {}", payload);
}
}
源代码参考: https://gitee.com/cckevincyh/cloud-stream-demo/tree/demo6/
消息反馈
很多时候在处理完输入消息之后,需要反馈一个消息给对方,这时候可以通
@SendTo
注解来指定返回内容的输出通道。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import java.text.SimpleDateFormat;
import java.util.Date;
@EnableBinding({MySink.class})
public class SinkReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(SinkReceiver.class);
@StreamListener(MySink.INPUT_1)
@SendTo(MySource.OUTPUT_2)
public Object transformer(Date message) {
LOGGER.info("Received from input1: {}", message);
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(message);
}
@StreamListener(MySink.INPUT_2)
public void receive2(Object payload) {
LOGGE编程电子书汇总R.info("Received from input2: {}", payload);
}
}
源代码参考:https://gitee.com/cckevincyh/cloud-stream-demo/tree/demo8/
参考
Spring Cloud微服务实战.pdf
Spring Cloud Stream知识点盘点
https://cloud.spring.io/spring-cloud-stream/spring-cloud-stream.html#spring-cloud-stream-reference
时间不一定能证明很多东西,但是一定能看透很多东西。坚信自己的选择,不动摇,使劲跑,明天会更好。