using System;
using MQTTnet.Client;
using MQTTnet;
using MQTTnet.Client.Options;
using System.Net.Sockets;
using System.Threading.Tasks;

namespace HDL_ON.DAL.Net
{
    /// <summary>
    /// Process-wide MQTT client for the "A protocol" gateway link.
    /// A background loop started by the static constructor keeps the client
    /// connected and subscribed; <see cref="MqttRemoteSend"/> publishes to the gateway.
    /// </summary>
    public static class Mqtt_A_Protocol
    {
        /// <summary>UDP port the bus socket is switched to while asking the gateway for its IP.</summary>
        const int DiscoveryPort = 6688;

        /// <summary>Port the bus socket is switched back to after discovery (presumably the regular bus port).</summary>
        const int BusPort = 6000;

        /// <summary>TCP port of the MQTT broker on the gateway.</summary>
        const int MqttBrokerPort = 1883;

        /// <summary>Number of extra IP requests sent before a discovery attempt is abandoned.</summary>
        const int MaxIpReadRetries = 10;

        /// <summary>
        /// The shared MQTT client. Created lazily by <see cref="StartMqtt"/>; null until the first attempt.
        /// </summary>
        public static IMqttClient mqttClient_A;

        /// <summary>
        /// Broker IP address. Not assigned in this class; presumably filled in by the
        /// handler of the reply to <c>Control.ReadGatewayIPAddress()</c>.
        /// </summary>
        public static string mqttClientIP;

        /// <summary>Gateway MAC used to build the publish/subscribe topic names.</summary>
        public static string mqttGatewayMAC;

        // Read and written from several threads (background loop, callers, thread pool).
        static volatile bool remoteIsConnected;
        static volatile bool isSubscribeSuccess;

        // 0 = idle, 1 = a connection attempt is running. Changed only through Interlocked
        // so that two concurrent StartMqtt calls cannot both start an attempt.
        static int connectionInProgress;

        static Mqtt_A_Protocol()
        {
            InitMqtt();
        }

        /// <summary>
        /// Disconnects the MQTT client if it is currently marked as connected.
        /// Exceptions are logged to the console and not rethrown.
        /// </summary>
        /// <param name="s">Free-text reason appended to the console log line.</param>
        public static async Task DisConnectRemoteMqttClient(string s = "")
        {
            try
            {
                if (!remoteIsConnected)
                {
                    return;
                }

                // Clear the flags first so concurrent callers stop using the link immediately.
                remoteIsConnected = false;
                isSubscribeSuccess = false;
                System.Console.WriteLine($"Remote主动断开_{s}");

                var client = mqttClient_A;
                if (client != null)
                {
                    await client.DisconnectAsync();
                }
            }
            catch (Exception e)
            {
                Console.WriteLine($"Remote断开通讯连接出异常:{e.Message}");
            }
        }

        /// <summary>
        /// Subscribes to the gateway-to-app topic once per connection.
        /// If the first attempt throws, the client is disconnected, reconnected and the
        /// subscription is retried once. Failures are logged and not rethrown.
        /// </summary>
        static async Task SubscribeTopics()
        {
            if (!remoteIsConnected || isSubscribeSuccess)
            {
                return;
            }

            var topic = $"/BusGateWayToApp/{mqttGatewayMAC}/Common/Json";
            try
            {
                try
                {
                    await mqttClient_A.SubscribeAsync(topic);
                    isSubscribeSuccess = true;
                }
                catch (Exception ex)
                {
                    await DisConnectRemoteMqttClient(ex.Message);
                    await StartMqtt();
                    if (remoteIsConnected)
                    {
                        await mqttClient_A.SubscribeAsync(topic);
                        isSubscribeSuccess = true;
                    }
                }
            }
            catch (Exception ex)
            {
                MainPage.Log($"mqtt subscribe failed: {ex.Message}");
            }
        }

        /// <summary>
        /// Starts the background maintenance loop: once per second it connects when
        /// disconnected and subscribes when connected but not yet subscribed.
        /// </summary>
        static void InitMqtt()
        {
            Task.Run(async () =>
            {
                while (true)
                {
                    try
                    {
                        await Task.Delay(1000);

                        if (!remoteIsConnected)
                        {
                            await StartMqtt();
                        }

                        // Also reached when already connected, so a subscription that was
                        // missed or lost is retried on the next tick.
                        await SubscribeTopics();
                    }
                    catch (Exception ex)
                    {
                        MainPage.Log($"mqtt maintenance loop error: {ex.Message}");
                    }
                }
            });
        }

        /// <summary>
        /// Starts the A-protocol MQTT connection and completes when the attempt has finished
        /// (successfully or not). Returns immediately when already connected or when another
        /// attempt is in progress. Exceptions are logged and not rethrown.
        /// </summary>
        public static async Task StartMqtt()
        {
            try
            {
                RefreshConnectionState();
                if (remoteIsConnected)
                {
                    return;
                }

                if (System.Threading.Interlocked.CompareExchange(ref connectionInProgress, 1, 0) != 0)
                {
                    return;
                }

                try
                {
                    // Run on the thread pool: the bus-socket restart calls are synchronous
                    // and must not run on the caller's (possibly UI) thread.
                    await Task.Run(() => ConnectCoreAsync());
                }
                finally
                {
                    System.Threading.Interlocked.Exchange(ref connectionInProgress, 0);
                }
            }
            catch (Exception ex)
            {
                MainPage.Log("============>" + ex.Message);
            }
        }

