新建类
ConsumptionACKConfirm
csharppublic static void Show()
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
UserName = "guest",
Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
Console.WriteLine("生产者准备就绪");
channel.QueueDeclare("ConsumptionACKConfirmQueue", true, false, false, null);
channel.ExchangeDeclare("ConsumptionACKConfirmQueueExchange", ExchangeType.Direct, true, false);
channel.QueueBind("ConsumptionACKConfirmQueue", "ConsumptionACKConfirmQueueExchange", "ConsumptionACKConfirmKey");
// 设置优先级
var props = channel.CreateBasicProperties();
for (int i = 1; i < 1000; i++)
{
props.DeliveryMode = 2;
string message = $"消息{i}";
channel.BasicPublish("ConsumptionACKConfirmQueueExchange", "ConsumptionACKConfirmKey", props, Encoding.UTF8.GetBytes(message));
Thread.Sleep(300);
Console.WriteLine($"【{message}】 已发送");
}
}
新建类
ConsumptionACKConfirm
csharppublic static void Show()
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
UserName = "guest",
Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
#region 定义消费者
var consumer = new EventingBasicConsumer(channel);
int i = 0;
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
//Console.WriteLine($"接受到消息:{message}");
if (i < 50)
{
// 手动确认 消息正常消费,告诉Broker,可以将当前消息删除
channel.BasicAck(ea.DeliveryTag, false);
Console.WriteLine(message);
}
else
{
// 否定,告诉 Broker,消息没有正常消费,重新写入到队列中
channel.BasicReject(ea.DeliveryTag, true);
}
i++;
};
Console.WriteLine("消费者准备就绪");
{
// 处理消息
//autoAsk true 自动确认
//channel.BasicConsume("ConsumptionACKConfirmQueue", true, consumer);
}
{
// 处理消息
//autoAsk false 手动(显示)确认
channel.BasicConsume("ConsumptionACKConfirmQueue", false, consumer);
}
Console.ReadKey();
#endregion
}
BasicConsume
的第二个参数设置为 true,会在初次进入接收消息时,RabbitMQ
网页客户端会清空消息,会将所有消息交由消费端去处理,无法保证消息处理的正确性
需要将
BasicConsume
的第二个参数设置为 false,使用channel.BasicAck(ea.DeliveryTag, false)
进行手动确认,若确认失败,使用channel.BasicReject(ea.DeliveryTag, true)
重新将消息写入队列
本文作者:一叶知秋
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!