程序员社区

Spring Cloud Stream 异常处理

Hello,欢迎来到程序员社区。 今天聊一聊 Spring Cloud Stream 异常处理,希望对大家有所帮助。

Java面试手册PDF下载:http://117.78.51.75/219-2

文章目录

    • Spring Cloud Stream 异常处理
    • 参考

Spring Cloud Stream 异常处理

失败自动重试

Spring Cloud Stream 支持失败之后自动重试,其实已经包含了默认的配置,但是我们可以通过如下的配置自定义我们的配置。

spring:
 rabbitmq:
    host: 192.168.99.100
    port: 5672
    username: guest
    password: guest
 cloud:
   stream:
     bindings:
       input:
         destination: greetings
         consumer:
           max-attempts: 4 # 默认为3
           backOffInitialInterval: 5000 #消息消费失败后重试消费消息的初始化间隔时间。默认1000ms,即第一次重试消费会在1s后进行
           backOffMultiplier: 5 #相邻两次重试之间的间隔时间的倍数。默认2,即第二次是第一次间隔时间的2倍,第三次是第二次的2倍
           backOffMaxInterval: 60000 #下一次尝试重试的最大时间间隔,默认为10000ms,即10s

代码参考:https://gitee.com/cckevincyh/cloud-stream-demo/tree/demo11/

自定义错误处理逻辑

首先我们的配置如下:

spring:
 rabbitmq:
    host: 192.168.99.100
    port: 5672
    username: guest
    password: guest
 cloud:
   stream:
     bindings:
       input:
         destination: greetings
         group: stream-exception-handler

自定义错误处理也就是本地错误处理,就是处理指定的通道。我们需要定义一个处理异常的方法,并在此方法上添加@ServiceActivator批注。批注具有inputChannel属性,该属性以{destination}.{group}.errors的形式指编程电子书汇总定要处理的通道编程电子书汇总

代码如下:

@EnableBinding({Sink.class})
public class SinkReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(SinkReceiver.class);

    @StreamListener(Sink.INPUT)
    public void receive(Object payload) {
        LOGGER.info("Received: {},{}", payload,new Date());
        throw new RuntimeException("error!");
    }

    /**
     * 消息消费失败的降级处理逻辑
     *
     * @param message
     */
    @ServiceActivator(inputChannel =Java面试手册 "greetings.stream-exception-handler.errors")
    public void error(Message?> message) {
        LOGGER.info("Message consumer failed, call fallback!");
    }
}

运行结果如下:

Spring Cloud Stream 异常处理插图

代码参考:https://gitee.com/cckevincyh/cloud-stream-demo/tree/demo12/

全局异常处理

全局处理能够处理所有通道抛出的异常,所有通道抛出的异常都会生成一个ErrorMessage对象,即错误消息。错误消息放置在专用通道中,该专用通道是错误通道。因此,通过侦听errorChannel,可以处理全局异常。

代码如下:

@EnableBinding({Sink.class})
public class SinkReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(SinkReceiver.class);

    @StreamListener(Sink.INPUT)
    public void receive(Object payload) {
        LOGGER.info("Received: {},{}", payload,new Date());
        throw new RuntimeException("error!");
    }

    /**
     * A Method of Handling Global Exceptions
   Java面试手册  *
     * @param errorMessage Exception message object
     */
    @StreamListener("errorChannel")
    public void handleError(ErrorMessage errorMessage) {
        LOGGER.error("exception occurred. errorMessage = {}", errorMessage);
    }
}

Spring Cloud Stream 异常处理插图1

代码参考:https://gitee.com/cckevincyh/cloud-stream-demo/tree/demo14/

死信队列

配置如下:

spring:
 rabbitmq:
    host: 192.168.99.100
    port: 5672
    username: guest
    password: guest
 cloud:
   stream:
     bindings:
       input:
         destination: greetings
         group: stream-exception-handler
         consumer:
           max-attempts: 4
     rabbit:
       bindings:
         input:
           consumer:
             auto-bind-dlq: true #开启DLQ(死信队列)

启动控制台如下:

Spring Cloud Stream 异常处理插图2

当出现异常之后就会把message丢入到死信队列

Spring Cloud Stream 异常处理插图3

然后我们可以利用控制台的Move messages的功能把死信队列的message丢回到处理的队列中去。

Spring Cloud Stream 异常处理插图4

代码参考:https://gitee.com/cckevincyh/cloud-stream-demo/tree/demo13/

参考

Spring Cloud Stream 进阶配置——高可用(一)——失败重试

Spring Cloud Stream消费失败后的处理策略(一):自动重试

Spring Cloud Stream消费失败后的处理策略(二):自定义错误处理逻辑

Spring Cloud Stream 进阶配置——高可用(二)——死信队列

Spring Cloud Stream 重点与总结

Spring Cloud Stream exception handling

Spring Cloud Stream消费失败后的处理策略(三):使用DLQ队列(RabbitMQ)

Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)

时间不一定能证明很多东西,但是一定能看透很多东西。坚信自己的选择,不动摇,使劲跑,明天会更好。

赞(0) 打赏
未经允许不得转载:IDEA激活码 » Spring Cloud Stream 异常处理

一个分享Java & Python知识的社区