博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ如何发送与接收数据
阅读量:4031 次
发布时间:2019-05-24

本文共 7335 字,大约阅读时间需要 24 分钟。

本文运行环境:rabbitmq-server-3.8.3,客户端使用RabbitMQ.Client 3.6.14 开发工具vs2015。

安装好rabbitmq之后,接下来学习一下如何使用,先来添加一个队列并发送一个消息进队列。新建一个发送端的工程并引用RabbitMQ.Client.dll

CS文件中添加如下引用

本文过程分为连接操作和发送操作

连接操作代码:

private void btnConnect_Click(object sender, EventArgs e)        {            if (btnConnect.Text.Equals("连接"))            {                factory.HostName = txtIp.Text;//Rabbit server所在地址                int port = 5672;//默认端口                int.TryParse(txtPort.Text, out port);                factory.Port = port;                factory.UserName = txtLoginID.Text;//登录用户,同登录Rabbit web管理工具                factory.Password = txtPwd.Text;//登录密码,同登录Rabbit web管理工具                factory.RequestedHeartbeat = 60;                connection = factory.CreateConnection();                channel = connection.CreateModel();                if (connection.IsOpen && channel.IsOpen)                {                    btnConnect.Text = "断开连接";                    btnConnect.BackColor = Color.Green;                    //channel.QueueDeclare(txtQueueName.Text, false, false, false, null);//创建一个名称为txtQueueName.Text的消息队列,非持久化的队列                    channel.QueueDeclare(txtQueueName.Text, true, false, false, null);//创建一个名称为txtQueueName.Text的消息队列,持久化的队列                }                else                {                    btnConnect.Text = "连接";                    btnConnect.BackColor = Color.Red;                }            }            else            {                if (!channel.IsOpen)                {                    channel.Close();                    channel.Dispose();                }                if(!connection.IsOpen)                {                    connection.Close();                    connection.Dispose();                }                if (!connection.IsOpen && !channel.IsOpen) btnConnect.Text = "连接";            }        }

发送操作代码:

private void btnSend_Click(object sender, EventArgs e)        {            if(txtMessage.Text != "")            {                var properties = channel.CreateBasicProperties();                properties.DeliveryMode = 1;                properties.Persistent = true;                channel.BasicPublish("", "scott", properties, Encoding.UTF8.GetBytes(txtMessage.Text));            }        }

以下是运行结果

可以在web管理工具界面看到新增的队列及发送出去的消息。在上述代码中,使用了队列持久化参数durable = true,channel.QueueDeclare(txtQueueName.Text, true, false, false, null); QueueDeclare第二个参数,该队列在服务端关机重启后还会存在。

队列和消息都已经产生了,接近来新建一个用于接收消息的工程,需要的引用同上。RabbitMQ接收消息分为两种情况,主动获取和等待推送。主动获取的方法:var result = channel.BasicGet("scott", true);  主动获取需要定时去请求数据,这种操作比较耗资源,性能不高,试想一下,如果一天新消息并不多,程序要产生大量的请求然而多数情况下是一无所获,所以并非首选。本文使用等待推送的方式。

先在接收消息窗体文件中添加全局变量

ConnectionFactory factory = new ConnectionFactory();IConnection connection;IModel channel;EventingBasicConsumer consumer;

连接操作代码

private void btnConnect_Click(object sender, EventArgs e)        {            if (btnConnect.Text.Equals("连接"))            {                factory.HostName = txtIp.Text;//主机名,Rabbit会拿这个IP生成一个endpoint,就是socket绑定的那个终结点。                int port = 5672;                int.TryParse(txtPort.Text, out port);                factory.Port = port;                factory.UserName = txtLoginID.Text;//默认用户名,用户可以在服务端自定义创建,有相关命令行                factory.Password = txtPwd.Text;//默认密码                factory.RequestedHeartbeat = 60;                connection = factory.CreateConnection();                channel = connection.CreateModel();                if (connection.IsOpen && channel.IsOpen)                {                    btnConnect.Text = "断开连接";                    btnConnect.BackColor = Color.Green;                    btnReceiving.Enabled = true;                }                else                {                    btnConnect.Text = "连接";                    btnConnect.BackColor = Color.Red;                    btnReceiving.Enabled = false;                }            }            else            {                if (channel.IsOpen)                {                    channel.Close();                    channel.Dispose();                }                if (connection.IsOpen)                {                    connection.Close();                    connection.Dispose();                }                if (!connection.IsOpen && !channel.IsOpen) btnConnect.Text = "连接";            }        }

接收操作代码

private void btnReceiving_Click(object sender, EventArgs e)        {            btnReceiving.Enabled = false;            try            {                channel.QueueDeclare(txtQueueName.Text, true, false, false, null);                channel.BasicQos(0, 1, false);                consumer = new EventingBasicConsumer(channel);//消费者                consumer.Received += (model, ea) =>                {                    var body = ea.Body;                    var message = Encoding.UTF8.GetString(body);                    Action act = delegate () {                        lstMsg.Items.Add(message);                    };                    this.Invoke(act);                    channel.BasicAck(ea.DeliveryTag,true);                };                channel.BasicConsume("scott", false, consumer);//消费消息            }            catch            { }            finally            {                            }        }

接收新消息后显示添加到list显示

可以看到队列scott中的消息已经被获取到了,也就是被消费了,确认消费代码 channel.BasicAck(ea.DeliveryTag,true);

在接收消息测试中,网上包括官网提供的代码,以下是官网代码来自于

using RabbitMQ.Client;using RabbitMQ.Client.Events;using System;using System.Text;class Receive{    public static void Main()    {        var factory = new ConnectionFactory() { HostName = "localhost" };        using(var connection = factory.CreateConnection())        using(var channel = connection.CreateModel())        {            channel.QueueDeclare(queue: "hello",                                 durable: false,                                 exclusive: false,                                 autoDelete: false,                                 arguments: null);            var consumer = new EventingBasicConsumer(channel);            consumer.Received += (model, ea) =>            {                var body = ea.Body;                var message = Encoding.UTF8.GetString(body.ToArray());                Console.WriteLine(" [x] Received {0}", message);            };            channel.BasicConsume(queue: "hello",                                 autoAck: true,                                 consumer: consumer);            Console.WriteLine(" Press [enter] to exit.");            Console.ReadLine();        }    }}

 以下是网上查到的部分写法

using (var connection = factory.CreateConnection())            {                using (var channel = connection.CreateModel())                {                    channel.QueueDeclare("scott", true, false, false, null);                                var consumer = new EventingBasicConsumer(channel);//消费者                                        consumer.Received += (model, ea) =>                    {                        var body = ea.Body;                        var message = Encoding.UTF8.GetString(body);                        Console.WriteLine($"Received:{message}");                    };            }}

上面这两种写法在我本机环境下测试不会触发推送事件,也就是using (var connection = factory.CreateConnection())和 using (var channel = connection.CreateModel())这种模式影响了接收事件consumer.Received,因为这里using作用域结束后,connection和channel已经被释放掉了。所以我在本文中测试中将它们都定义为全局的了,至于官网和网上这种写法目前还是有疑问,也不知道是不是版本问题呢。

文中代码位置:

转载地址:http://kgqbi.baihongyu.com/

你可能感兴趣的文章
android raw读取超过1M文件的方法
查看>>
ubuntu下SVN服务器安装配置
查看>>
MPMoviePlayerViewController和MPMoviePlayerController的使用
查看>>
CocoaPods实践之制作篇
查看>>
[Mac]Mac 操作系统 常见技巧
查看>>
苹果Swift编程语言入门教程【中文版】
查看>>
捕鱼忍者(ninja fishing)之游戏指南+游戏攻略+游戏体验
查看>>
iphone开发基础之objective-c学习
查看>>
iphone开发之SDK研究(待续)
查看>>
计算机网络复习要点
查看>>
Variable property attributes or Modifiers in iOS
查看>>
NSNotificationCenter 用法总结
查看>>
C primer plus 基础总结(一)
查看>>
剑指offer算法题分析与整理(一)
查看>>
剑指offer算法题分析与整理(三)
查看>>
部分笔试算法题整理
查看>>
Ubuntu 13.10使用fcitx输入法
查看>>
pidgin-lwqq 安装
查看>>
mint/ubuntu安装搜狗输入法
查看>>
C++动态申请数组和参数传递问题
查看>>