更新時間:2021-07-28 16:52:23 來源:動力節點 瀏覽1201次
消息中間件顧名思義實現的就是在兩個系統或兩個客戶端之間進行消息傳送
ActiveMQ是一種開源的基于JMS(Java Message Servie)規范的一種消息中間件的實現,ActiveMQ的設計目標是提供標準的,面向消息的,能夠跨越多語言和多系統的應用集成消息通信中間件。
ActiveMQ常被應用與系統業務的解耦,異步消息的推送,增加系統并發量,提高用戶體驗。例如以我在工作中的使用,在比較耗時且異步的遠程開鎖操作時
(1)AcitveMQ的數據傳送流程
(2)ActiveMQ的兩種消息傳遞類型
(3)點對點傳輸,即一個生產者對應一個消費者,生產者向broke推送數據,數據存儲在broke的一個隊列中,當消費者接受該條隊列里的數據。
(4)基于發布/訂閱模式的傳輸,即根據訂閱話題來接收相應數據,一個生產者可向多個消費者推送數據,與MQTT協議的實現是類似的,對MQTT協議有興趣的可跳轉到https://www.cnblogs.com/xiguadadage/p/11216463.html
兩種消息傳遞類型的不同,點對點傳輸消費者可以接收到在連接之前生產者所推送的數據,而基于發布/訂閱模式的傳輸方式消費者只能接收到連接之后生產者推送的數據。
(1)官網下載對應服務器版本
(2)解壓后進入apache-activemq-5.15.9/bin目錄
(3)執行./activemq start啟動ActiveMQ
(4)瀏覽器輸入ActiveMQ啟動的服務器ip:8161便可進入web界面,點擊Manage ActiveMQ broker可以查看消息推送的狀態,默認賬號密碼為admin,admin
(5)啟動錯誤分析
進入/root/apache-activemq-5.15.9/data目錄查看activemq.log文件,根據錯誤提示信息修改,例如端口號被占用等。
(1)構建maven項目,引入依賴
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
(2)生產者類
/**
* @Description 生產者
* @Date 2019/7/20
* @Created by yqh
*/
public class MyProducer {
private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
public static void main(String[] args) throws JMSException {
// 創建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 創建連接
Connection connection = activeMQConnectionFactory.createConnection();
// 打開連接
connection.start();
// 創建會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創建隊列目標,并標識隊列名稱,消費者根據隊列名稱接收數據
Destination destination = session.createQueue("myQueue");
// 創建一個生產者
MessageProducer producer = session.createProducer(destination);
// 向隊列推送10個文本消息數據
for (int i = 1 ; i <= 10 ; i++){
// 創建文本消息
TextMessage message = session.createTextMessage("第" + i + "個文本消息");
//發送消息
producer.send(message);
//在本地打印消息
System.out.println("已發送的消息:" + message.getText());
}
//關閉連接
connection.close();
}
}
運行結果:
已發送的消息:第1個文本消息
已發送的消息:第2個文本消息
已發送的消息:第3個文本消息
已發送的消息:第4個文本消息
已發送的消息:第5個文本消息
已發送的消息:第6個文本消息
已發送的消息:第7個文本消息
已發送的消息:第8個文本消息
已發送的消息:第9個文本消息
已發送的消息:第10個文本消息
測試查看web后臺顯示,有10條消息在隊列中等待消費
(3)消費者類
/**
* @Description 消費者類
* @Date 2019/7/20 0020
* @Created by yqh
*/
public class MyConsumer {
private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
public static void main(String[] args) throws JMSException {
// 創建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 創建連接
Connection connection = activeMQConnectionFactory.createConnection();
// 打開連接
connection.start();
// 創建會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創建隊列目標,并標識隊列名稱,消費者根據隊列名稱接收數據
Destination destination = session.createQueue("myQueue");
// 創建消費者
MessageConsumer consumer = session.createConsumer(destination);
// 創建消費的監聽
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消費的消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
測試結果:
消費的消息:第1個文本消息
消費的消息:第2個文本消息
消費的消息:第3個文本消息
消費的消息:第4個文本消息
消費的消息:第5個文本消息
消費的消息:第6個文本消息
消費的消息:第7個文本消息
消費的消息:第8個文本消息
消費的消息:第9個文本消息
消費的消息:第10個文本消息
web后臺顯示有一個消費者處于連接狀態,且已消費了10個message,而該條隊列已沒有message待消費了
(4)當我們運行兩個消費者類,消息又是怎么被消費的呢?是兩個消費者都能收到生產者生產的message,還是只有其中一個消費者能消費呢?
我們先運行兩個消費者,在運行一個生產者對目標隊列生產10個message,會發現有以下情況
// Consumer1控制臺
消費的消息:第1個文本消息
消費的消息:第3個文本消息
消費的消息:第5個文本消息
消費的消息:第7個文本消息
消費的消息:第9個文本消息
// Consumer2控制臺
消費的消息:第2個文本消息
消費的消息:第4個文本消息
消費的消息:第6個文本消息
消費的消息:第8個文本消息
消費的消息:第10個文本消息
即隊列中的數據會平均的分給每一個消費者消費,且每一條數據只能被消費一次
(5)以上是基于隊列點對點的傳輸類型,以下是基于發布/訂閱模式傳輸的類型測試
/**
* @Description 基于發布/訂閱模式傳輸類型的生產者測試
* @Date 2019/7/20 0020
* @Created by yqh
*/
public class MyProducerForTopic {
private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
public static void main(String[] args) throws JMSException {
// 創建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 創建連接
Connection connection = activeMQConnectionFactory.createConnection();
// 打開連接
connection.start();
// 創建會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創建隊列目標,并標識隊列名稱,消費者根據隊列名稱接收數據
Destination destination = session.createTopic("topicTest");
// 創建一個生產者
MessageProducer producer = session.createProducer(destination);
// 向隊列推送10個文本消息數據
for (int i = 1 ; i <= 10 ; i++){
// 創建文本消息
TextMessage message = session.createTextMessage("第" + i + "個文本消息");
//發送消息
producer.send(message);
//在本地打印消息
System.out.println("已發送的消息:" + message.getText());
}
//關閉連接
connection.close();
}
}
/**
* @Description 基于發布/訂閱模式傳輸類型的消費者測試
* @Date 2019/7/20 0020
* @Created by yqh
*/
public class MyConsumerForTopic {
private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
public static void main(String[] args) throws JMSException {
// 創建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 創建連接
Connection connection = activeMQConnectionFactory.createConnection();
// 打開連接
connection.start();
// 創建會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創建隊列目標,并標識隊列名稱,消費者根據隊列名稱接收數據
Destination destination = session.createTopic("topicTest");
// 創建消費者
MessageConsumer consumer = session.createConsumer(destination);
// 創建消費的監聽
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消費的消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
現在如果我們先啟動生產者,再啟動消費者,會發現消費者是無法接收到之前生產者之前所生產的數據,只有消費者先啟動,再讓生產者消費才可以正常接收數據,這也是發布/訂閱的主題模式與點對點的隊列模式的一個明顯區別。
而如果啟動兩個消費者,那么每一個消費者都能完整的接收到生產者生產的數據,即每一條數據都被消費了兩次,這是發布/訂閱的主題模式與點對點的隊列模式的另一個明顯區別。
以上就是動力節點小編介紹的"ActiveMQ的使用",希望對大家有幫助,想了解更多可查看ActiveMQ教程。動力節點在線學習教程,針對沒有任何Java基礎的讀者學習,讓你從入門到精通,主要介紹了一些Java基礎的核心知識,讓同學們更好更方便的學習和了解Java編程,感興趣的同學可以關注一下。
0基礎 0學費 15天面授
有基礎 直達就業
業余時間 高薪轉行
工作1~3年,加薪神器
工作3~5年,晉升架構
提交申請后,顧問老師會電話與您溝通安排學習