在上篇的基础上新建控制台
dotnet new console -n RabbitMQ.MessageConsumer02 -f net5.0
dotnet sln add .\RabbitMQ.MessageConsumer02
新建类(生产者,负责向交换机发布消息)
FanoutExchange
/// <summary>
/// Publishes a numbered notification to the "FanoutExchange" fanout exchange every two seconds.
/// </summary>
/// <remarks>
/// Declares the durable queues "FanoutExchange01" and "FanoutExchange02", declares the durable
/// fanout exchange, and binds both queues to it before publishing. The publish loop has no exit
/// condition, so the method only ends when an exception is thrown or the process is stopped.
/// </remarks>
public static void Show()
{
    var factory = new ConnectionFactory
    {
        HostName = "localhost",
        UserName = "guest",
        Password = "guest"
    };

    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    // Declare the two queues that will each receive a copy of every message.
    channel.QueueDeclare(queue: "FanoutExchange01", durable: true, exclusive: false, autoDelete: false, arguments: null);
    channel.QueueDeclare(queue: "FanoutExchange02", durable: true, exclusive: false, autoDelete: false, arguments: null);

    // Declare the exchange by name and type.
    channel.ExchangeDeclare(exchange: "FanoutExchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);

    // Bind both queues to the exchange; a fanout exchange ignores the routing key.
    channel.QueueBind(queue: "FanoutExchange01", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null);
    channel.QueueBind(queue: "FanoutExchange02", exchange: "FanoutExchange", routingKey: string.Empty, arguments: null);

    // The queues are durable, so mark the messages persistent as well; otherwise
    // queued messages are dropped when the broker restarts.
    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;

    // Publish one notification every two seconds (no console input is involved).
    for (var i = 1; ; i++)
    {
        var message = $"通知{i}";
        var body = Encoding.UTF8.GetBytes(message);

        // Publish to the exchange, which copies the message to every bound queue.
        channel.BasicPublish(exchange: "FanoutExchange", routingKey: string.Empty, basicProperties: properties, body: body);
        Console.WriteLine($"通知【{message}】已发送到队列");

        Thread.Sleep(2000);
    }
}
新建类(消费者,消费队列 FanoutExchange01)
FanoutExchange
/// <summary>
/// Consumes messages from the "FanoutExchange01" queue and prints each one to the console.
/// </summary>
/// <remarks>
/// Declares the exchange, the queue, and their binding before consuming. Prints the value
/// returned by <c>BasicConsume</c>, then blocks on <c>Console.ReadLine</c> until a line is
/// entered, after which the channel and connection are disposed.
/// </remarks>
public static void Show()
{
    const string exchangeName = "FanoutExchange";
    const string queueName = "FanoutExchange01";

    var connectionFactory = new ConnectionFactory
    {
        HostName = "localhost",
        UserName = "guest",
        Password = "guest"
    };

    using var conn = connectionFactory.CreateConnection();
    using var model = conn.CreateModel();

    // Declare the exchange.
    model.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);

    // Declare the queue.
    model.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

    // Bind the queue to the exchange.
    model.QueueBind(queue: queueName, exchange: exchangeName, routingKey: string.Empty, arguments: null);

    // Define the consumer: decode each delivery as UTF-8 text and print it.
    var listener = new EventingBasicConsumer(model);
    listener.Received += (sender, delivery) =>
    {
        var text = Encoding.UTF8.GetString(delivery.Body.ToArray());
        Console.WriteLine($"接收成功!【{text}】,邮件通知");
    };

    Console.WriteLine("通知服务准备就绪");

    // Start consuming with automatic acknowledgement and print the returned value.
    string consumerTag = model.BasicConsume(queue: queueName, autoAck: true, consumer: listener);
    Console.WriteLine(consumerTag);

    // Keep the process alive until a line is entered.
    Console.ReadLine();
}
新建类(消费者,消费队列 FanoutExchange02)
FanoutExchange
/// <summary>
/// Consumes messages from the "FanoutExchange02" queue and prints each one to the console.
/// </summary>
/// <remarks>
/// Declares the exchange, the queue, and their binding before consuming, then blocks on
/// <c>Console.ReadLine</c> until a line is entered, after which the channel and connection
/// are disposed.
/// </remarks>
public static void Show()
{
    const string exchangeName = "FanoutExchange";
    const string queueName = "FanoutExchange02";

    var connectionFactory = new ConnectionFactory
    {
        HostName = "localhost",
        UserName = "guest",
        Password = "guest"
    };

    using var conn = connectionFactory.CreateConnection();
    using var model = conn.CreateModel();

    // Declare the exchange.
    model.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null);

    // Declare the queue.
    model.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

    // Bind the queue to the exchange.
    model.QueueBind(queue: queueName, exchange: exchangeName, routingKey: string.Empty, arguments: null);

    // Define the consumer: decode each delivery as UTF-8 text and print it.
    var listener = new EventingBasicConsumer(model);
    listener.Received += (sender, delivery) =>
    {
        var text = Encoding.UTF8.GetString(delivery.Body.ToArray());
        Console.WriteLine($"接收成功!【{text}】,邮件通知");
    };

    Console.WriteLine("通知服务准备就绪");

    // Start consuming with automatic acknowledgement.
    model.BasicConsume(queue: queueName, autoAck: true, consumer: listener);

    // Keep the process alive until a line is entered.
    Console.ReadLine();
}
两个队列都能接收到同一条消息,并分别由各自的消费者消费
本文作者:一叶知秋
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!