更新時(shí)間:2021-06-17 11:24:42 來源:動(dòng)力節(jié)點(diǎn) 瀏覽1146次
最簡單的使用
1.引用 rabbitMQ.Client
Install-Package RabbitMQ.Client
2.創(chuàng)建生產(chǎn)者
//創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory
{
UserName = "mqAdmin",//用戶名
Password = "mqAdmin",//密碼
HostName = "192.168.1.103"http://rabbitmq ip
};
//創(chuàng)建連接
var connection = factory.CreateConnection();
//創(chuàng)建通道
var channel = connection.CreateModel();
//聲明一個(gè)隊(duì)列
channel.QueueDeclare("hello", false, false, false, null);
Console.WriteLine("\nRabbitMQ連接成功,請(qǐng)輸入消息,輸入exit退出!");
string input;
do
{
input = Console.ReadLine();
var sendBytes = Encoding.UTF8.GetBytes(input);
//發(fā)布消息
channel.BasicPublish("", "hello", null, sendBytes);
} while (input.Trim().ToLower()!="exit");
channel.Close();
connection.Close();
3.創(chuàng)建生產(chǎn)者
//創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory
{
UserName = "admin",//用戶名
Password = "admin",//密碼
HostName = "192.168.157.130"http://rabbitmq ip
};
//創(chuàng)建連接
var connection = factory.CreateConnection();
//創(chuàng)建通道
var channel = connection.CreateModel();
//事件基本消費(fèi)者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//接收到消息事件
consumer.Received += (ch, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
Console.WriteLine($"收到消息: {message}");
//確認(rèn)該消息已被消費(fèi)
channel.BasicAck(ea.DeliveryTag, false);
};
//啟動(dòng)消費(fèi)者 設(shè)置為手動(dòng)應(yīng)答消息
channel.BasicConsume("hello", false, consumer);
Console.WriteLine("消費(fèi)者已啟動(dòng)");
Console.ReadKey();
channel.Dispose();
connection.Close();
RabbitMQ消費(fèi)失敗的處理
RabbitMQ采用消息應(yīng)答機(jī)制,即消費(fèi)者收到一個(gè)消息之后,需要發(fā)送一個(gè)應(yīng)答,然后RabbitMQ才會(huì)將這個(gè)消息從隊(duì)列中刪除,如果消費(fèi)者在消費(fèi)過程中出現(xiàn)異常,斷開連接切沒有發(fā)送應(yīng)答,那么RabbitMQ會(huì)將這個(gè)消息重新投遞。
//接收到消息事件
consumer.Received += (ch, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
Console.WriteLine($"收到消息: {message}");
Console.WriteLine($"收到該消息[{ea.DeliveryTag}] 延遲10s發(fā)送回執(zhí)");
Thread.Sleep(10000);
//確認(rèn)該消息已被消費(fèi)
channel.BasicAck(ea.DeliveryTag, false);
Console.WriteLine($"已發(fā)送回執(zhí)[{ea.DeliveryTag}]");
};
使用RabbitMQ的Exchange
前面我們可以看到生產(chǎn)者將消息投遞到Queue中,實(shí)際上這在RabbitMQ中這種事情永遠(yuǎn)都不會(huì)發(fā)生。實(shí)際的情況是,生產(chǎn)者將消息發(fā)送到Exchange(交換器),由Exchange將消息路由到一個(gè)或多個(gè)Queue中(或者丟棄)
AMQP協(xié)議中的核心思想就是生產(chǎn)者和消費(fèi)者隔離,生產(chǎn)者從不直接將消息發(fā)送給隊(duì)列。生產(chǎn)者通常不知道是否一個(gè)消息會(huì)被發(fā)送到隊(duì)列中,只是將消息發(fā)送到一個(gè)交換機(jī)。先由Exchange來接收,然后Exchange按照特定的策略轉(zhuǎn)發(fā)到Queue進(jìn)行存儲(chǔ)。同理,消費(fèi)者也是如此。Exchange 就類似于一個(gè)交換機(jī),轉(zhuǎn)發(fā)各個(gè)消息分發(fā)到相應(yīng)的隊(duì)列中。
RabbitMQ提供了四種Exchange模式:direct,fanout,topic,header 。但是 header模式在實(shí)際使用中較少,所以這里只介紹前三種模式。
Exchange不是消費(fèi)者關(guān)心的,所以消費(fèi)者的代碼完全不用變,用上面的消費(fèi)者就行了。
Direct Exchange
所有發(fā)送到Direct Exchange的消息被轉(zhuǎn)發(fā)到具有指定RouteKey的Queue。
Direct模式,可以使用rabbitMQ自帶的Exchange:default Exchange 。所以不需要將Exchange進(jìn)行任何綁定(binding)操作 。消息傳遞時(shí),RouteKey必須完全匹配,才會(huì)被隊(duì)列接收,否則該消息會(huì)被拋棄。
//創(chuàng)建連接
var connection = factory.CreateConnection();
//創(chuàng)建通道
var channel = connection.CreateModel();
//定義一個(gè)Direct類型交換機(jī)
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);
//定義一個(gè)隊(duì)列
channel.QueueDeclare(queueName, false, false, false, null);
//將隊(duì)列綁定到交換機(jī)
channel.QueueBind(queueName, exchangeName, routeKey, null);
Fanout Exchange
所有發(fā)送到Fanout Exchange的消息都會(huì)被轉(zhuǎn)發(fā)到與該Exchange 綁定(Binding)的所有Queue上。
Fanout Exchange 不需要處理RouteKey 。只需要簡單的將隊(duì)列綁定到exchange 上。這樣發(fā)送到exchange的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上。類似子網(wǎng)廣播,每臺(tái)子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息。
所以,F(xiàn)anout Exchange 轉(zhuǎn)發(fā)消息是最快的。
為了演示效果,定義了兩個(gè)隊(duì)列,分別為hello1,hello2,每個(gè)隊(duì)列都擁有一個(gè)消費(fèi)者。
static void Main(string[] args)
{
string exchangeName = "TestFanoutChange";
string queueName1 = "hello1";
string queueName2 = "hello2";
string routeKey = "";
//創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory
{
UserName = "admin",//用戶名
Password = "admin",//密碼
HostName = "192.168.157.130"http://rabbitmq ip
};
//創(chuàng)建連接
var connection = factory.CreateConnection();
//創(chuàng)建通道
var channel = connection.CreateModel();
//定義一個(gè)Direct類型交換機(jī)
channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);
//定義隊(duì)列1
channel.QueueDeclare(queueName1, false, false, false, null);
//定義隊(duì)列2
channel.QueueDeclare(queueName2, false, false, false, null);
//將隊(duì)列綁定到交換機(jī)
channel.QueueBind(queueName1, exchangeName, routeKey, null);
channel.QueueBind(queueName2, exchangeName, routeKey, null);
//生成兩個(gè)隊(duì)列的消費(fèi)者
ConsumerGenerator(queueName1);
ConsumerGenerator(queueName2);
Console.WriteLine($"\nRabbitMQ連接成功,\n\n請(qǐng)輸入消息,輸入exit退出!");
string input;
do
{
input = Console.ReadLine();
var sendBytes = Encoding.UTF8.GetBytes(input);
//發(fā)布消息
channel.BasicPublish(exchangeName, routeKey, null, sendBytes);
} while (input.Trim().ToLower() != "exit");
channel.Close();
connection.Close();
}
/// <summary>
/// 根據(jù)隊(duì)列名稱生成消費(fèi)者
/// </summary>
/// <param name="queueName"></param>
static void ConsumerGenerator(string queueName)
{
//創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory
{
UserName = "admin",//用戶名
Password = "admin",//密碼
HostName = "192.168.157.130"http://rabbitmq ip
};
//創(chuàng)建連接
var connection = factory.CreateConnection();
//創(chuàng)建通道
var channel = connection.CreateModel();
//事件基本消費(fèi)者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//接收到消息事件
consumer.Received += (ch, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
Console.WriteLine($"Queue:{queueName}收到消息: {message}");
//確認(rèn)該消息已被消費(fèi)
channel.BasicAck(ea.DeliveryTag, false);
};
//啟動(dòng)消費(fèi)者 設(shè)置為手動(dòng)應(yīng)答消息
channel.BasicConsume(queueName, false, consumer);
Console.WriteLine($"Queue:{queueName},消費(fèi)者已啟動(dòng)");
}
Topic Exchange
所有發(fā)送到Topic Exchange的消息被轉(zhuǎn)發(fā)到能和Topic匹配的Queue上,
Exchange 將路由進(jìn)行模糊匹配。可以使用通配符進(jìn)行模糊匹配,符號(hào)“#”匹配一個(gè)或多個(gè)詞,符號(hào)“”匹配不多不少一個(gè)詞。因此“XiaoChen.#”能夠匹配到“XiaoChen.pets.cat”,但是“XiaoChen.” 只會(huì)匹配到“XiaoChen.money”。
所以,Topic Exchange 使用非常靈活。
string exchangeName = "TestTopicChange";
string queueName = "hello";
string routeKey = "TestRouteKey.*";
//創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory
{
UserName = "admin",//用戶名
Password = "admin",//密碼
HostName = "192.168.157.130"http://rabbitmq ip
};
//創(chuàng)建連接
var connection = factory.CreateConnection();
//創(chuàng)建通道
var channel = connection.CreateModel();
//定義一個(gè)Direct類型交換機(jī)
channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null);
//定義隊(duì)列1
channel.QueueDeclare(queueName, false, false, false, null);
//將隊(duì)列綁定到交換機(jī)
channel.QueueBind(queueName, exchangeName, routeKey, null);
Console.WriteLine($"\nRabbitMQ連接成功,\n\n請(qǐng)輸入消息,輸入exit退出!");
string input;
do
{
input = Console.ReadLine();
var sendBytes = Encoding.UTF8.GetBytes(input);
//發(fā)布消息
channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes);
} while (input.Trim().ToLower() != "exit");
channel.Close();
connection.Close();
以上就是動(dòng)力節(jié)點(diǎn)小編介紹的"Rabbitmq使用教程",希望對(duì)大家有幫助,如有疑問,請(qǐng)?jiān)诰€咨詢,有專業(yè)老師隨時(shí)為您服務(wù)。
0基礎(chǔ) 0學(xué)費(fèi) 15天面授
有基礎(chǔ) 直達(dá)就業(yè)
業(yè)余時(shí)間 高薪轉(zhuǎn)行
工作1~3年,加薪神器
工作3~5年,晉升架構(gòu)
提交申請(qǐng)后,顧問老師會(huì)電話與您溝通安排學(xué)習(xí)
初級(jí) 202925
初級(jí) 203221
初級(jí) 202629
初級(jí) 203743