更新時間:2022-04-14 09:25:46 來源:動力節(jié)點 瀏覽1416次
簡單模式
很簡單:生產者、隊列和消費者。生產者向隊列發(fā)送消息,消費者監(jiān)聽隊列并消費消息
工作模式
工作:一個生產者,一個隊列和多個消費者。生產者向隊列發(fā)送消息,多個消費者監(jiān)聽同一個隊列的消費消息
發(fā)布/訂閱模式
發(fā)布/訂閱:發(fā)布/訂閱模式包括一個生產者、一個交換機、多個隊列和多個消費者。交換機(Exchange)直接綁定到隊列。生產者通過交換機(Exchange)將消息存儲在綁定到交換機的隊列中,消費者監(jiān)聽隊列并消費
路由模式
路由:路由模式可以根據(jù)路由鍵將消息發(fā)送到指定隊列。Exchange 和隊列通過路由鍵綁定。生產者通過 Exchange 和路由鍵將消息準確地發(fā)送到隊列。消費者監(jiān)聽隊列并消費消息
主題模式
主題:主題模式支持在路由模式的基礎上進行通配符操作。交換機會根據(jù)通配符將消息存入匹配隊列,消費者監(jiān)聽隊列并消費
標頭模式
Header:header模式取消了路由key,而是使用header中的key/value對來匹配。匹配成功后,會通過交換機將消息發(fā)送到隊列中,由消息制造者獲取和消費
RPC 模式
RPC:RPC方式主要用于獲取消費者的處理結果。通常,生產者將消息發(fā)送給消費者。消費者收到消息并消費后,將處理結果返回給生產者
首先,搭建SpringBoot項目,在POM XML文件中添加如下依賴
<依賴>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-amqp</artifactid>
</依賴>
<依賴>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-web</artifactid>
</依賴>
修改配置文件,添加如下RabbitMQ配置
服務器:
port: 8888 # 設置端口號
Spring:
rabbitMQ:
host: 127.0.0.1 # 設置 RabbitMQ 的主機
port: 5672 # 設置 RabbitMQ 服務端口
username: guest # 設置 RabbitMQ 用戶名
password: guest # 設置 RabbitMQ 密碼
新的公共常量類
public interface RabbitConstant {
/**
* 簡單模式
*/
String SIMPLE_QUEUE_NAME = "simple_queue";
/**
* 工作模式
*/
String WORK_QUEUE_NAME = "work_queue";
/**
* 發(fā)布/訂閱模式
*/
String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange";
字符串 PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME = "publish_subscribe_first_queue";
字符串 PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME = "publish_subscribe_second_queue";
/**
* 路由模式
*/
String ROUTING_EXCHANGE_NAME = "routing_exchange";
字符串 ROUTING_FIRST_QUEUE_NAME = "routing_first_queue";
字符串 ROUTING_SECOND_QUEUE_NAME = "routing_second_queue";
字符串 ROUTING_THIRD_QUEUE_NAME = "routing_third_queue";
字符串 ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME = "routing_first_queue_routing_key";
字符串 ROUTING_SECOND_QUEUE_ROUTING_KEY_NAME = "routing_second_queue_routing_key";
字符串 ROUTING_THIRD_QUEUE_ROUTING_KEY_NAME = "routing_third_queue_routing_key";
/**
* 主題模式
*/
String TOPICS_EXCHANGE_NAME = "topics_exchange";
字符串 TOPICS_FIRST_QUEUE_NAME = "topics_first_queue";
字符串 TOPICS_SECOND_QUEUE_NAME = "
字符串 TOPICS_THIRD_QUEUE_NAME = "topics_third_queue";
String TOPICS_FIRST_QUEUE_ROUTING_KEY = "topics.first.routing.key";
String TOPICS_SECOND_QUEUE_ROUTING_KEY = "topics.second.routing.key";
String TOPICS_THIRD_QUEUE_ROUTING_KEY = "topics.third.routing.key";
字符串 TOPICS_ROUTING_KEY_FIRST_WILDCARD = "#.first.#";
字符串 TOPICS_ROUTING_KEY_SECOND_WILDCARD = "*.second.#";
字符串 TOPICS_ROUTING_KEY_THRID_WILDCARD = "*.third.*";
/**
* 標題模式
*/
String HEADER_EXCHANGE_NAME = "header_exchange";
字符串 HEADER_FIRST_QUEUE_NAME = "header_first_queue";
字符串 HEADER_SECOND_QUEUE_NAME = "header_second_queue";
/**
* rpc 模式
*/
String RPC_QUEUE_NAME = "rpc_queue";
}
添加一個Controller請求類(用于驗證結果,最后可以添加)
導入 com.example.rabbitmq.constant.RabbitConstant;
導入 org.springframework.amqp.core.Message;
導入 org.springframework.amqp.core.MessageProperties;
導入 org.springframework.amqp.rabbit.core.RabbitTemplate;
導入 org.springframework.beans.factory.annotation.Autowired;
導入 org.springframework.web.bind.annotation.GetMapping;
導入 org.springframework.web.bind.annotation.RestController;
導入 java.nio.charset.StandardCharsets;
@RestController
public class RabbitController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping(value = "/simple")
public void simple() {
rabbitTemplate.convertAndSend(RabbitConstant.SIMPLE_QUEUE_NAME, "你好世界!");
}
@GetMapping(value = "/work")
public void work() {
rabbitTemplate.convertAndSend(RabbitConstant.WORK_QUEUE_NAME, "work hello!");
}
@GetMapping(value = "/pubsub")
public void pubsub() {
rabbitTemplate.convertAndSend(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME, null, "發(fā)布/訂閱你好");
}
@GetMapping(value = "/routing")
public void routing() {
// 向第一個隊列發(fā)送消息
rabbitTemplate.convertAndSend(RabbitConstant.ROUTING_EXCHANGE_NAME, RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME, "路由你好");
}
@GetMapping(value = "/topics")
public void topics() {
// 向第一個隊列發(fā)送消息。這時候隊列可以接收到消息,因為隊列的通配符是#first.#,而routing_key是topics first。路由。鍵,匹配成功
rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_FIRST_QUEUE_ROUTING_KEY, "topics hello");
// 向第二個隊列發(fā)送消息。這時候隊列也能收到消息了,因為隊列的通配符是*秒#,而routing_key是topic秒。路由。鍵,匹配成功
rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_SECOND_QUEUE_ROUTING_KEY, "topics hello");
// 向第三個隊列發(fā)送消息。此時隊列無法接受消息,因為隊列通配符是*第三個*,而routing_key是topics第三個。路由。鍵,匹配失敗
rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_THIRD_QUEUE_ROUTING_KEY, "topics hello");
}
@GetMapping(value = "/header")
public void header() {
// 這個消息應該被兩個隊列接收。第一個隊列全部匹配成功,第二個隊列 Hello 值任意匹配成功
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("matchAll", "YES");
messageProperties.setHeader("你好", "world");
Message message = new Message("header first hello".getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, message);
// 這個消息應該只被第二個隊列接受。第一個隊列全部匹配失敗,
MessageProperties messagePropertiesSecond = new MessageProperties();
messagePropertiesSecond.setHeader("matchAll", "NO");
Message messageSecond = new Message("header second hello".getBytes(StandardCharsets.UTF_8), messagePropertiesSecond);
rabbitTemplate.convertAndSend(RabbitConstant.HEADER_EXCHANGE_NAME, null, messageSecond);
}
@GetMapping(value = "/rpc")
public void rpc() {
Object responseMsg = rabbitTemplate.convertSendAndReceive(RabbitConstant.RPC_QUEUE_NAME, "rpc hello!");
System.out.println("rabbit rpc 響應消息:" + responseMsg);
}
}
生產者聲明隊列并向隊列發(fā)送消息
導入 com.example.rabbitmq.constant.RabbitConstant;
導入 org.springframework.amqp.core.Queue;
導入 org.springframework.context.annotation.Bean;
導入 org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitSimpleProvider {
@Bean
public Queue simpleQueue() {
return new Queue(RabbitConstant.SIMPLE_QUEUE_NAME);
}
}
消費者監(jiān)聽隊列并消費消息
導入 com.example.rabbitmq.constant.RabbitConstant;
導入 org.springframework.amqp.rabbit.annotation.RabbitHandler;
導入 org.springframework.amqp.rabbit.annotation.RabbitListener;
導入 org.springframework.stereotype.Component;
@Component
public class RabbitSimpleConsumer {
@RabbitHandler
@RabbitListener(queues = RabbitConstant.SIMPLE_QUEUE_NAME)
public void simpleListener(String context) {
System.out.println("rabbit receiver: " + context);
}
}
單元測試
@Test
public void simple() {
rabbitTemplate.convertAndSend(RabbitConstant.SIMPLE_QUEUE_NAME, "hello world!");
}
響應結果
生產者聲明隊列并向隊列生成消息
導入 com.example.rabbitmq.constant.RabbitConstant;
導入 org.springframework.amqp.core.Queue;
導入 org.springframework.context.annotation.Bean;
導入 org.springframework.context.annotation.Configuration;
@Configuration
公共類 RabbitWorkProvider {
@Bean
公共隊列 workQueue() {
return new Queue(RabbitConstant.WORK_QUEUE_NAME);
}
}
消費者監(jiān)聽隊列并消費消息(有兩個消費者監(jiān)聽同一個隊列)
導入 com.example.rabbitmq.constant.RabbitConstant;
導入 org.springframework.amqp.rabbit.annotation.RabbitHandler;
導入 org.springframework.amqp.rabbit.annotation.RabbitListener;
導入 org.springframework.stereotype.Component;
@Component
public class RabbitWorkConsumer {
@RabbitListener(queues = RabbitConstant.WORK_QUEUE_NAME)
@RabbitHandler
public void workQueueListenerFirst(String context) {
System.out.println("rabbit workQueue listener first receiver:" + context);
}
@RabbitListener(queues = RabbitConstant.WORK_QUEUE_NAME)
@RabbitHandler
public void workQueueListenerSecond(String context) {
System.out.println("rabbit workQueue listener 第二個接收者:" + context);
}
}
單元測試
@Test
public void work() {
rabbitTemplate.convertAndSend(RabbitConstant.WORK_QUEUE_NAME, "工作你好!");
}
響應結果(由于有兩個消費者監(jiān)聽同一個隊列,所以消息只能被其中一個消費者消費。默認情況下,消息是負載均衡發(fā)送給所有消費者的)
生產者聲明兩個隊列和一個扇出交換機,并將兩個隊列綁定到交換機
開關有四種類型:fanout、direct、topic和header(文末有介紹)
導入 com.example.rabbitmq.constant.RabbitConstant;
導入 org.springframework.amqp.core.*;
導入 org.springframework.context.annotation.Bean;
導入 org.springframework.context.annotation.Configuration;
@Configuration
公共類 RabbitPublishSubscribeProvider {
@Bean
公共隊列 pubsubQueueFirst() {
return new Queue(RabbitConstant.PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME);
}
@Bean
公共隊列 pubsubQueueSecond() {
return new Queue(RabbitConstant.PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME);
}
@Bean
公共 FanoutExchange fanoutExchange() {
// 創(chuàng)建扇出類型開關,表示交換機會向所有綁定隊列發(fā)送消息
return new FanoutExchange(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME);
}
@Bean
public Binding pubsubQueueFirstBindFanoutExchange() {
// 隊列綁定開關
return BindingBuilder.bind(pubsubQueueFirst()).to(fanoutExchange());
}
@Bean
public Binding pubsubQueueSecondBindFanoutExchange() {
// 隊列二綁定開關
return BindingBuilder.bind(pubsubQueueSecond()).to(fanoutExchange());
}
}
消費者監(jiān)聽隊列并消費
導入 com.example.rabbitmq.constant.RabbitConstant;
導入 org.springframework.amqp.rabbit.annotation.RabbitHandler;
導入 org.springframework.amqp.rabbit.annotation.RabbitListener;
導入 org.springframework.stereotype.Component;
@Component
public class RabbitPublishSubscribeConsumer {
@RabbitListener(queues = RabbitConstant.PUBLISH_SUBSCRIBE_FIRST_QUEUE_NAME)
@RabbitHandler
public void pubsubQueueFirst(String context) {
System.out.println("rabbit pubsub queue first receiver:" + context);
}
@RabbitListener(queues = RabbitConstant.PUBLISH_SUBSCRIBE_SECOND_QUEUE_NAME)
@RabbitHandler
public void pubsubQueueSecond(String context) {
System.out.println("rabbit pubsub 隊列第二個接收者:" + context);
}
}
單元測試
@Test
public void pubsub() {
rabbitTemplate.convertAndSend(RabbitConstant.PUBLISH_SUBSCRIBE_EXCHANGE_NAME, null, "發(fā)布/訂閱你好");
}
響應結果
生產者聲明三個隊列和一個直接交換機,將三個隊列綁定到交換機,并設置交換機和隊列之間的路由
import com.example.rabbitmq.constant.RabbitConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitRoutingProvider {
@Bean
public Queue rabbitRoutingFirstQueue() {
return new Queue(RabbitConstant.ROUTING_FIRST_QUEUE_NAME);
}
@Bean
public Queue rabbitRoutingSecondQueue() {
return new Queue(RabbitConstant.ROUTING_SECOND_QUEUE_NAME);
}
@Bean
public Queue rabbitRoutingThirdQueue() {
return new Queue(RabbitConstant.ROUTING_THIRD_QUEUE_NAME);
}
@Bean
public DirectExchange directExchange() {
// Create a direct switch, indicating that the exchange opportunity sends messages to routing_ Queue with identical key
return new DirectExchange(RabbitConstant.ROUTING_EXCHANGE_NAME);
}
@Bean
public Binding routingFirstQueueBindDirectExchange() {
// Bind the direct switch to the queue and set the routing_key is routing_first_queue_routing_key
return BindingBuilder.bind(rabbitRoutingFirstQueue()).to(directExchange()).with(RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME);
}
@Bean
public Binding routingSecondQueueBindDirectExchange() {
// Queue 2 binds the direct switch and sets routing_key is routing_second_queue_routing_key
return BindingBuilder.bind(rabbitRoutingSecondQueue()).to(directExchange()).with(RabbitConstant.ROUTING_SECOND_QUEUE_ROUTING_KEY_NAME);
}
@Bean
public Binding routingThirdQueueBindDirectExchange() {
// The queue 3 is bound to the direct switch and the routing is set_ Key is routing_third_queue_routing_key
return BindingBuilder.bind(rabbitRoutingThirdQueue()).to(directExchange()).with(RabbitConstant.ROUTING_THIRD_QUEUE_ROUTING_KEY_NAME);
}
}
消費者監(jiān)聽隊列并消費
導入 com.example.rabbitmq.constant.RabbitConstant;
導入 org.springframework.amqp.rabbit.annotation.RabbitHandler;
導入 org.springframework.amqp.rabbit.annotation.RabbitListener;
導入 org.springframework.stereotype.Component;
@Component
public class RabbitRoutingConsumer {
@RabbitListener(queues = RabbitConstant.ROUTING_FIRST_QUEUE_NAME)
@RabbitHandler
public void routingFirstQueueListener(String context) {
System.out.println("rabbit routing queue first receiver:" + context);
}
@RabbitListener(queues = RabbitConstant.ROUTING_SECOND_QUEUE_NAME)
@RabbitHandler
public void routingSecondQueueListener(String context) {
System.out.println("rabbit pubsub 隊列第二個接收者:" + context);
}
@RabbitListener(queues = RabbitConstant.ROUTING_THIRD_QUEUE_NAME)
@RabbitHandler
public void routingThirdQueueListener(String context) {
System.out.println("rabbit pubsub 隊列第三個接收者:" + context);
}
}
單元測試
@Test
public void routing() {
// 向第一個隊列發(fā)送消息
rabbitTemplate.convertAndSend(RabbitConstant.ROUTING_EXCHANGE_NAME, RabbitConstant.ROUTING_FIRST_QUEUE_ROUTING_KEY_NAME, "routing hello");
}
響應結果
生產者聲明了三個隊列和一個主題切換。隊列分別與主題交換機綁定,并設置了路由鍵統(tǒng)一字符。如果路由鍵滿足交換機和隊列之間的通配符要求,則將消息存儲在隊列中
#通配符可以匹配一個或多個單詞,*通配符可以匹配一個單詞;如果Exchange和隊列之間的routing key通配符是#hello.#,則表示中間所有帶hello的routing key都滿足條件,消息會被存入隊列
導入 com.example.rabbitmq.constant.RabbitConstant;
導入 org.springframework.amqp.core.Binding;
導入 org.springframework.amqp.core.BindingBuilder;
導入 org.springframework.amqp.core.Queue;
導入 org.springframework.amqp.core.TopicExchange;
導入 org.springframework.context.annotation.Bean;
導入 org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitTopicProvider {
@Bean
public Queue topicFirstQueue() {
return new Queue(RabbitConstant.TOPICS_FIRST_QUEUE_NAME);
}
@Bean
公共隊列 topicSecondQueue() {
return new Queue(RabbitConstant.TOPICS_SECOND_QUEUE_NAME);
}
@Bean
公共隊列 topicThirdQueue() {
return new Queue(RabbitConstant.TOPICS_THIRD_QUEUE_NAME);
}
@Bean
public TopicExchange topicExchange() {
// 創(chuàng)建一個主題類型切換,表示交換機會發(fā)送消息到 routing_key 通配符匹配隊列成功
return new TopicExchange(RabbitConstant.TOPICS_EXCHANGE_NAME);
}
@Bean
public Binding topicFirstQueueBindExchange() {
// 綁定topic類型切換到隊列1并設置routing_key通配符#first.#
return BindingBuilder.bind(topicFirstQueue()).to(topicExchange()).with(RabbitConstant.TOPICS_ROUTING_KEY_FIRST_WILDCARD);
}
@Bean
public Binding topicSecondQueueBindExchange() {
//第二個隊列綁定主題類型切換,設置路由_key通配符為* second.#
return BindingBuilder.bind(topicSecondQueue()).to(topicExchange()) .with(RabbitConstant.TOPICS_ROUTING_KEY_SECOND_WILDCARD);
}
@Bean
public Binding topicThirdQueueBindExchange() {
// 三個隊列綁定主題切換,設置routing_key通配符為*third.*
return BindingBuilder.bind(topicThirdQueue()).to(topicExchange()).with(RabbitConstant.TOPICS_ROUTING_KEY_THRID_WILDCARD);
}
}
消費者監(jiān)聽隊列并消費
導入 com.example.rabbitmq.constant.RabbitConstant;
導入 org.springframework.amqp.rabbit.annotation.RabbitHandler;
導入 org.springframework.amqp.rabbit.annotation.RabbitListener;
導入 org.springframework.stereotype.Component;
@Component
public class RabbitTopicsConsumer {
@RabbitListener(queues = RabbitConstant.TOPICS_FIRST_QUEUE_NAME)
@RabbitHandler
public void topicFirstQueue(String context) {
System.out.println("rabbit topics queue first receiver:" + context);
}
@RabbitListener(queues = RabbitConstant.TOPICS_SECOND_QUEUE_NAME)
@RabbitHandler
public void topicSecondQueue(String context) {
System.out.println("兔子主題隊列第二個接收者:" + context);
}
@RabbitListener(queues = RabbitConstant.TOPICS_THIRD_QUEUE_NAME)
@RabbitHandler
public void topicThirdQueue(String context) {
System.out.println("rabbit 主題隊列第三個接收者:" + context);
}
}
單元測試
@Test
public void topics() {
// 向第一個隊列發(fā)送消息。這時候隊列可以接收到消息,因為隊列的通配符是#first.#,而routing_key是topics first。路由。鍵,匹配成功
rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_FIRST_QUEUE_ROUTING_KEY, "topics hello");
// 向第二個隊列發(fā)送消息。這時候隊列也能收到消息了,因為隊列的通配符是*秒#,而routing_key是topic秒。路由。鍵,匹配成功
rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_SECOND_QUEUE_ROUTING_KEY, "topics hello");
// 向第三個隊列發(fā)送消息。此時隊列無法接受消息,因為隊列通配符是*第三個*,而routing_key是topics第三個。路由。鍵,匹配失敗
rabbitTemplate.convertAndSend(RabbitConstant.TOPICS_EXCHANGE_NAME, RabbitConstant.TOPICS_THIRD_QUEUE_ROUTING_KEY, "topics hello");
}
響應結果
以上就是關于“Rabbitmq的幾種模式介紹”,如果大家想了解更多相關知識,可以關注一下動力節(jié)點的RabbitMQ教程,里面的課程內容細致全面,通俗易懂,適合小白學習,希望對大家能夠有所幫助。
0基礎 0學費 15天面授
有基礎 直達就業(yè)
業(yè)余時間 高薪轉行
工作1~3年,加薪神器
工作3~5年,晉升架構
提交申請后,顧問老師會電話與您溝通安排學習