Hello,欢迎来到程序员社区。 今天聊一聊 Spring Cloud Stream 异常处理,希望对大家有所帮助。
Java面试手册PDF下载:http://117.78.51.75/219-2
文章目录
-
- Spring Cloud Stream 异常处理
-
- 失败自动重试
- 自编程电子书汇总定义错误处理逻辑
- 全局异常处理
- 死信队列
- 参考
Spring Cloud Stream 异常处理
失败自动重试
Spring Cloud Stream 支持失败之后自动重试,其实已经包含了默认的配置,但是我们可以通过如下的配置自定义我们的配置。
spring:
rabbitmq:
host: 192.168.99.100
port: 5672
username: guest
password: guest
cloud:
stream:
bindings:
input:
destination: greetings
consumer:
max-attempts: 4 # 默认为3
backOffInitialInterval: 5000 #消息消费失败后重试消费消息的初始化间隔时间。默认1000ms,即第一次重试消费会在1s后进行
backOffMultiplier: 5 #相邻两次重试之间的间隔时间的倍数。默认2,即第二次是第一次间隔时间的2倍,第三次是第二次的2倍
backOffMaxInterval: 60000 #下一次尝试重试的最大时间间隔,默认为10000ms,即10s
代码参考:https://gitee.com/cckevincyh/cloud-stream-demo/tree/demo11/
自定义错误处理逻辑
首先我们的配置如下:
spring:
rabbitmq:
host: 192.168.99.100
port: 5672
username: guest
password: guest
cloud:
stream:
bindings:
input:
destination: greetings
group: stream-exception-handler
自定义错误处理也就是本地错误处理,就是处理指定的通道。我们需要定义一个处理异常的方法,并在此方法上添加@ServiceActivator
批注。批注具有inputChannel
属性,该属性以{destination}.{group}.errors
的形式指编程电子书汇总定要处理的通道编程电子书汇总。
代码如下:
@EnableBinding({Sink.class})
public class SinkReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(SinkReceiver.class);
@StreamListener(Sink.INPUT)
public void receive(Object payload) {
LOGGER.info("Received: {},{}", payload,new Date());
throw new RuntimeException("error!");
}
/**
* 消息消费失败的降级处理逻辑
*
* @param message
*/
@ServiceActivator(inputChannel =Java面试手册 "greetings.stream-exception-handler.errors")
public void error(Message?> message) {
LOGGER.info("Message consumer failed, call fallback!");
}
}
运行结果如下:
代码参考:https://gitee.com/cckevincyh/cloud-stream-demo/tree/demo12/
全局异常处理
全局处理能够处理所有通道抛出的异常,所有通道抛出的异常都会生成一个ErrorMessage对象,即错误消息。错误消息放置在专用通道中,该专用通道是错误通道。因此,通过侦听errorChannel,可以处理全局异常。
代码如下:
@EnableBinding({Sink.class})
public class SinkReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(SinkReceiver.class);
@StreamListener(Sink.INPUT)
public void receive(Object payload) {
LOGGER.info("Received: {},{}", payload,new Date());
throw new RuntimeException("error!");
}
/**
* A Method of Handling Global Exceptions
Java面试手册 *
* @param errorMessage Exception message object
*/
@StreamListener("errorChannel")
public void handleError(ErrorMessage errorMessage) {
LOGGER.error("exception occurred. errorMessage = {}", errorMessage);
}
}
代码参考:https://gitee.com/cckevincyh/cloud-stream-demo/tree/demo14/
死信队列
配置如下:
spring:
rabbitmq:
host: 192.168.99.100
port: 5672
username: guest
password: guest
cloud:
stream:
bindings:
input:
destination: greetings
group: stream-exception-handler
consumer:
max-attempts: 4
rabbit:
bindings:
input:
consumer:
auto-bind-dlq: true #开启DLQ(死信队列)
启动控制台如下:
当出现异常之后就会把message丢入到死信队列
然后我们可以利用控制台的Move messages
的功能把死信队列的message丢回到处理的队列中去。
代码参考:https://gitee.com/cckevincyh/cloud-stream-demo/tree/demo13/
参考
Spring Cloud Stream 进阶配置——高可用(一)——失败重试
Spring Cloud Stream消费失败后的处理策略(一):自动重试
Spring Cloud Stream消费失败后的处理策略(二):自定义错误处理逻辑
Spring Cloud Stream 进阶配置——高可用(二)——死信队列
Spring Cloud Stream 重点与总结
Spring Cloud Stream exception handling
Spring Cloud Stream消费失败后的处理策略(三):使用DLQ队列(RabbitMQ)
Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)
时间不一定能证明很多东西,但是一定能看透很多东西。坚信自己的选择,不动摇,使劲跑,明天会更好。