RabbitMQ.MessageProducer 新建类 ProductionMessageTx
/// <summary>
/// Interactive producer demo for RabbitMQ channel transactions.
/// Reads lines from the console and publishes each one to two queues inside a
/// single transaction, so either both queues receive the message or neither does.
/// Typing "quit" (any casing) or reaching end of input stops the loop; the method
/// then waits for one more key press before returning.
/// </summary>
public static void Show()
{
    var factory = new ConnectionFactory()
    {
        HostName = "localhost",
        UserName = "guest",
        Password = "guest"
    };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();
    Console.WriteLine("生产者准备就绪");

    // Topology: two durable queues bound to one durable direct exchange,
    // each under its own routing key.
    channel.QueueDeclare("MessageTxQueue01", true, false, false, null);
    channel.QueueDeclare("MessageTxQueue02", true, false, false, null);
    channel.ExchangeDeclare("MessageTxQueueExchange", ExchangeType.Direct, true, false, null);
    channel.QueueBind("MessageTxQueue01", "MessageTxQueueExchange", "MessageTxKey01");
    channel.QueueBind("MessageTxQueue02", "MessageTxQueueExchange", "MessageTxKey02");

    // Switch the channel to transactional mode once. Per AMQP 0-9-1 a new
    // transaction starts automatically after every commit or rollback, so the
    // call does not need to be repeated per message.
    channel.TxSelect();

    int committedCount = 0;
    while (true)
    {
        var message = Console.ReadLine();

        // ReadLine returns null at end of input; treat that like "quit".
        // Ordinal comparison keeps the command keyword culture-independent.
        // The sentinel itself is not published.
        if (message is null || message.Equals("quit", StringComparison.OrdinalIgnoreCase))
        {
            break;
        }

        var body = Encoding.UTF8.GetBytes(message);
        try
        {
            // First publish of the transaction.
            channel.BasicPublish("MessageTxQueueExchange", "MessageTxKey01", null, body);

            // Deliberate fault injection for the demo: once two messages have been
            // committed, fail between the two publishes so the rollback path runs.
            // The counter only advances on commit, so every later attempt fails too.
            if (committedCount == 2)
            {
                throw new InvalidOperationException("发送消息异常");
            }

            // Second publish of the transaction.
            channel.BasicPublish("MessageTxQueueExchange", "MessageTxKey02", null, body);

            // Commit both publishes atomically.
            channel.TxCommit();
            committedCount++;
            Console.WriteLine($"{message} 发送到Broke成功");
        }
        catch (Exception)
        {
            Console.WriteLine($"{message} 发送到Broke失败");

            // Discard whatever was published in this transaction. Skip the rollback
            // when the channel is already closed, because the call would throw again.
            if (channel.IsOpen)
            {
                channel.TxRollback();
            }
        }
    }
    Console.Read();
}
Confirm 模式
RabbitMQ.MessageProducer 新建类 ProductionMessageConfirm
/// <summary>
/// Interactive producer demo for RabbitMQ publisher confirms.
/// Reads lines from the console, publishes each one to a direct exchange and
/// blocks until the broker confirms it, reporting success or failure per message.
/// Typing "quit" (any casing) or reaching end of input stops the loop; the method
/// then waits for one more key press before returning.
/// </summary>
public static void Show()
{
    var factory = new ConnectionFactory()
    {
        HostName = "localhost",
        UserName = "guest",
        Password = "guest"
    };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();
    Console.WriteLine("生产者准备就绪");

    // Topology: one durable queue bound to one durable direct exchange.
    // The binding must be a queue binding; ExchangeBind would try to bind an
    // exchange named "ConfirmSelectQueue", which is never declared.
    channel.QueueDeclare("ConfirmSelectQueue", true, false, false, null);
    channel.ExchangeDeclare("ConfirmSelectQueueExchange", ExchangeType.Direct, true, false, null);
    channel.QueueBind("ConfirmSelectQueue", "ConfirmSelectQueueExchange", "ConfirmSelectKey");

    // Enable publisher confirms on the channel once, before the publish loop.
    channel.ConfirmSelect();

    while (true)
    {
        var message = Console.ReadLine();

        // ReadLine returns null at end of input; treat that like "quit".
        // Ordinal comparison keeps the command keyword culture-independent.
        // The sentinel itself is not published.
        if (message is null || message.Equals("quit", StringComparison.OrdinalIgnoreCase))
        {
            break;
        }

        var body = Encoding.UTF8.GetBytes(message);
        try
        {
            // Publish the message.
            channel.BasicPublish("ConfirmSelectQueueExchange", "ConfirmSelectKey", null, body);

            // Block until the broker has answered for every outstanding publish.
            // A false result is reported as a failure instead of being ignored.
            // Alternative: WaitForConfirmsOrDie() throws instead of returning false.
            if (channel.WaitForConfirms())
            {
                Console.WriteLine($"{message} 发送到Broke成功");
            }
            else
            {
                Console.WriteLine($"{message} 发送到Broke失败");
            }
        }
        catch (Exception)
        {
            Console.WriteLine($"{message} 发送到Broke失败");
            // Follow-up actions belong here: notify an operator, write a log entry, retry.
        }
    }
    Console.Read();
}
本文作者:一叶知秋
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!