大战熟女丰满人妻av-荡女精品导航-岛国aaaa级午夜福利片-岛国av动作片在线观看-岛国av无码免费无禁网站-岛国大片激情做爰视频

專注Java教育14年 全國咨詢/投訴熱線:400-8080-105
動力節點LOGO圖
始于2009,口口相傳的Java黃埔軍校
首頁 hot資訊 使用RabbitMQ配置死信隊列

使用RabbitMQ配置死信隊列

更新時間:2022-05-10 10:18:20 來源:動力節點 瀏覽3292次

當發生以下情況之一時,來自消息隊列的可能是“死信”:

消息被拒絕并且重新排隊設置為 false

消息的 TTL 過期

超出隊列長度限制

為了通過示例進行演示,我選擇了第一種情況,即消息被拒絕。生產者將PaymentOrders作為消息發送,這些消息將由消費者處理。當PaymentOrder付款人賬戶資金不足時,消息將被拒絕。

生產者

生產者是一個 Spring Boot 應用程序,它使用Spring AMQP庫向PaymentOrderRabbitMQ 發送消息。

生產者的 API

生產者 API 的第一部分是定義交換器的名稱、路由密鑰、傳入和死信隊列。

public class Constants {
    public static final String EXCHANGE_NAME = "payment-orders.exchange";
    public static final String ROUTING_KEY_NAME = "payment-orders";
    public static final String INCOMING_QUEUE_NAME = "payment-orders.incoming.queue";
    public static final String DEAD_LETTER_QUEUE_NAME = "payment-orders.dead-letter.queue";
}

第二部分是定義消息格式。我們在此示例中使用 JSON。以下 JSON 文檔顯示了我們如何建模PaymentOrder

{
  "from":"SA54 22PS JCLV 7LWT 7LHY EBLO",
  "to":"IT23 K545 5414 339G WLPI 2YF6 VBP",
  "amount":54.75
}

請注意,最好不要使用自定義序列化格式(如有效負載的 Java 序列化),因為這意味著您需要有一個基于 Java 的使用者。好的做法是將有效負載格式化為 JSON。每個平臺和/或語言都可以解析 JSON。

生產者配置

我們需要配置 AMQP 基礎設施。死信隊列配置封裝在傳入隊列聲明中。

有一個死信交換direct(DLX) 的概念,它是類型topic或的正常交換fanout。如果在處理從隊列中獲取的消息期間發生故障,RabbitMQ 會檢查是否為該隊列配置了死信交換。如果通過x-dead-letter-exchange參數配置了一個,那么它將使用原始路由密鑰將失敗的消息路由到它。可以通過x-dead-letter-routing-key參數覆蓋此路由鍵。

在此示例中,我們使用default exchange(no-name) 作為 the dead letter exchange,并使用死信隊列名稱作為新的路由鍵。這將起作用,因為任何隊列都綁定到默認交換,綁定鍵等于隊列名稱。

