什么是集群
集群就是將相同的程序、功能,部署在兩臺或多臺服務(wù)器上,這些服務(wù)器對外提供的功能是完全一樣的。
集群是通過不斷橫向擴展增加服務(wù)器的方式,以提高服務(wù)的能力。
● 集群可以解決單點故障問題
● 集群可以提高系統(tǒng)的可用性
● 集群可以提高系統(tǒng)的服務(wù)能力
通過共享存儲目錄(kahaDB)來實現(xiàn)master和slave的主從信息同步;
所有ActiveMQ的broker都在不斷地獲取共享目錄的控制權(quán),哪個broker搶到了控制權(quán),它就成為master,它將鎖定該目錄,其他broker就只能成為slave。
當(dāng)master主出現(xiàn)故障后,剩下的slave從將再進(jìn)行爭奪共享目錄的控制權(quán),誰搶到共享目錄的控制權(quán),誰就成為主,其他沒有搶到控制權(quán)的稱為從。
由于他們是基于共享目錄,所以當(dāng)主出現(xiàn)故障后,其上沒有被消費的消息在接下來產(chǎn)生的新的master主中可以繼續(xù)進(jìn)行消費。
這種方式客戶端訪問的都是主,從只是起到了一個備份訪問的作用
(1) 架構(gòu)圖
(2) 實現(xiàn)步驟
A、 安裝多個ActiveMQ
因為ActiveMQ的安裝和Tomcat一樣,解壓就可以使用,所以我們直接在/usr/local目錄下復(fù)制多份,就相當(dāng)于安裝了多個ActiveMQ,我們這里復(fù)制3個ActiveMQ出來。
復(fù)制前,先將運行的ActiveMQ停止。
B、 打開三個Xshell,分別連接不同的ActiveMQ方便操作
C、 配置每個activeMQ的conf /activemq.xml文件中的共享目錄
如果集群搭建在一臺機器上需要改端口,如果搭建在多臺上就不需要了
如果搭建在多臺服務(wù)器上,那么存放共享目錄的機器需要通過磁盤掛載的方式掛載到主從機器上。
● 修改三個ActiveMQ的共享目錄
persistenceAdapter>
<!--<kahaDB directory="${activemq.data}/kahadb"/>-->
<kahaDB directory="/opt/kahadb"/>
</persistenceAdapter>
● 修改完持久化目錄后,需要在/opt目錄下創(chuàng)建該目錄
D、 配置每個activeMQ的conf /activemq.xml文件中的端口
為了避免端口號的沖突,前三個地址端口+1,后兩個端口地址-1,可以將文件下載下來替換。
一個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1882?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
第二個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5674?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61615?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1881?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61612?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
第三個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61619?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5675?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1880?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61611?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
● maximumConnections 最大連接數(shù);
● wireFormat.maxFrameSize 表示一個完整消息的最大數(shù)據(jù)量,單位byte;
● 0.0.0.0表示任意ip
E、 修改conf/jetty.xml文件的jetty服務(wù)器端口(管理控制臺)
第一個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8162"/>
</bean>
第二個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8163"/>
</bean>
第三個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8164"/>
</bean>
F、 啟動三臺ActiveMQ,可以測試驗證了
注意:啟動后會有一段時間延時,稍等一會;
瀏覽器訪問http://192.168.235.128:8162/, http://192.168.235.128:8163/ ,http://192.168.235.128:8164/ 判斷主從服務(wù)器;
web控制臺能訪問的是 master,不能訪問的是 slave。
G、 修改11-activemq-java中的程序收發(fā)消息代碼
連接時使用故障轉(zhuǎn)義協(xié)議failover
failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)
修改BROKER_URL地址
public static final String BROKER_URL = "failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)";
為了看到效果,發(fā)消息和接收消息我們都是用循環(huán)方式
//發(fā)消息 沒有返回值,是非阻塞的
while(true){
messageProducer.send(message);
}
查看發(fā)送以及接收idea控制臺輸出,停止ActiveMQ主,看效果
注意:如果是事務(wù)消息,被中斷那么程序發(fā)送程序出錯,不能實現(xiàn),所以我們將消息改為非事務(wù)消息進(jìn)行測試,如果是非事務(wù)消息就注釋掉session.commit。
該方式與shared filesystem方式類似,只是共享的存儲介質(zhì)由文件系統(tǒng)改成了數(shù)據(jù)庫。
(1) 架構(gòu)圖
(2) 實現(xiàn)步驟
A、 安裝多個ActiveMQ(已做)
因為ActiveMQ的安裝和Tomcat一樣,解壓就可以使用,所以我們直接在/usr/local目錄下復(fù)制多份,就相當(dāng)于安裝了多個ActiveMQ,我們這里復(fù)制3個ActiveMQ出來復(fù)制前,先將運行的ActiveMQ停止。
B、 打開三個Xshell,分別連接不同的ActiveMQ方便操作(已做)
C、 配置每個activeMQ的conf /activemq.xml文件中的持久化適配器是jdbc數(shù)據(jù)庫方式
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
D、 配置每個數(shù)據(jù)庫連接池
注意:連接池的配置需要配置在的外面
<bean id="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/activemq?useUnicode=true&characterEncoding=utf8&useSSL=false"/>
<property name="username" value="root"/>
<property name="password" value="123456"/>
</bean>
E、 在每個ActiveMQ的lib目錄下加入mysql的驅(qū)動包和數(shù)據(jù)庫連接池Druid包,該包在我提供的資料05-ActiveMQ\resources\lib下
可以通過Xftp或者rz命令上傳。
F、 啟動MySQL數(shù)據(jù)庫,并創(chuàng)建activemq數(shù)據(jù)庫
G、 配置每個activeMQ的conf /activemq.xml文件中的端口(已做)
如果集群搭建在一臺機器上需要改端口,如果搭建在多臺上就不需要了;
為了避免端口號的沖突,前三個地址端口+1,后兩個端口地址-1。
第一個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1882?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
第二個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5674?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61615?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1881?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61612?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
第三個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61619?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5675?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1880?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61611?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
● maximumConnections 最大連接數(shù);
● wireFormat.maxFrameSize 表示一個完整消息的最大數(shù)據(jù)量,單位byte;
● 0.0.0.0表示任意ip
H、 修改conf/jetty.xml文件的jetty服務(wù)器端口(管理控制臺) (已做)
第一個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8162"/>
</bean>
第二個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8163"/>
</bean>
第三個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8164"/>
</bean>
I、啟動三臺ActiveMQ,可以測試驗證了
瀏覽器訪問http://192.168.235.128:8162/, http://192.168.235.128:8163/ ,http://192.168.235.128:8164/ 判斷主從服務(wù)器;
web控制臺能訪問的是 master,不能訪問的是 slave。
J、 修改11-activemq-java中的程序收發(fā)消息類(已做)
連接時使用故障轉(zhuǎn)義協(xié)議failover
failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)
修改BROKER_URL地址
public static final String BROKER_URL = "failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)";
為了看到效果,發(fā)消息和接收消息我們都是用循環(huán)方式
//發(fā)消息 沒有返回值,是非阻塞的
while(true){
messageProducer.send(message);
session.commit();
}
查看發(fā)送以及接收idea控制臺輸出,停止ActiveMQ主,看效果
注意:如果是事務(wù)消息,被中斷那么程序發(fā)送程序出錯,不能實現(xiàn),所以我們將消息改為非事務(wù)消息進(jìn)行測試,如果是非事務(wù)消息就注釋掉session.commit。
3、Replicated LevelDB Store方式主從集群(常用)
基于可復(fù)制的LevelDB存儲方式的集群;
這種集群方式是ActiveMQ5.9版本以后新增的特性,它使用ZooKeeper從一組broker中協(xié)調(diào)選擇一個broker作為master主,其他broker作為slave從的模式。所有slave從節(jié)點通過復(fù)制master主節(jié)點的消息來實現(xiàn)消息同步,當(dāng)主出現(xiàn)故障后,沒有被消費的消息在從服務(wù)器上也同步了一份,所以不會有消息的丟失。
LevelDB 是 Google開發(fā)的一套用于持久化數(shù)據(jù)的高性能kv數(shù)據(jù)庫,ActiveMQ利用該數(shù)據(jù)庫進(jìn)行數(shù)據(jù)的存儲。
只有master 接受客戶端連接,slave不接受客戶端連接,Master的所有存儲操作都將被復(fù)制到slaves。
在這個模式中,需要有半數(shù)以上的broker是正常的,集群才是可用的,超過半數(shù)broker故障,ZooKeeper的選舉算法將不能選擇master,從而導(dǎo)致集群不可用。
(1)架構(gòu)圖
(2) 實現(xiàn)步驟
A、 安裝多個ActiveMQ(已做)
因為ActiveMQ的安裝和Tomcat一樣,解壓就可以使用,所以我們直接在/usr/local目錄下復(fù)制多份,就相當(dāng)于安裝了多個ActiveMQ,我們這里復(fù)制3個ActiveMQ出來。
復(fù)制前,先將運行的ActiveMQ停止
B、 打開三個Xshell,分別連接不同的ActiveMQ方便操作(已做)
C、 配置每個activeMQ的conf /activemq.xml文件中的持久化適配器replicatedLevelDB方式
<persistenceAdapter>
<replicatedLevelDB
replicas="3"
bind="tcp://0.0.0.0:0"
zkAddress="localhost:2181"/>
</persistenceAdapter>
參數(shù)說明
● replicas :集群中存在的節(jié)點的數(shù)目
● bind :當(dāng)該節(jié)點成為master后,將使用該bind配置的ip和端口進(jìn)行數(shù)據(jù)復(fù)制
● zkAddress :ZooKeeper的地址
D、 啟動ZooKeeper服務(wù)器
E、 啟動三臺ActiveMQ,可以測試驗證了
瀏覽器訪問http://192.168.235.128:8162/, http://192.168.235.128:8163/ ,http://192.168.235.128:8164/ 判斷主從服務(wù)器;
web控制臺能訪問的是 master,不能訪問的是 slave。
F、 修改11-activemq-java中的程序收發(fā)消息類(已做)
連接時使用故障轉(zhuǎn)義協(xié)議failover
ailover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)
修改BROKER_URL地址
public static final String BROKER_URL = "failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)";
為了看到效果,發(fā)消息和接收消息我們都是用循環(huán)方式
//發(fā)消息 沒有返回值,是非阻塞的
while(true){
messageProducer.send(message);
}
查看發(fā)送以及接收idea控制臺輸出,停止ActiveMQ主,看效果。
G、 把其中的一臺master關(guān)閉,留下兩臺運行,觀察效果
H、 繼續(xù)關(guān)閉下一臺master,留下一臺運行,觀察效果
I、 啟動其中一臺,讓兩個運行,再觀察效果
(3) 總結(jié)
這種方式,不適合集群太大,也就是activemq不能太多,因為多個activemq之間需要復(fù)制消息,這個比較耗資源,占用網(wǎng)絡(luò),建議3、5臺。