知乐空间

c#消息队列实际应用

在消息队列模型中,如何将消息广播到所有的消费者,这种模式称为“发布/订阅”。本文主要以一个简单的小例子,简述通过fanout交换机,实现消息的发布与订阅,仅供学习分享使用,如有不足之处,还请指正。

Fanout交换机模型

扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

C#利用RabbitMQ实现消息订阅与发布

RabbitMQ控制台操作

新增两个队列

在同一个Virtual host下新增两个队列Q1,Q2,如下图所示:

C#利用RabbitMQ实现消息订阅与发布

绑定fanout交换机

将两个队列绑定到系统默认的fanout交换机,如下所示:

C#利用RabbitMQ实现消息订阅与发布

示例效果图

生产者,采用Fanout类型交换机发布消息,如下图所示:

C#利用RabbitMQ实现消息订阅与发布

当生产者发布 一条消息时,Q1,Q2两个队列均会收到,如下图所示:

C#利用RabbitMQ实现消息订阅与发布

当启动消费者后,两个消费者,均会订阅到相关消息,如下图所示:

C#利用RabbitMQ实现消息订阅与发布

核心代码

消息发布

建立连接后,将通道声明类型为Fanout的交换机,如下所示:

C#利用RabbitMQ实现消息订阅与发布
 1     /// 
 2     /// fanout类型交换机,发送消息
 3     /// 
 4     public class RabbitMqFanoutSendHelper : RabbitMqHelper {
 5         /// 
 6         /// 发送消息
 7         /// 
 8         /// 
 9         /// 
10         public bool SendMsg(string msg)
11         {
12             try
13             {
14                 using (var conn = GetConnection("/Alan.hsiang"))
15                 {
16                     using (var channel = conn.CreateModel())
17                     {
18                         channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);
19                         
20                         var body = Encoding.UTF8.GetBytes(msg);
21 
22                         channel.BasicPublish(exchange: "amq.fanout",
23                                              routingKey: "",
24                                              basicProperties: null,
25                                              body: body);
26 
27                         //Console.WriteLine(" [x] Sent {0}", message);
28                     };
29                 };
30                 return true;
31             }
32             catch (Exception ex)
33             {
34                 throw ex;
35             }
36         }
37     }
C#利用RabbitMQ实现消息订阅与发布

消息订阅

建立连接后,通道声明类型为Fanout的交换机,并绑定队列进行订阅,如下所示:

C#利用RabbitMQ实现消息订阅与发布
 1    /// 
 2     /// 扇形交换机接收消息
 3     /// 
 4     public class RabbitMqFanoutReceiveHelper : RabbitMqHelper
 5     {
 6         public RabbitMqReceiveEventHandler OnReceiveEvent;
 7 
 8         private IConnection conn;
 9 
10         private IModel channel;
11 
12         private EventingBasicConsumer consumer;
13 
14         public bool StartReceiveMsg(string queueName)
15         {
16             try
17             {
18                 conn = GetConnection("/Alan.hsiang");
19 
20                 channel = conn.CreateModel();
21                 channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);
22                 //此处随机取出交换机下的队列
23                 //var queueName = channel.QueueDeclare().QueueName;
24                 channel.QueueBind(queue: queueName, exchange: "amq.fanout", routingKey: "");
25                 consumer = new EventingBasicConsumer(channel);
26                 consumer.Received += (model, ea) =>
27                 {
28                     var body = ea.Body.ToArray();
29                     var message = Encoding.UTF8.GetString(body);
30                     //Console.WriteLine(" [x] Received {0}", message);
31                     if (OnReceiveEvent != null)
32                     {
33                         OnReceiveEvent(queueName+"::"+message);
34                     }
35                 };
36                 channel.BasicConsume(queue: queueName,
37                                         autoAck: true,
38                                         consumer: consumer);
39                 return true;
40             }
41             catch (Exception ex)
42             {
43                 throw ex;
44             }
45         }
46     }

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,请发送邮件至 ZLME@ZLME.COM 举报,一经查实,立刻删除。

留言与评论(共有 0 条评论)
验证码: