本文共 7335 字,大约阅读时间需要 24 分钟。
本文运行环境:rabbitmq-server-3.8.3,客户端使用RabbitMQ.Client 3.6.14 开发工具vs2015。
安装好rabbitmq之后,接下来学习一下如何使用,先来添加一个队列并发送一个消息进队列。新建一个发送端的工程并引用RabbitMQ.Client.dll
CS文件中添加如下引用
本文过程分为连接操作和发送操作
连接操作代码:
private void btnConnect_Click(object sender, EventArgs e) { if (btnConnect.Text.Equals("连接")) { factory.HostName = txtIp.Text;//Rabbit server所在地址 int port = 5672;//默认端口 int.TryParse(txtPort.Text, out port); factory.Port = port; factory.UserName = txtLoginID.Text;//登录用户,同登录Rabbit web管理工具 factory.Password = txtPwd.Text;//登录密码,同登录Rabbit web管理工具 factory.RequestedHeartbeat = 60; connection = factory.CreateConnection(); channel = connection.CreateModel(); if (connection.IsOpen && channel.IsOpen) { btnConnect.Text = "断开连接"; btnConnect.BackColor = Color.Green; //channel.QueueDeclare(txtQueueName.Text, false, false, false, null);//创建一个名称为txtQueueName.Text的消息队列,非持久化的队列 channel.QueueDeclare(txtQueueName.Text, true, false, false, null);//创建一个名称为txtQueueName.Text的消息队列,持久化的队列 } else { btnConnect.Text = "连接"; btnConnect.BackColor = Color.Red; } } else { if (!channel.IsOpen) { channel.Close(); channel.Dispose(); } if(!connection.IsOpen) { connection.Close(); connection.Dispose(); } if (!connection.IsOpen && !channel.IsOpen) btnConnect.Text = "连接"; } }
发送操作代码:
private void btnSend_Click(object sender, EventArgs e) { if(txtMessage.Text != "") { var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 1; properties.Persistent = true; channel.BasicPublish("", "scott", properties, Encoding.UTF8.GetBytes(txtMessage.Text)); } }
以下是运行结果
可以在web管理工具界面看到新增的队列及发送出去的消息。在上述代码中,使用了队列持久化参数durable = true,channel.QueueDeclare(txtQueueName.Text, true, false, false, null); QueueDeclare第二个参数,该队列在服务端关机重启后还会存在。
队列和消息都已经产生了,接近来新建一个用于接收消息的工程,需要的引用同上。RabbitMQ接收消息分为两种情况,主动获取和等待推送。主动获取的方法:var result = channel.BasicGet("scott", true); 主动获取需要定时去请求数据,这种操作比较耗资源,性能不高,试想一下,如果一天新消息并不多,程序要产生大量的请求然而多数情况下是一无所获,所以并非首选。本文使用等待推送的方式。
先在接收消息窗体文件中添加全局变量
ConnectionFactory factory = new ConnectionFactory();IConnection connection;IModel channel;EventingBasicConsumer consumer;
连接操作代码
private void btnConnect_Click(object sender, EventArgs e) { if (btnConnect.Text.Equals("连接")) { factory.HostName = txtIp.Text;//主机名,Rabbit会拿这个IP生成一个endpoint,就是socket绑定的那个终结点。 int port = 5672; int.TryParse(txtPort.Text, out port); factory.Port = port; factory.UserName = txtLoginID.Text;//默认用户名,用户可以在服务端自定义创建,有相关命令行 factory.Password = txtPwd.Text;//默认密码 factory.RequestedHeartbeat = 60; connection = factory.CreateConnection(); channel = connection.CreateModel(); if (connection.IsOpen && channel.IsOpen) { btnConnect.Text = "断开连接"; btnConnect.BackColor = Color.Green; btnReceiving.Enabled = true; } else { btnConnect.Text = "连接"; btnConnect.BackColor = Color.Red; btnReceiving.Enabled = false; } } else { if (channel.IsOpen) { channel.Close(); channel.Dispose(); } if (connection.IsOpen) { connection.Close(); connection.Dispose(); } if (!connection.IsOpen && !channel.IsOpen) btnConnect.Text = "连接"; } }
接收操作代码
private void btnReceiving_Click(object sender, EventArgs e) { btnReceiving.Enabled = false; try { channel.QueueDeclare(txtQueueName.Text, true, false, false, null); channel.BasicQos(0, 1, false); consumer = new EventingBasicConsumer(channel);//消费者 consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Action act = delegate () { lstMsg.Items.Add(message); }; this.Invoke(act); channel.BasicAck(ea.DeliveryTag,true); }; channel.BasicConsume("scott", false, consumer);//消费消息 } catch { } finally { } }
接收新消息后显示添加到list显示
可以看到队列scott中的消息已经被获取到了,也就是被消费了,确认消费代码 channel.BasicAck(ea.DeliveryTag,true);
在接收消息测试中,网上包括官网提供的代码,以下是官网代码来自于
using RabbitMQ.Client;using RabbitMQ.Client.Events;using System;using System.Text;class Receive{ public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine(" [x] Received {0}", message); }; channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }}
以下是网上查到的部分写法
using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare("scott", true, false, false, null); var consumer = new EventingBasicConsumer(channel);//消费者 consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine($"Received:{message}"); }; }}
上面这两种写法在我本机环境下测试不会触发推送事件,也就是using (var connection = factory.CreateConnection())和 using (var channel = connection.CreateModel())这种模式影响了接收事件consumer.Received,因为这里using作用域结束后,connection和channel已经被释放掉了。所以我在本文中测试中将它们都定义为全局的了,至于官网和网上这种写法目前还是有疑问,也不知道是不是版本问题呢。
文中代码位置:
转载地址:http://kgqbi.baihongyu.com/