using System;
using MQTTnet.Client;
using MQTTnet;
using MQTTnet.Client.Options;
using System.Net.Sockets;
using System.Threading.Tasks;
namespace HDL_ON.DAL.Net
{
public static class Mqtt_A_Protocol
{
///
/// MqttClient
///
public static IMqttClient mqttClient_A;
public static string mqttClientIP;
public static string mqttGatewayMAC;
static bool remoteIsConnected;
static bool onConnection = false;
static Mqtt_A_Protocol()
{
InitMqtt();
}
///
/// 断开远程Mqtt的链接
///
public static async Task DisConnectRemoteMqttClient(string s = "")
{
try
{
if (remoteIsConnected)
{
remoteIsConnected = false;
System.Console.WriteLine($"Remote主动断开_{s}");
//await RemoteMqttClient.DisconnectAsync(new MQTTnet.Client.Disconnecting.MqttClientDisconnectOptions { }, CancellationToken.None);
await mqttClient_A.DisconnectAsync();
}
}
catch (Exception e)
{
Console.WriteLine($"Remote断开通讯连接出异常:{e.Message}");
}
}
static bool isSubscribeSuccess;
static async Task SubscribeTopics()
{
if (remoteIsConnected && !isSubscribeSuccess)
{
try
{
var Topic1 = $"/BusGateWayToApp/{mqttGatewayMAC}/Common/Json";
try
{
await mqttClient_A.SubscribeAsync(Topic1);
}
catch (Exception ex)
{
await DisConnectRemoteMqttClient(ex.Message);
await StartMqtt();
if (remoteIsConnected)
{
await mqttClient_A.SubscribeAsync(Topic1);
}
}
}
catch { }
}
}
static void InitMqtt()
{
new System.Threading.Thread(async () =>
{
while (true)
{
try
{
System.Threading.Thread.Sleep(1000);
//if (!CommonPage.IsRemote)
// continue;
if (remoteIsConnected)
continue;
await StartMqtt();
await SubscribeTopics();
}
catch { }
}
})
{ IsBackground = true }.Start();
}
///
/// 启动A协议Mqtt
///
public static async Task StartMqtt()
{
try
{
if (remoteIsConnected)
return;
if (onConnection)
return;
onConnection = true;
new System.Threading.Thread(async () =>
{
if (remoteIsConnected)
return;
try
{
if (mqttClient_A == null)
{
mqttClient_A = new MqttFactory().CreateMqttClient();
mqttClient_A.UseApplicationMessageReceivedHandler(async e =>
{
var aesDecryptTopic = e.ApplicationMessage.Topic;
var aesDecryptPayload = e.ApplicationMessage.Payload;
MainPage.Log(aesDecryptTopic);
MainPage.Log($"Des Topic={aesDecryptTopic}");
});
mqttClient_A.UseConnectedHandler(async (e) =>
{
MainPage.Log("mqtt connected !!");
onConnection = false;
});
}
try
{
int readCount = 0;
BusSocket.Stop();
System.Threading.Thread.Sleep(1000);
BusSocket.Start(6688);
System.Threading.Thread.Sleep(1000);
Control.ReadGatewayIPAddress();
while (true)
{
if (!string.IsNullOrEmpty(mqttClientIP))
{
break;
}
else if (readCount > 10)
{
onConnection = false;
return;
}
else
{
Control.ReadGatewayIPAddress();
System.Threading.Thread.Sleep(200);
}
}
BusSocket.Stop();
System.Threading.Thread.Sleep(1000);
BusSocket.Start(6000);
System.Threading.Thread.Sleep(1000);
var options = new MqttClientOptionsBuilder()//MQTT连接参数填充
.WithClientId(Guid.NewGuid().ToString().Substring(0, 5))//客户端ID
.WithTcpServer(mqttClientIP, 1883)//MQTTServerIP.Text, Int32.Parse(MQTTServerPort.Text.ToString()))//TCP服务端 1883 ,即MQTT服务端
.WithCredentials("", "")//"", "")//凭证 帐号 密码
.WithCommunicationTimeout(new TimeSpan(0, 0, 60)) //重连超时时间,默认5s
.WithKeepAlivePeriod(new TimeSpan(0, 0, 15)) //保持连接时间,默认5s,心跳包
.Build();
await mqttClient_A.ConnectAsync(options);
remoteIsConnected = true;
}
catch { }
}
catch (Exception ex)
{
}
finally
{
onConnection = false;
}
})
{ IsBackground = true }.Start();
}
catch (Exception ex)
{
MainPage.Log("============>" + ex.Message);
}
}
///
///
///
/// 附加数据包
/// 操作类型:0=网关控制;1=订阅网关数据;2=订阅网关上线数据
///
public static async Task MqttRemoteSend(byte[] message)
{
try
{
if (mqttClient_A == null || !mqttClient_A.IsConnected)
{
await StartMqtt();
}
if (!mqttClient_A.IsConnected)
{
return;
}
var topicName = $"/AppToBusGateWay/{mqttGatewayMAC}/Common/Json";
var m = new MqttApplicationMessage { Topic = topicName, Payload = message, Retain = false, QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce };
await mqttClient_A?.PublishAsync(m);
}
catch (Exception e)
{
}
}
}
}