2024-06-20
后端技术
00
请注意,本文编写于 91 天前,最后修改于 91 天前,其中某些信息可能已经过时。

目录

1、TX事务模式
2、Confirm 模式

1、TX事务模式

RabbitMQ.MessageProducer 新建类 ProductionMessageTx

csharp
public static void Show() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest" }; using var connection = factory.CreateConnection(); using var channel = connection.CreateModel(); Console.WriteLine("生产者准备就绪"); channel.QueueDeclare("MessageTxQueue01", true, false, false, null); channel.QueueDeclare("MessageTxQueue02", true, false, false, null); channel.ExchangeDeclare("MessageTxQueueExchange", ExchangeType.Direct, true, false, null); channel.QueueBind("MessageTxQueue01", "MessageTxQueueExchange", "MessageTxKey01"); channel.QueueBind("MessageTxQueue02", "MessageTxQueueExchange", "MessageTxKey02"); string message = ""; int i = 0; while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)) { message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); try { // 开启事务机制 channel.TxSelect(); // 发送消息 channel.BasicPublish("MessageTxQueueExchange", "MessageTxKey01", null, body); if (i == 2) throw new Exception("发送消息异常"); channel.BasicPublish("MessageTxQueueExchange", "MessageTxKey02", null, body); // 事务提交 channel.TxCommit(); i++; Console.WriteLine($"{message} 发送到Broke成功"); } catch (Exception) { Console.WriteLine($"{message} 发送到Broke失败"); channel.TxRollback(); } } Console.Read(); }

2、Confirm 模式

RabbitMQ.MessageProducer 新建类 ProductionMessageConfirm

csharp
public static void Show() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest" }; using var connection = factory.CreateConnection(); using var channel = connection.CreateModel(); Console.WriteLine("生产者准备就绪"); channel.QueueDeclare("ConfirmSelectQueue", true, false, false, null); channel.ExchangeDeclare("ConfirmSelectQueueExchange", ExchangeType.Direct, true, false, null); channel.ExchangeBind("ConfirmSelectQueue", "ConfirmSelectQueueExchange", "ConfirmSelectKey"); string message = ""; while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)) { message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); try { // 开启事务机制 channel.ConfirmSelect(); // 发送消息 channel.BasicPublish("ConfirmSelectQueueExchange", "ConfirmSelectKey", null, body); // 事务提交 if (channel.WaitForConfirms()) Console.WriteLine($"{message} 发送到Broke成功"); // 所有消息发送成功,正常执行,有消息发送失败,抛出异常 // channel.WaitForConfirmsOrDie(); } catch (Exception) { Console.WriteLine($"{message} 发送到Broke失败"); // 通知管理员,写入日志,重试 } } Console.Read(); }

本文作者:一叶知秋

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!