@Configuration
public class AmqpConfig {
    @Bean
    DirectExchange exchange() {
        return new DirectExchange(Constants.EXCHANGE_NAME);
    }
    @Bean
    Queue incomingQueue() {
        return QueueBuilder.durable(Constants.INCOMING_QUEUE_NAME)
                .withArgument("x-dead-letter-exchange", "")
                .withArgument("x-dead-letter-routing-key", Constants.DEAD_LETTER_QUEUE_NAME)
                .build();
    }
    @Bean
    Binding binding() {
        return BindingBuilder.bind(incomingQueue()).to(exchange()).with(Constants.ROUTING_KEY_NAME);
    }
    @Bean
    Queue deadLetterQueue() {
        return QueueBuilder.durable(Constants.DEAD_LETTER_QUEUE_NAME).build();
    }
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

用于隊列和交換的構建器 API 非常方便,并且從 Spring AMQP 庫的 1.6 版本開始可用。

在 RabbitMQ 管理控制臺中,DLX和DLK標簽指示在傳入隊列上設置了dead letter exchange和dead letter routing key參數。

生產者邏輯

生產者每 5 秒生成一次隨機PaymentOrder消息,這些消息被發送到 RabbitMQ 進行進一步處理。SpringAmqpTemplate是自動配置的,它可以連接到我們的組件中。由于消息格式是 JSON ,Jackson2JsonMessageConverter因此定義了它將自動關聯到 auto-configured AmqpTemplate。

@Component
public class Producer {
    private AmqpTemplate amqpTemplate;
    public Producer(AmqpTemplate amqpTemplate) {
        this.amqpTemplate = amqpTemplate;
    }
    @Scheduled(fixedDelay = 1000L)
    public void send() {
        PaymentOrder paymentOrder = new PaymentOrder(
                Iban.random().toFormattedString(),
                Iban.random().toFormattedString(),
                new BigDecimal(1D + new Random().nextDouble() * 100D).setScale(2, BigDecimal.ROUND_FLOOR));
        amqpTemplate.convertAndSend(Constants.EXCHANGE_NAME, Constants.ROUTING_KEY_NAME, paymentOrder);
    }
}

消費者

對于這個簡單的示例,消費者也是一個 Spring Boot 應用程序,但在實際應用程序中,消費者和生產者不必在同一平臺/語言上。

消費者 API

消費者 API 的第一部分是指定它連接到哪個隊列。

public class Constants {
    public static final String DEAD_LETTER_QUEUE_NAME = "payment-orders.dead-letter.queue";
    public static final String INCOMING_QUEUE_NAME = "payment-orders.incoming.queue";
}

第二部分是適應生產者定義的消息格式。請注意,在這種情況下,兩個應用程序都是基于 Java 的,因此我可以創建一個包含PaymentOrder類文件的 jar 文件并與消費者和生產者共享它。然而,這是不好的做法,因為它引入了基于共享庫的緊密耦合。更好的方法是使用一些代碼重復(PaymentOrder在這種情況下為類)并通過同意消息格式來使用更松散的耦合方法。

public class PaymentOrder {
    String from;
    String to;
    BigDecimal amount;
    @JsonCreator
    public PaymentOrder(@JsonProperty("from") String from,
                        @JsonProperty("to") String to,
                        @JsonProperty("amount") BigDecimal amount) {
        this.from = from;
        this.to = to;
        this.amount = amount;
    }
    // getters and toString()
}

消費者配置

消費者只關心從中獲取消息的隊列。傳入隊列必須存在,否則消費者將無法啟動。請注意,dead letter queue消費者啟動時不必存在 ,但在消息需要“死信”時它應該存在。如果它丟失,則消息將被靜默丟棄。

@Configuration
public class AmqpConfig {
    @Bean
    Queue incomingQueue() {
        return QueueBuilder.durable(Constants.INCOMING_QUEUE_NAME)
                .withArgument("x-dead-letter-exchange", "")
                .withArgument("x-dead-letter-routing-key", Constants.DEAD_LETTER_QUEUE_NAME)
                .build();
    }
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

默認情況下啟用重新排隊。為了“死信”消息,您需要將以下屬性設置為 false。

spring:
  rabbitmq:
    listener:
      default-requeue-rejected: false

但是,如果您想在某些錯誤情況下啟用重新排隊,最好保持啟用重新排隊并利用AmqpRejectAndDontRequeueException將發送basic.reject帶有 requeue=false 的選項。

消費邏輯

每當傳入隊列上有消息可用時,將使用反序列化的實例process調用該方法。在這里,我們通過拋出一個擴展異常PaymentOrder來模擬消息拒絕。InsufficientFundsExceptionAmqpRejectAndDontRequeueException

@Component
public class Consumer {
    @RabbitListener(queues = Constants.INCOMING_QUEUE_NAME)
    public void process(@Payload PaymentOrder paymentOrder) throws InsufficientFundsException {
        if (new Random().nextBoolean()) {
            throw new InsufficientFundsException("insufficient funds on account " + paymentOrder.getFrom());
        }
    }
}

下圖顯示了一條消息的示例,該PaymentOrder消息被拒絕并最終進入dead letter queue

有時它有助于自動重試失敗的操作,以防它可能在后續嘗試中成功。RetryTemplateSpring AMQP 庫在Spring Retry項目(從 Spring Batch 中提取)的幫助下提供了對此的支持。Spring Boot 使配置變得非常容易,RetryTemplate如下面的示例所示。

spring:
  rabbitmq:
    listener:
      retry:
        enabled: true
        initial-interval: 2000
        max-attempts: 2
        multiplier: 1.5
        max-interval: 5000

使用上述配置,重試功能已啟用(默認情況下禁用),最多應有 2 次嘗試傳遞消息,第一次和第二次嘗試之間應為 2 秒,稍后與上一次重試間隔乘以 1.5 和最多 5 秒。運行您將在日志中看到的消費者

2016-09-07 21:56:53.396  INFO 11995 --- [cTaskExecutor-1] com.example.consumer.Consumer            : Processing at 'Wed Sep 07 21:56:53 CEST 2016' payload 'PaymentOrder{from='RS32 5346 0536 6006 4886 88', to='FI61 8364 3364 9834 16', amount=45.57}'
2016-09-07 21:56:55.399  INFO 11995 --- [cTaskExecutor-1] com.example.consumer.Consumer            : Processing at 'Wed Sep 07 21:56:55 CEST 2016' payload 'PaymentOrder{from='RS32 5346 0536 6006 4886 88', to='FI61 8364 3364 9834 16', amount=45.57}'
2016-09-07 21:56:55.401  WARN 11995 --- [cTaskExecutor-1] o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message (Body:'{"from":"RS32 5346 0536 6006 4886 88","to":"FI61 8364 3364 9834 16","amount":45.57}' MessageProperties [headers={__TypeId__=com.example.producer.api.PaymentOrder}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=payment-orders.exchange, receivedRoutingKey=payment-orders, receivedDelay=null, deliveryTag=31, messageCount=0, consumerTag=amq.ctag-vd18OXS9PSOeJmBQLY4o-w, consumerQueue=payment-orders.incoming.queue])

結論

如您所見,使用 RabbitMQ 配置死信隊列非常簡單。如果大家想了解更多相關知識,不妨來關注一下動力節點的RabbitMQ教程,里面有更豐富的知識等著大家去學習,希望對大家能夠有所幫助。

提交申請后,顧問老師會電話與您溝通安排學習

免費課程推薦 >>
技術文檔推薦 >>
主站蜘蛛池模板: 成人黄色一级毛片 | 在线视频 中文字幕 | 五月天亚洲| 四虎影视永久免费观看网址 | 精品免费视在线视频观看 | 110139日韩欧美 | 成人在线免费观看视频 | 欧美亚洲国产激情一区二区 | 添bbb免费观看高清视频 | 天天操丝袜| 国产视频成人 | 日本精品在线观看视频 | 中文字幕免费在线看线人动作大片 | 亚洲精品综合久久 | 久久99热精品免费观看 | 四虎永久免费影院在线 | 久久精品国产亚洲a | 四虎免费在线播放 | 午夜一级成人 | 一级肉体毛片视频免费看看 | 日本午夜免费理论片 | 伊人中文字幕在线 | 久久综合久美利坚合众国 | 亚洲日韩中文字幕在线播放 | 国产精品伦理一区二区三区 | 色综合久久天天影视网 | 精品国产一区二区三区成人 | 国产a v高清一区二区三区 | 这里只有精品久久 | 四虎影视精品 | 欧美日韩精品一区二区三区四区 | 天天碰夜夜 | 久久国产乱子伦精品免费看 | 高清不卡日本v在线二区 | 亚洲一二区视频 | 女bbbbxxxx毛片视频丶 | 美女女女女女女bbbbbb毛片 | 欧美日韩免费在线视频 | 黄色成人在线播放 | 国产成人亚洲精品91专区高清 | 点击进入不卡毛片免费观看 |