        /// <summary>
        /// Publishes <paramref name="message"/> to the app-to-gateway topic with QoS "exactly once".
        /// Tries to connect first when the client is missing or disconnected (this can take
        /// several seconds); silently returns if there is still no connection afterwards.
        /// Exceptions are logged and not rethrown.
        /// </summary>
        /// <param name="message">Raw payload bytes to publish.</param>
        public static async Task MqttRemoteSend(byte[] message)
        {
            try
            {
                if (mqttClient_A == null || !mqttClient_A.IsConnected)
                {
                    await StartMqtt();
                }

                // Re-read the field: StartMqtt may have just created the client, or may have
                // returned early (attempt already running) leaving it null.
                var client = mqttClient_A;
                if (client == null || !client.IsConnected)
                {
                    return;
                }

                var outgoing = new MqttApplicationMessage
                {
                    Topic = $"/AppToBusGateWay/{mqttGatewayMAC}/Common/Json",
                    Payload = message,
                    Retain = false,
                    QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce
                };
                await client.PublishAsync(outgoing);
            }
            catch (Exception e)
            {
                MainPage.Log($"mqtt publish failed: {e.Message}");
            }
        }

        /// <summary>
        /// Clears the connected/subscribed flags when the client reports that the link is gone,
        /// so that the next <see cref="StartMqtt"/> call actually reconnects.
        /// </summary>
        static void RefreshConnectionState()
        {
            var client = mqttClient_A;
            if (remoteIsConnected && client != null && !client.IsConnected)
            {
                remoteIsConnected = false;
                isSubscribeSuccess = false;
            }
        }

        /// <summary>
        /// One connection attempt: create the client if needed, discover the broker IP,
        /// then connect. Exceptions propagate to <see cref="StartMqtt"/>, which logs them.
        /// </summary>
        static async Task ConnectCoreAsync()
        {
            if (remoteIsConnected)
            {
                return;
            }

            EnsureClientCreated();

            if (!await DiscoverGatewayIpAsync())
            {
                MainPage.Log("mqtt: gateway IP address not received, connection attempt abandoned");
                return;
            }

            var options = new MqttClientOptionsBuilder()
                .WithClientId(Guid.NewGuid().ToString().Substring(0, 5)) // short random client id
                .WithTcpServer(mqttClientIP, MqttBrokerPort)              // broker on the gateway
                .WithCredentials("", "")                                  // empty user name / password
                .WithCommunicationTimeout(new TimeSpan(0, 0, 60))         // communication timeout: 60 s
                .WithKeepAlivePeriod(new TimeSpan(0, 0, 15))              // keep-alive (heartbeat) period: 15 s
                .Build();

            await mqttClient_A.ConnectAsync(options);
            remoteIsConnected = true;
        }

        /// <summary>
        /// Creates the shared client and attaches its logging handlers on first use.
        /// </summary>
        static void EnsureClientCreated()
        {
            if (mqttClient_A != null)
            {
                return;
            }

            var client = new MqttFactory().CreateMqttClient();
            client.UseApplicationMessageReceivedHandler(e =>
            {
                var receivedTopic = e.ApplicationMessage.Topic;
                MainPage.Log(receivedTopic);
                MainPage.Log($"Des Topic={receivedTopic}");
            });
            client.UseConnectedHandler(e =>
            {
                MainPage.Log("mqtt connected !!");
            });
            mqttClient_A = client;
        }

        /// <summary>
        /// Switches the bus socket to the discovery port, asks the gateway for its IP until
        /// <see cref="mqttClientIP"/> is set or the retry budget is used up, then always
        /// switches the bus socket back to <see cref="BusPort"/>.
        /// </summary>
        /// <returns>True when <see cref="mqttClientIP"/> is non-empty; false when the attempt was abandoned.</returns>
        static async Task<bool> DiscoverGatewayIpAsync()
        {
            BusSocket.Stop();
            await Task.Delay(1000);
            BusSocket.Start(DiscoveryPort);
            await Task.Delay(1000);

            try
            {
                Control.ReadGatewayIPAddress();

                // The original loop never advanced its counter and could spin forever;
                // the attempt counter here bounds the wait.
                for (var attempt = 0; string.IsNullOrEmpty(mqttClientIP); attempt++)
                {
                    if (attempt > MaxIpReadRetries)
                    {
                        return false;
                    }

                    Control.ReadGatewayIPAddress();
                    await Task.Delay(200);
                }

                return true;
            }
            finally
            {
                // Restore the bus socket on every path, including give-up and exceptions.
                BusSocket.Stop();
                await Task.Delay(1000);
                BusSocket.Start(BusPort);
                await Task.Delay(1000);
            }
        }
    }
}