MQTTnet 2.8 及 3.0.16 的使用


十年河东,十年河西,莫欺少年穷

学无止境,精益求精

netcore3.1控制台应用程序,引入MQTTnet 2.8版本

订阅端:


using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using MQTTnet;
using MQTTnet.Server; 
using MQTTnet.Client;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using MQTTnet.Protocol;

namespace swapConsole
{
    class Program
    {
        private static MqttClient mqttClient = null;
        private static  string topic = "test123ABC";
        private static IMqttClientOptions Options
        {
            get
            {
                MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder(); 
                builder.WithCleanSession(false);
                //用户名 密码
                builder.WithCredentials("", "");
                var id = Guid.NewGuid().ToString();
                builder.WithClientId(id);
                builder.WithTcpServer("1270.0.0.0", 1883);
                return builder.Build();
            }
        }
        static async Task Main(string[] args)
        {
            MqttFactory factory = new MqttFactory();
            if (mqttClient == null)
            {
                mqttClient = (MqttClient)factory.CreateMqttClient();
                mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
                mqttClient.Connected += MqttClient_Connected;
                mqttClient.Disconnected += async (s, e) =>
                 {
                     Console.WriteLine("尝试重连!" + Environment.NewLine);
                     await ConnectToServer();
                 };
            }
            await ConnectToServer(); 

            Console.ReadLine();
        }
        /// <summary>
        /// 连接MQTT服务器
        /// </summary>
        private   static async Task ConnectToServer()
        {
            try
            {
                var res =await  mqttClient.ConnectAsync(Options);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"连接到MQTT服务器失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }
        /// <summary>
        /// 连接MQTT服务器触发
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private static void MqttClient_Connected(object sender, EventArgs e)
        {
            Console.WriteLine("已连接到MQTT服务器!" + Environment.NewLine);
            SubscribeInfo();
        }

        /// <summary>
        /// 接收消息
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private static void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        {
            Console.WriteLine($">> {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}{Environment.NewLine}");
        }

        /// <summary>
        /// 订阅消息
        /// </summary>
        public static void SubscribeInfo()
        {
            if (string.IsNullOrEmpty(topic))
            {
                Console.WriteLine("订阅主题不能为空!");
                return;
            }

            if (!mqttClient.IsConnected)
            {
                Console.WriteLine("MQTT客户端尚未连接!");

                return;
            }
            mqttClient.SubscribeAsync(new List<TopicFilter> {
                new  TopicFilter(topic, MqttQualityOfServiceLevel.ExactlyOnce)
            });

            Console.WriteLine($"已订阅[{topic}]主题" + Environment.NewLine);
        }

        /// <summary>
        /// 退订消息
        /// </summary>
        public static void UnSubscribeInfo()
        { 

            if (string.IsNullOrEmpty(topic))
            {
                Console.WriteLine("退订主题不能为空!");
                return;
            }
            if (!mqttClient.IsConnected)
            {
                Console.WriteLine("MQTT客户端尚未连接!");
                return;
            }
            mqttClient.UnsubscribeAsync(topic);
            Console.WriteLine($"已退订[{topic}]主题" + Environment.NewLine);
        }

    }
}

View Code

发布端:


using MQTTnet;
using MQTTnet.Client;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace swapPublish
{
    class Program
    {
        private static MqttClient mqttClient = null;
        private static string topic = "test123ABC";
        private static IMqttClientOptions Options
        {
            get
            {
                MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder();
                builder.WithCleanSession(false);
                //用户名 密码
                builder.WithCredentials("", "");
                var id = Guid.NewGuid().ToString();
                builder.WithClientId(id);
                builder.WithTcpServer("127.0.0.1", 1883);
                return builder.Build();
            }
        }
        static async Task  Main(string[] args)
        {
            MqttFactory factory = new MqttFactory();
            if (mqttClient == null)
            {
                mqttClient = (MqttClient)factory.CreateMqttClient(); 
                mqttClient.Connected += MqttClient_Connected;
                mqttClient.Disconnected += async(s, e) =>
                {
                    Console.WriteLine("尝试重连!" + Environment.NewLine);
                    await ConnectToServer();
                };
            }
           await  ConnectToServer();
            Console.WriteLine("已断开MQTT连接!" + Environment.NewLine);

            Console.ReadLine();
        }
        /// <summary>
        /// 连接MQTT服务器
        /// </summary>
        private static async Task ConnectToServer()
        {
            try
            {
                var res = await mqttClient.ConnectAsync(Options);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"连接到MQTT服务器失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }
        /// <summary>
        /// 连接MQTT服务器触发
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private static void MqttClient_Connected(object sender, EventArgs e)
        {
            Console.WriteLine("已连接到MQTT服务器!" + Environment.NewLine);
            for(int i = 0; i < 10; i++)
            {
                var tak = PublishInfo(); 
                Thread.Sleep(2000);
            }
          
        }

        private static async  Task PublishInfo( )
        { 

            if (string.IsNullOrEmpty(topic))
            {
               Console.WriteLine("发布主题不能为空!");
                return;
            }

            string inputString = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
            MqttApplicationMessageBuilder builder = new MqttApplicationMessageBuilder(); 
            builder.WithPayload(Encoding.UTF8.GetBytes(inputString));
            builder.WithTopic(topic);
            builder.WithRetainFlag(false);
            builder.WithExactlyOnceQoS();
            await mqttClient.PublishAsync(builder.Build());
        }
    }
}

View Code

 如何只允许一个客户端消费同一个消息,暂时未解决!

大家有解决方法,请贴出评论。谢谢

MQTTnet  3.0.16 版本的使用

客户端:


using MQTTnet;
using MQTTnet.Adapter;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace mqttsub
{
    class Program
    {
        static async Task Main(string[] args)
        {
            MqttClient mqtt = new MqttClient();
            await mqtt.StartAsync();
            Console.ReadKey();
        }
    }

    public class MqttClient
    {
        private IMqttClient client; 
        private IMqttClientOptions options;
        MqttClientDto model =null;
        public MqttClient()
        {
            model = new MqttClientDto
            {
                Account = "",
                PassWord = "",
                ClientId = Guid.NewGuid().ToString(),
                IP = "",
                Port = 1883,
                Topic="test/+/ABC" //通配符模式 该模式匹配 test/123/ABC  testABC  test/DDDDD/ABC 等
            };
        }
        public async Task StartAsync()
        {
            try
            {
                client = new MqttFactory().CreateMqttClient();
                var build = new MqttClientOptionsBuilder()
                //配置客户端Id
                .WithClientId(Guid.NewGuid().ToString())
                //配置登录账号
                .WithCredentials(model.Account,model.PassWord)
                //配置服务器IP端口 这里得端口号是可空的
                .WithTcpServer(model.IP, 1883)
                .WithCleanSession();

                options = build.Build();
                //收到服务器发来消息
                client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(MessageReceivedHandler);
                //client.UseApplicationMessageReceivedHandler(args=> {
                //    Console.WriteLine("===================================================");
                //    Console.WriteLine("收到消息:");
                //    Console.WriteLine($"主题:{args.ApplicationMessage.Topic}");
                //    Console.WriteLine($"消息:{Encoding.UTF8.GetString(args.ApplicationMessage.Payload)}");
                //    Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");
                //    Console.WriteLine();
                //});
                //连接成功 
                client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(ConnectedHandler);
                //client.UseConnectedHandler(args=> {
                //    Console.WriteLine("本客户端已连接成功");
                //    Console.WriteLine($"地址:{model.IP}");
                //    Console.WriteLine($"端口:{model.Port}");
                //    Console.WriteLine($"客户端:{model.ClientId}");
                //    Console.WriteLine($"账号:{model.Account}");
                //    Console.WriteLine();
                //    //第1种订阅方式
                //    client.SubscribeAsync("主题名称").GetAwaiter().GetResult();

                //    //第2种订阅方式
                //    List<MqttTopicFilter> Topics = new List<MqttTopicFilter>();
                //    Topics.Add(new MqttTopicFilter() { Topic = "主题名称A", QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce });
                //    Topics.Add(new MqttTopicFilter() { Topic = "主题名称B" });
                //    Topics.Add(new MqttTopicFilter() { Topic = "主题名称C" });
                //    client.SubscribeAsync(Topics.ToArray()).GetAwaiter().GetResult();

                //    //第3种订阅方式
                //    MqttClientSubscribeOptionsBuilder builder = new MqttClientSubscribeOptionsBuilder();
                //    builder.WithTopicFilter("AAA");
                //    client.SubscribeAsync(builder.Build()).GetAwaiter().GetResult();
                //});
                //断开连接 重连就写在此处
                client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(DisconnectedHandler);
                //client.UseDisconnectedHandler(args =>
                //{
                //    Console.WriteLine("本客户端已经断开连接");
                //    Console.WriteLine();
                //    try
                //    {
                //        client.ConnectAsync(options).GetAwaiter().GetResult();
                //    }
                //    catch (Exception ex)
                //    {
                //        Console.WriteLine("重连失败");
                //    }
                //});
                //客户端发送消息
                //await client.PublishAsync("你想要的主题", "你需要发送的东西");
                //await client.PublishAsync("你想要的主题", Encoding.UTF8.GetBytes("你需要发送的东西").ToList());
                //连接
                await client.ConnectAsync(options);
            }
            catch (MqttConnectingFailedException)
            {
                Console.WriteLine("身份校验失败");
            }
            catch (Exception ex)
            {
                Console.WriteLine("出现异常");
                Console.WriteLine(ex.Message);
            }
        }


        /// <summary>
        /// 客户端断开连接后,如果需要重连在此处实现
        /// </summary>
        /// <param name="obj"></param>
        private async void DisconnectedHandler(MqttClientDisconnectedEventArgs obj)
        {
            Console.WriteLine("本客户端已经断开连接");
            Console.WriteLine();
            try
            {
                await client.ConnectAsync(options);
            }
            catch (Exception)
            {
                Console.WriteLine("重连失败");
            }
        }

        /// <summary>
        /// 连接成功 在此处做订阅主题(Topic)操作
        /// </summary>
        /// <param name="obj"></param>
        private async void ConnectedHandler(MqttClientConnectedEventArgs obj)
        {
            Console.WriteLine("本客户端已连接成功");
            Console.WriteLine($"地址:{model.IP}");
            Console.WriteLine($"端口:{model.Port}");
            Console.WriteLine($"客户端:{model.ClientId}");
            Console.WriteLine($"账号:{model.Account}");
            Console.WriteLine();
            //第1种订阅方式
            // client.SubscribeAsync("主题名称").GetAwaiter().GetResult();

            //第2种订阅方式
            List<MqttTopicFilter> Topics = new List<MqttTopicFilter>();
            Topics.Add(new MqttTopicFilter() { Topic = model.Topic, QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce});
            //Topics.Add(new MqttTopicFilter() { Topic = "主题名称B" });
            //Topics.Add(new MqttTopicFilter() { Topic = "主题名称C" });
            await client.SubscribeAsync(Topics.ToArray());

            //第3种订阅方式
            //MqttClientSubscribeOptionsBuilder builder = new MqttClientSubscribeOptionsBuilder();
            //builder.WithTopicFilter("AAA");
            //client.SubscribeAsync(builder.Build()).GetAwaiter().GetResult();
        }

        /// <summary>
        /// 收到消息
        /// </summary>
        /// <param name="obj"></param>
        private void MessageReceivedHandler(MqttApplicationMessageReceivedEventArgs obj)
        {
            Console.WriteLine("===================================================");
            Console.WriteLine("收到消息:");
            Console.WriteLine($"主题:{obj.ApplicationMessage.Topic}");
            Console.WriteLine($"消息:{Encoding.UTF8.GetString(obj.ApplicationMessage.Payload)}");
            Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");
            Console.WriteLine();
        }
    }

    public class MqttClientDto
    {
        /// <summary>
        /// 连接地址
        /// </summary>
        public string IP { get; set; }
        /// <summary>
        /// 账号
        /// </summary>
        public string Account { get; set; }
        /// <summary>
        /// 密码
        /// </summary>
        public string PassWord { get; set; }
        /// <summary>
        /// 客户端Id
        /// </summary>
        public string ClientId { get; set; }

        public int Port { get; set; }

        public string Topic { get; set; }
    }
}

View Code

服务端:


using MQTTnet;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Net;
using System.Text;
using System.Threading.Tasks;

namespace MqttPub
{
    class Program
    {
        static async Task Main(string[] args)
        {
            await new ServerDome(). StartAsync();
            Console.Read();
        }
    }

    public class ServerDome  
    {
        private IMqttServer server;
        MqttClientDto model = null;
        public ServerDome()
        {
            model = new MqttClientDto
            {
                Account = "",
                PassWord = "",
                ClientId = Guid.NewGuid().ToString(),
                IP = "",
                Port = 1883,
                Topic = "test"
            };
        }

        public async Task StartAsync()
        {
            if (server == null || !server.IsStarted)
            {

                server = new MqttFactory().CreateMqttServer();
                MqttServerOptionsBuilder serverOptions = new MqttServerOptionsBuilder();
                //、默认监听端口 
                serverOptions.WithDefaultEndpointPort(model.Port);
                //校验客户端信息
                serverOptions.WithConnectionValidator(client => {
                    string Account = client.Username;
                    string PassWord = client.Password;
                    string clientid = client.ClientId;
                    if (Account == "" && PassWord == "")
                    {
                        client.ReasonCode = MqttConnectReasonCode.Success;
                        Console.WriteLine("校验成功");
                    }
                    else
                    {
                        client.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                        Console.WriteLine("校验失败");
                    }
                });

                //客户端发送消息监听
                server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(MessageReceivedHandler);
                //server.UseApplicationMessageReceivedHandler(args=>{
                //    Console.WriteLine("===================================================");
                //    Console.WriteLine("收到消息:");
                //    Console.WriteLine($"客户端:{args.ClientId}");
                //    Console.WriteLine($"主题:{args.ApplicationMessage.Topic}");
                //    Console.WriteLine($"消息:{Encoding.UTF8.GetString(args.ApplicationMessage.Payload)}");
                //    Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");
                //    Console.WriteLine();
                //});
                //客户端连接事件
                server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(ClientConnectedHandler);
                //server.UseClientConnectedHandler(args =>
                //{
                //    Console.WriteLine($"{args.ClientId}此客户端已经连接到服务器");
                //});
                //客户端断开连接事件
                server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(ClientDisconnectedHandler);
                //server.UseClientDisconnectedHandler(args => {
                //    Console.WriteLine($"断开连接的客户端:{args.ClientId}");
                //    Console.WriteLine($"断开连接类型:{args.DisconnectType.ToString()}");
                //});

                //客户端订阅主题事件
                server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(ClientSubscribedTopicHandler);
                //客户端取消订阅主题事件
                server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(ClientUnsubscribedTopicHandler);
                //服务器启动事件
                server.StartedHandler = new MqttServerStartedHandlerDelegate(StartedHandler);
                //服务器停止事件
                server.StoppedHandler = new MqttServerStoppedHandlerDelegate(StoppedHandler);
                //服务端发送数据
                //await  server.PublishAsync("你想要的主题","你需要发送的东西");
                //var mqttApplicationMessage = new MqttApplicationMessage();
                //mqttApplicationMessage.Topic = "你想要的主题";
                //mqttApplicationMessage.Payload = Encoding.ASCII.GetBytes("你需要发送的东西");
                //await server.PublishAsync(mqttApplicationMessage);
                //启动服务器
                await server.StartAsync(serverOptions.Build());
            }
        }

        public async Task StopAsync()
        {
            if (server != null)
            {
                if (server.IsStarted)
                {
                    await server.StopAsync();
                    server.Dispose();
                }
            }
        }

        /// <summary>
        /// 客户端取消订阅主题
        /// </summary>
        /// <param name="obj"></param>
        private void ClientUnsubscribedTopicHandler(MqttServerClientUnsubscribedTopicEventArgs obj)
        {
            Console.WriteLine($"客户端:{obj.ClientId}");
            Console.WriteLine($"取消订阅主题:{obj.TopicFilter}");
        }

        /// <summary>
        /// 客户端订阅的主题
        /// </summary>
        /// <param name="obj"></param>
        private void ClientSubscribedTopicHandler(MqttServerClientSubscribedTopicEventArgs obj)
        {
            Console.WriteLine($"客户端:{obj.ClientId}");
            Console.WriteLine($"订阅主题:{obj.TopicFilter.Topic}");
        }

        /// <summary>
        /// 客户端断开连接
        /// </summary>
        /// <param name="obj"></param>
        private void ClientDisconnectedHandler(MqttServerClientDisconnectedEventArgs obj)
        {
            Console.WriteLine($"断开连接的客户端:{obj.ClientId}");
            Console.WriteLine($"断开连接类型:{obj.DisconnectType.ToString()}");
        }

        /// <summary>
        /// 客户端连接到服务器事件
        /// </summary>
        /// <param name="obj"></param>
        private void ClientConnectedHandler(MqttServerClientConnectedEventArgs obj)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// 收到各个客户端发送的消息
        /// </summary>
        /// <param name="obj"></param>
        private void MessageReceivedHandler(MqttApplicationMessageReceivedEventArgs obj)
        {
            Console.WriteLine("===================================================");
            Console.WriteLine("收到消息:");
            Console.WriteLine($"客户端:{obj.ClientId}");
            Console.WriteLine($"主题:{obj.ApplicationMessage.Topic}");
            Console.WriteLine($"消息:{Encoding.UTF8.GetString(obj.ApplicationMessage.Payload)}");
            Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");
            Console.WriteLine();
        }


        /// <summary>
        /// MQTT启动服务器事件
        /// </summary>
        /// <param name="obj"></param>
        private void StartedHandler(EventArgs obj)
        {
            Console.WriteLine($"程序已经启动!监听端口为:{model.Port}");
        }

        /// <summary>
        /// MQTT服务器停止事件
        /// </summary>
        /// <param name="obj"></param>
        private void StoppedHandler(EventArgs obj)
        {
            Console.WriteLine("程序已经关闭");
        }
    }

    public class MqttClientDto
    {
        /// <summary>
        /// 连接地址
        /// </summary>
        public string IP { get; set; }
        /// <summary>
        /// 账号
        /// </summary>
        public string Account { get; set; }
        /// <summary>
        /// 密码
        /// </summary>
        public string PassWord { get; set; }
        /// <summary>
        /// 客户端Id
        /// </summary>
        public string ClientId { get; set; }

        public int Port { get; set; }

        public string Topic { get; set; }
    }
}

View Code

这里说明下如何使用通配符

例如,发送 topic 主题为:test/123/ABC 或者 test/234/ABC ,消费者在订阅时,可以使用:test/+/ABC  来订阅该类消息。

通配符的作用为分组订阅、 

 发布者发布内容为: test//status ,订阅者订阅的为:test/+/status

 

 当然,发布者也可以在 / / 之间增加内容,例如设备号:

 

  • 主题名不能使用通配符, 但是主题过滤器中可以使用通配符.因此,订阅者可以通过过滤器接合通配符订阅一类消息

 以MQTTnet  3.0.16 为例,开启自动确认,开启不保留最后一跳消息。