From 99bc815e07e39354f51421b77f4012ffd35594d8 Mon Sep 17 00:00:00 2001
From: wjc <1243177876@qq.com>
Date: 星期三, 28 六月 2023 18:03:00 +0800
Subject: [PATCH] 2023年06月28日18:02:58
---
HDLLinkLocalSdk/src/main/java/com/hdl/sdk/link/core/utils/mqtt/MqttRecvClient.java | 412 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 412 insertions(+), 0 deletions(-)
diff --git a/HDLLinkLocalSdk/src/main/java/com/hdl/sdk/link/core/utils/mqtt/MqttRecvClient.java b/HDLLinkLocalSdk/src/main/java/com/hdl/sdk/link/core/utils/mqtt/MqttRecvClient.java
new file mode 100644
index 0000000..6e8ef74
--- /dev/null
+++ b/HDLLinkLocalSdk/src/main/java/com/hdl/sdk/link/core/utils/mqtt/MqttRecvClient.java
@@ -0,0 +1,412 @@
+package com.hdl.sdk.link.core.utils.mqtt;
+
+import android.content.Context;
+import android.text.TextUtils;
+
+import com.alibaba.fastjson.JSON;
+
+import com.hdl.sdk.link.common.utils.LogUtils;
+import com.hdl.sdk.link.core.bean.LinkPacket;
+import com.hdl.sdk.link.core.bean.eventbus.EventBindMiniRemoteSuccessInfo;
+import com.hdl.sdk.link.core.bean.eventbus.EventNotifyRefreshGatewayAesKeyInfo;
+import com.hdl.sdk.link.core.bean.gateway.GatewayBean;
+import com.hdl.sdk.link.core.config.HDLLinkConfig;
+import com.hdl.sdk.link.core.connect.HDLConnectHelper;
+import com.hdl.sdk.link.core.protocol.LinkMessageDecoder;
+import com.hdl.sdk.link.core.utils.AesUtil;
+import com.hdl.sdk.link.core.utils.QueueUtils;
+import com.hdl.sdk.link.gateway.HDLLinkLocalGateway;
+import com.hdl.sdk.link.socket.bean.Packet;
+
+import org.eclipse.paho.client.mqttv3.IMqttActionListener;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.IMqttToken;
+import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
+import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.greenrobot.eventbus.EventBus;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Created by Zoro on 2018/8/1.
+ * desc:2019/7/24 杩炴帴鎴愬姛浠ュ悗鍐嶈闃� 涓嶉渶瑕佸彂閫佸績璺� 涓嶉渶瑕佽嚜宸卞鐞嗛噸杩�
+ */
+
+public class MqttRecvClient {
+ private static String mBroker;
+ private MemoryPersistence persistence = new MemoryPersistence();
+ private MqttAsyncClient sampleClient;
+ private MqttConnectOptions connOpts = new MqttConnectOptions();
+ private MqttThread mqttThread;
+ private static volatile MqttRecvClient mqttRecvClient;
+ private final String TAG = "MqttRecvClient";
+ private static final String[] ignoreTopics = new String[]{"/thing/topo/found", "/ota/device/progress/up"};
+ /**
+ * 涓婃鐨勪富棰橀渶瑕佽褰� 鏇存敼涓婚鐨勬椂鍊欓渶瑕佸彇娑堣闃�
+ */
+ private static List<String> lastTopicFilters = new ArrayList<>();
+ private static String mClientId;
+ private static String mUserName;
+ private static String mPassWord;
+ private final int[] qos = {0};
+
+ private MqttRecvClient() {
+ if (mqttThread == null) {
+ mqttThread = new MqttThread();
+ }
+ if (TextUtils.isEmpty(mUserName) || TextUtils.isEmpty(mPassWord)) {
+ return;
+ }
+ mqttThread.start();
+ }
+
+ public static void init(Context context, String broker1, String deviceId, String userName, String pwd) {
+ mClientId = deviceId;
+ mBroker = broker1;
+ mUserName = userName;
+ mPassWord = pwd;
+ MqttRecvClient.create();
+ }
+
+ public void send(String topic, byte[] bytes) {
+ try {
+ if (TextUtils.isEmpty(topic)) {
+ LogUtils.e("璇锋眰涓婚涓簄ull");
+ return;
+ }
+ checkAndsubscribeAllTopics(topic);
+ publish(topic, bytes);
+ } catch (MqttException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 鍙戝竷
+ *
+ * @param topic 涓婚
+ * @param bytes 鍐呭
+ * @throws MqttException
+ */
+ public void publish(String topic, byte[] bytes) throws MqttException {
+ //鍥炲鏃讹紝mqtt涓婚涓殑鏂瑰悜瑕佸彉鍖栵紝瑕佸仛鏂瑰悜鏇挎崲
+ mqttRecvClient.sampleClient.publish(topic, bytes, 1, false, null, new IMqttActionListener() {
+ @Override
+ public void onSuccess(IMqttToken asyncActionToken) {
+ LogUtils.d(TAG, topic);
+ }
+
+ @Override
+ public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
+ }
+ });
+ }
+
+ public static void create() {
+ if (mqttRecvClient == null) {
+ mqttRecvClient = new MqttRecvClient();
+ }
+ }
+
+ /**
+ * 浣跨敤鐨勬椂鍊欓渶瑕佸垽鏂潪绌�
+ */
+ public static MqttRecvClient getInstance() {
+ if (null == mqttRecvClient) {
+ synchronized (MqttRecvClient.class) {
+ if (null == mqttRecvClient) {
+ mqttRecvClient = new MqttRecvClient();
+ }
+ return mqttRecvClient;
+ }
+ }
+ return mqttRecvClient;
+ }
+
+ class MqttThread extends Thread {
+ @Override
+ public void run() {
+ super.run();
+ connect();
+ }
+ }
+
+ private void connect() {
+ try {
+ if (sampleClient != null) {
+ sampleClient.close();
+ }
+ sampleClient = new MqttAsyncClient(mBroker, mClientId, persistence);
+ connOpts.setUserName(mUserName);
+// connOpts.setServerURIs(new String[]{mBroker});
+ connOpts.setPassword(mPassWord.toCharArray());
+ connOpts.setCleanSession(true);
+ connOpts.setKeepAliveInterval(10);
+ connOpts.setAutomaticReconnect(true);
+ connOpts.setConnectionTimeout(10);
+ connOpts.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
+ sampleClient.setCallback(new MqttCallbackExtended() {
+ public void connectComplete(boolean reconnect, String serverURI) {
+ LogUtils.d(TAG, "connect success");
+ checkAndsubscribeAllTopics("");
+ }
+
+ public void connectionLost(Throwable throwable) {
+ LogUtils.d(TAG, "杩炴帴鏂紑");
+ lastTopicFilters.clear();
+ }
+
+ public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
+ managerMqttMsg(topic, mqttMessage);
+ }
+
+ public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+ }
+ });
+ sampleClient.connect(connOpts);
+
+ } catch (Exception me) {
+ me.printStackTrace();
+ }
+ }
+
+ /**
+ * 澶勭悊鎺ユ敹鐨刴qtt鏁版嵁
+ *
+ * @param topic 鎺ユ敹涓婚
+ * @param mqttMessage 鎺ユ敹鏁版嵁
+ * @throws Exception
+ */
+ public void managerMqttMsg(String topic, MqttMessage mqttMessage) throws Exception {
+ LogUtils.d(TAG, "\r\n" + "mqtt->杩滅▼鍥炲涓婚" + topic);
+ if (HDLConnectHelper.isLocal()) {
+ boolean needReturn = true;
+ //濡傛灉鏄湰鍦版ā寮忥紝浜戠涓嬫潵鐨勭綉鍏虫暟鎹儴鍒嗘暟鎹笉鎺ユ敹(濡傦細ota鍗囩骇鍙嶉杩涘害)
+ for (String ignoreTopic : ignoreTopics) {
+ if (topic.endsWith(ignoreTopic)) {
+ needReturn = false;
+ break;
+ }
+ }
+ if (needReturn) {
+ return;
+ }
+ }
+ if (topic.contains("/custom/mqtt/secret/change")) {
+ /**
+ * 缃戝叧閲嶈繛mqtt 闇�瑕佹洿鎹esKey 涓嶇劧缃戝叧鏃犳硶瑙e瘑 閫氱煡鍒锋柊缃戝叧鍒楄〃骞朵笖鏇存柊涓荤綉鍏崇殑aesKey
+ */
+ String[] topics = topic.split("/");
+ //闈炲綋鍓嶄綇瀹呯綉鍏崇殑鏁版嵁杩斿洖
+ if (topics.length < 3) {
+ return;
+ }
+ LogUtils.d(TAG, "缃戝叧閲嶈繛mqtt绉橀挜鏇存柊閫氱煡->" + topic);
+ EventNotifyRefreshGatewayAesKeyInfo eventNotifyRefreshGatewayAesKeyInfo = new EventNotifyRefreshGatewayAesKeyInfo();
+ eventNotifyRefreshGatewayAesKeyInfo.setGatewayId(topics[2]);
+ EventBus.getDefault().post(eventNotifyRefreshGatewayAesKeyInfo);
+ return;
+ }
+
+ String[] topics = topic.split("/");
+ //闈炲綋鍓嶄綇瀹呯綉鍏崇殑鏁版嵁杩斿洖
+ if (topics.length < 3) {
+ return;
+ }
+ String aes = null;
+ String cloudsGatewayId = topics[2];//浜戠涓奊atewayId
+ GatewayBean gatewayBean = HDLLinkLocalGateway.getInstance().getGatewayByOidOrGatewayId(cloudsGatewayId);
+ if (cloudsGatewayId.equals(HDLLinkConfig.getInstance().getHomeId())) {
+ aes = getHomeAES();
+ } else if (gatewayBean != null && HDLConnectHelper.getGatewayTypeList().contains(gatewayBean.getGatewayType())) {
+ //姣背娉qtt涓撶敤绉橀挜
+ aes = gatewayBean.getAesKey();
+ } else {
+ aes = HDLLinkConfig.getInstance().getAesKey();
+ }
+ if (TextUtils.isEmpty(aes)) {
+ return;
+ }
+
+ byte[] bytes = AesUtil.aesDecrypt(mqttMessage.getPayload(), aes);
+ if (null == bytes) {
+ LogUtils.d(TAG, "\r\n" + "mqtt->杩滅▼鍥炲鏁版嵁 瀵嗛挜瑙e瘑澶辫触");
+ return;
+ }
+ String bodyStr = new String(bytes);
+ LogUtils.d(TAG, "\r\n" + "mqtt->杩滅▼鍥炲鏁版嵁" + bodyStr);
+ /**
+ * 绾㈠瀹濊澶囬�氳繃/thing/topo/found涓婚 涓婃姤绾㈠瀹濊澶囧凡缁忓叆缃戜簡 鐒跺悗鐩存帴return 涓嶉渶瑕佸啀涓嬭浜�
+ */
+ if (topic.endsWith("/thing/topo/found")) {
+ /**
+ * {"id":"0000016E","time_stamp":"366574","objects":[{"sid":"010105D370451908110100000000",
+ * "name":"Mini鏅鸿兘閬ユ帶鍣�","spk":"ir.module","oid":"010105D370451908","omodel":"MIR01R-LK.10",
+ * "online":"true","attributes":[],"status":[],"from":"010105D370451908","src":"010105D370451908"}]}
+ */
+ if (bodyStr.contains("ir.module")) {
+ EventBus.getDefault().post(new EventBindMiniRemoteSuccessInfo());
+ return;
+ }
+ }
+ //Link浠庣綉鍏抽�忎紶鐗规畩澶勭悊
+ if (topic.contains("/native/a/") && topic.contains("/slaveoid/")) {
+ LinkMessageDecoder.getInstance().read(new Packet(bytes));
+ } else {
+ byte[] topBytes = new byte[3];
+ if (bytes.length > 2) {
+ topBytes[0] = bytes[0];
+ topBytes[1] = bytes[1];
+ topBytes[2] = bytes[2];
+ }
+ if (new String(topBytes).equals("hex")) {//link鍘熺敓鏁版嵁锛屽垽鏂槸鍚︽槸鏂囦欢澶勭悊閫氱煡
+ LinkMessageDecoder.getInstance().read(new Packet(bytes));
+ } else {
+ QueueUtils.getInstance().add(new LinkPacket(topic, bytes, true));
+ }
+ }
+ }
+
+ /**
+ * 妫�鏌ヤ富棰樻槸鍚﹀凡璁㈤槄锛屾病鏈夎闃呭氨璁㈤槄
+ *
+ * @param sendTopic 璇锋眰涓婚
+ */
+ public synchronized void checkAndsubscribeAllTopics(String sendTopic) {
+ if (null != sampleClient && sampleClient.isConnected() == false) {
+ return;
+ }
+ try {
+ for (String topic : nativeAndLinkTopic(sendTopic)) {
+ if (lastTopicFilters.contains(topic)) {
+ continue;
+ }
+ LogUtils.d(TAG, "璁㈤槄涓婚" + topic);
+
+ sampleClient.subscribe(topic, 0, null, new IMqttActionListener() {
+ @Override
+ public void onSuccess(IMqttToken asyncActionToken) {
+ if (!lastTopicFilters.contains(topic)) {
+ lastTopicFilters.add(topic);
+ }
+ }
+
+ @Override
+ public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
+
+ }
+ });
+ }
+
+ } catch (MqttException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 璁㈤槄骞冲彴涓嬫潵鐨勪富棰�
+ *
+ * @return string鏁扮粍
+ */
+ private String[] nativeAndLinkTopic(String sendTopic) {
+ String[] topicArray = sendTopic.split("/");
+ //闈炲綋鍓嶄綇瀹呯綉鍏崇殑鏁版嵁杩斿洖
+ if (topicArray.length < 3) {
+ return new String[]{
+ String.format("/user/%s/#", HDLLinkConfig.getInstance().getGatewayId()),
+ String.format("/base/%s/#", HDLLinkConfig.getInstance().getGatewayId())};
+ }
+
+ String gatewayId = topicArray[2];//浜戠涓奊atewayId
+ String[] topics = {
+ String.format("/user/%s/#", HDLLinkConfig.getInstance().getGatewayId()),
+ String.format("/base/%s/#", HDLLinkConfig.getInstance().getGatewayId()),
+ String.format("/user/%s/#", gatewayId),
+ String.format("/base/%s/#", gatewayId),
+ };
+ return topics;
+ }
+
+
+ /**
+ * APP璁㈤槄浜戠涓婚瑙e瘑瀵嗘枃鐨勭閽�(杩欎釜瀵嗛挜鍙兘鐢ㄤ簬璁㈤槄浜戠涓婚锛岃闃呯綉鍏充富棰樺彟澶栦竴涓瘑閽�)
+ *
+ * @return -杩斿洖瑙e瘑瀵嗘枃鐨勭閽�
+ */
+ private String getHomeAES() {
+ String homeId = HDLLinkConfig.getInstance().getHomeId();
+ if (TextUtils.isEmpty(homeId)) {
+ return null;
+ }
+ //瑙e瘑瀵嗛挜瑙勫垯锛氬凡鐜版湁鐨勪綇瀹匢D涓哄熀鍑�,浠庡彸杈逛竴涓�鑾峰彇鍊�,鏈�鍚庡鏋滀笉澶�16浣�,鍒欏線鍙宠ˉ闆�
+ StringBuilder aesKey = new StringBuilder();
+ for (int i = homeId.length() - 1; i >= 0; i--) {
+ aesKey.append(homeId.charAt(i));
+ if (aesKey.length() == 16) {
+ break;
+ }
+ }
+ return this.PadRight(aesKey.toString(), 16, "0");
+ }
+
+ /**
+ * 浠庡彸杈规坊鍔犵┖鏍兼垨鍏跺畠瀛楃
+ *
+ * @param currentValueStr 褰撳墠鍊�
+ * @param count 鎬婚暱鏁�
+ * @param others 鍏跺畠瀛楃(鑷畾涔�)
+ * @return 鎬婚暱搴�
+ */
+ private String PadRight(String currentValueStr, int count, String others) {
+ if (count > currentValueStr.length()) {
+ StringBuilder stringBuilder = new StringBuilder();
+ int subLen = count - currentValueStr.length();
+ for (int i = 0; i < subLen; i++) {
+ stringBuilder.append(others);
+ }
+ currentValueStr = currentValueStr + stringBuilder;
+ }
+ return currentValueStr;
+
+ }
+
+ /**
+ * 鍒囨崲浣忓畢鐨勬椂鍊欒闃呰鍏ㄩ儴鍙栨秷
+ */
+ public void removeAllTopic() {
+ if (null != sampleClient && sampleClient.isConnected() == false) {
+ return;
+ }
+ try {
+ if (lastTopicFilters.size() == 0) {
+ return;
+ }
+ LogUtils.d(TAG, "绉婚櫎涓婚:\r\n" + JSON.toJSONString(lastTopicFilters));
+ sampleClient.unsubscribe(lastTopicFilters.toArray(new String[lastTopicFilters.size()]));
+ lastTopicFilters.clear();
+ } catch (MqttException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void stop() {
+ if (mqttRecvClient != null) {
+ try {
+ if (mqttRecvClient.sampleClient != null) {
+ mqttRecvClient.sampleClient.disconnect();
+ mqttRecvClient.sampleClient.close();
+ mqttRecvClient = null;
+ }
+ } catch (MqttException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+
+}
--
Gitblit v1.8.0