2024-06-20
后端技术
00
请注意,本文编写于 91 天前,最后修改于 91 天前,其中某些信息可能已经过时。

目录

1、RabbitMQ.MessageProducer
2、RabbitMQ.MessageConsumer01
3、注意
1、自动消费
2、手动消费

1、RabbitMQ.MessageProducer

新建类 ConsumptionACKConfirm

csharp
public static void Show() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest" }; using var connection = factory.CreateConnection(); using var channel = connection.CreateModel(); Console.WriteLine("生产者准备就绪"); channel.QueueDeclare("ConsumptionACKConfirmQueue", true, false, false, null); channel.ExchangeDeclare("ConsumptionACKConfirmQueueExchange", ExchangeType.Direct, true, false); channel.QueueBind("ConsumptionACKConfirmQueue", "ConsumptionACKConfirmQueueExchange", "ConsumptionACKConfirmKey"); // 设置优先级 var props = channel.CreateBasicProperties(); for (int i = 1; i < 1000; i++) { props.DeliveryMode = 2; string message = $"消息{i}"; channel.BasicPublish("ConsumptionACKConfirmQueueExchange", "ConsumptionACKConfirmKey", props, Encoding.UTF8.GetBytes(message)); Thread.Sleep(300); Console.WriteLine($"【{message}】 已发送"); } }

2、RabbitMQ.MessageConsumer01

新建类 ConsumptionACKConfirm

csharp
public static void Show() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest" }; using var connection = factory.CreateConnection(); using var channel = connection.CreateModel(); #region 定义消费者 var consumer = new EventingBasicConsumer(channel); int i = 0; consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); //Console.WriteLine($"接受到消息:{message}"); if (i < 50) { // 手动确认 消息正常消费,告诉Broker,可以将当前消息删除 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine(message); } else { // 否定,告诉 Broker,消息没有正常消费,重新写入到队列中 channel.BasicReject(ea.DeliveryTag, true); } i++; }; Console.WriteLine("消费者准备就绪"); { // 处理消息 //autoAsk true 自动确认 //channel.BasicConsume("ConsumptionACKConfirmQueue", true, consumer); } { // 处理消息 //autoAsk false 手动(显示)确认 channel.BasicConsume("ConsumptionACKConfirmQueue", false, consumer); } Console.ReadKey(); #endregion }

3、注意

1、自动消费

BasicConsume 的第二个参数设置为 true,会在初次进入接收消息时,RabbitMQ 网页客户端会清空消息,会将所有消息交由消费端去处理,无法保证消息处理的正确性

2、手动消费

需要将 BasicConsume 的第二个参数设置为 false,使用 channel.BasicAck(ea.DeliveryTag, false) 进行手动确认,若确认失败,使用 channel.BasicReject(ea.DeliveryTag, true) 重新将消息写入队列

本文作者:一叶知秋

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!