From 14de918a79943e4961b09fa01ed320c6cad41f2e Mon Sep 17 00:00:00 2001
From: wjc <1243177876@qq.com>
Date: 星期三, 28 六月 2023 17:14:51 +0800
Subject: [PATCH] Revert "Revert "Merge branch 'hxb' into wjc""

---
 HDLLinkLocalSdk/src/main/java/com/hdl/sdk/link/core/utils/mqtt/MqttRecvClient.java |  412 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 412 insertions(+), 0 deletions(-)

diff --git a/HDLLinkLocalSdk/src/main/java/com/hdl/sdk/link/core/utils/mqtt/MqttRecvClient.java b/HDLLinkLocalSdk/src/main/java/com/hdl/sdk/link/core/utils/mqtt/MqttRecvClient.java
new file mode 100644
index 0000000..6e8ef74
--- /dev/null
+++ b/HDLLinkLocalSdk/src/main/java/com/hdl/sdk/link/core/utils/mqtt/MqttRecvClient.java
@@ -0,0 +1,412 @@
+package com.hdl.sdk.link.core.utils.mqtt;
+
+import android.content.Context;
+import android.text.TextUtils;
+
+import com.alibaba.fastjson.JSON;
+
+import com.hdl.sdk.link.common.utils.LogUtils;
+import com.hdl.sdk.link.core.bean.LinkPacket;
+import com.hdl.sdk.link.core.bean.eventbus.EventBindMiniRemoteSuccessInfo;
+import com.hdl.sdk.link.core.bean.eventbus.EventNotifyRefreshGatewayAesKeyInfo;
+import com.hdl.sdk.link.core.bean.gateway.GatewayBean;
+import com.hdl.sdk.link.core.config.HDLLinkConfig;
+import com.hdl.sdk.link.core.connect.HDLConnectHelper;
+import com.hdl.sdk.link.core.protocol.LinkMessageDecoder;
+import com.hdl.sdk.link.core.utils.AesUtil;
+import com.hdl.sdk.link.core.utils.QueueUtils;
+import com.hdl.sdk.link.gateway.HDLLinkLocalGateway;
+import com.hdl.sdk.link.socket.bean.Packet;
+
+import org.eclipse.paho.client.mqttv3.IMqttActionListener;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.IMqttToken;
+import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
+import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.greenrobot.eventbus.EventBus;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Created by Zoro on 2018/8/1.
+ * desc:2019/7/24  杩炴帴鎴愬姛浠ュ悗鍐嶈闃�   涓嶉渶瑕佸彂閫佸績璺�  涓嶉渶瑕佽嚜宸卞鐞嗛噸杩�
+ */
+
+public class MqttRecvClient {
+    private static String mBroker;
+    private MemoryPersistence persistence = new MemoryPersistence();
+    private MqttAsyncClient sampleClient;
+    private MqttConnectOptions connOpts = new MqttConnectOptions();
+    private MqttThread mqttThread;
+    private static volatile MqttRecvClient mqttRecvClient;
+    private final String TAG = "MqttRecvClient";
+    private static final String[] ignoreTopics = new String[]{"/thing/topo/found", "/ota/device/progress/up"};
+    /**
+     * 涓婃鐨勪富棰橀渶瑕佽褰�  鏇存敼涓婚鐨勬椂鍊欓渶瑕佸彇娑堣闃�
+     */
+    private static List<String> lastTopicFilters = new ArrayList<>();
+    private static String mClientId;
+    private static String mUserName;
+    private static String mPassWord;
+    private final int[] qos = {0};
+
+    private MqttRecvClient() {
+        if (mqttThread == null) {
+            mqttThread = new MqttThread();
+        }
+        if (TextUtils.isEmpty(mUserName) || TextUtils.isEmpty(mPassWord)) {
+            return;
+        }
+        mqttThread.start();
+    }
+
+    public static void init(Context context, String broker1, String deviceId, String userName, String pwd) {
+        mClientId = deviceId;
+        mBroker = broker1;
+        mUserName = userName;
+        mPassWord = pwd;
+        MqttRecvClient.create();
+    }
+
+    public void send(String topic, byte[] bytes) {
+        try {
+            if (TextUtils.isEmpty(topic)) {
+                LogUtils.e("璇锋眰涓婚涓簄ull");
+                return;
+            }
+            checkAndsubscribeAllTopics(topic);
+            publish(topic, bytes);
+        } catch (MqttException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 鍙戝竷
+     *
+     * @param topic 涓婚
+     * @param bytes 鍐呭
+     * @throws MqttException
+     */
+    public void publish(String topic, byte[] bytes) throws MqttException {
+        //鍥炲鏃讹紝mqtt涓婚涓殑鏂瑰悜瑕佸彉鍖栵紝瑕佸仛鏂瑰悜鏇挎崲
+        mqttRecvClient.sampleClient.publish(topic, bytes, 1, false, null, new IMqttActionListener() {
+            @Override
+            public void onSuccess(IMqttToken asyncActionToken) {
+                LogUtils.d(TAG, topic);
+            }
+
+            @Override
+            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
+            }
+        });
+    }
+
+    public static void create() {
+        if (mqttRecvClient == null) {
+            mqttRecvClient = new MqttRecvClient();
+        }
+    }
+
+    /**
+     * 浣跨敤鐨勬椂鍊欓渶瑕佸垽鏂潪绌�
+     */
+    public static MqttRecvClient getInstance() {
+        if (null == mqttRecvClient) {
+            synchronized (MqttRecvClient.class) {
+                if (null == mqttRecvClient) {
+                    mqttRecvClient = new MqttRecvClient();
+                }
+                return mqttRecvClient;
+            }
+        }
+        return mqttRecvClient;
+    }
+
+    class MqttThread extends Thread {
+        @Override
+        public void run() {
+            super.run();
+            connect();
+        }
+    }
+
+    private void connect() {
+        try {
+            if (sampleClient != null) {
+                sampleClient.close();
+            }
+            sampleClient = new MqttAsyncClient(mBroker, mClientId, persistence);
+            connOpts.setUserName(mUserName);
+//            connOpts.setServerURIs(new String[]{mBroker});
+            connOpts.setPassword(mPassWord.toCharArray());
+            connOpts.setCleanSession(true);
+            connOpts.setKeepAliveInterval(10);
+            connOpts.setAutomaticReconnect(true);
+            connOpts.setConnectionTimeout(10);
+            connOpts.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
+            sampleClient.setCallback(new MqttCallbackExtended() {
+                public void connectComplete(boolean reconnect, String serverURI) {
+                    LogUtils.d(TAG, "connect success");
+                    checkAndsubscribeAllTopics("");
+                }
+
+                public void connectionLost(Throwable throwable) {
+                    LogUtils.d(TAG, "杩炴帴鏂紑");
+                    lastTopicFilters.clear();
+                }
+
+                public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
+                    managerMqttMsg(topic, mqttMessage);
+                }
+
+                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+                }
+            });
+            sampleClient.connect(connOpts);
+
+        } catch (Exception me) {
+            me.printStackTrace();
+        }
+    }
+
+    /**
+     * 澶勭悊鎺ユ敹鐨刴qtt鏁版嵁
+     *
+     * @param topic       鎺ユ敹涓婚
+     * @param mqttMessage 鎺ユ敹鏁版嵁
+     * @throws Exception
+     */
+    public void managerMqttMsg(String topic, MqttMessage mqttMessage) throws Exception {
+        LogUtils.d(TAG, "\r\n" + "mqtt->杩滅▼鍥炲涓婚" + topic);
+        if (HDLConnectHelper.isLocal()) {
+            boolean needReturn = true;
+            //濡傛灉鏄湰鍦版ā寮忥紝浜戠涓嬫潵鐨勭綉鍏虫暟鎹儴鍒嗘暟鎹笉鎺ユ敹(濡傦細ota鍗囩骇鍙嶉杩涘害)
+            for (String ignoreTopic : ignoreTopics) {
+                if (topic.endsWith(ignoreTopic)) {
+                    needReturn = false;
+                    break;
+                }
+            }
+            if (needReturn) {
+                return;
+            }
+        }
+        if (topic.contains("/custom/mqtt/secret/change")) {
+            /**
+             * 缃戝叧閲嶈繛mqtt 闇�瑕佹洿鎹esKey 涓嶇劧缃戝叧鏃犳硶瑙e瘑 閫氱煡鍒锋柊缃戝叧鍒楄〃骞朵笖鏇存柊涓荤綉鍏崇殑aesKey
+             */
+            String[] topics = topic.split("/");
+            //闈炲綋鍓嶄綇瀹呯綉鍏崇殑鏁版嵁杩斿洖
+            if (topics.length < 3) {
+                return;
+            }
+            LogUtils.d(TAG, "缃戝叧閲嶈繛mqtt绉橀挜鏇存柊閫氱煡->" + topic);
+            EventNotifyRefreshGatewayAesKeyInfo eventNotifyRefreshGatewayAesKeyInfo = new EventNotifyRefreshGatewayAesKeyInfo();
+            eventNotifyRefreshGatewayAesKeyInfo.setGatewayId(topics[2]);
+            EventBus.getDefault().post(eventNotifyRefreshGatewayAesKeyInfo);
+            return;
+        }
+
+        String[] topics = topic.split("/");
+        //闈炲綋鍓嶄綇瀹呯綉鍏崇殑鏁版嵁杩斿洖
+        if (topics.length < 3) {
+            return;
+        }
+        String aes = null;
+        String cloudsGatewayId = topics[2];//浜戠涓奊atewayId
+        GatewayBean gatewayBean = HDLLinkLocalGateway.getInstance().getGatewayByOidOrGatewayId(cloudsGatewayId);
+        if (cloudsGatewayId.equals(HDLLinkConfig.getInstance().getHomeId())) {
+            aes = getHomeAES();
+        } else if (gatewayBean != null && HDLConnectHelper.getGatewayTypeList().contains(gatewayBean.getGatewayType())) {
+            //姣背娉qtt涓撶敤绉橀挜
+            aes = gatewayBean.getAesKey();
+        } else {
+            aes = HDLLinkConfig.getInstance().getAesKey();
+        }
+        if (TextUtils.isEmpty(aes)) {
+            return;
+        }
+
+        byte[] bytes = AesUtil.aesDecrypt(mqttMessage.getPayload(), aes);
+        if (null == bytes) {
+            LogUtils.d(TAG, "\r\n" + "mqtt->杩滅▼鍥炲鏁版嵁 瀵嗛挜瑙e瘑澶辫触");
+            return;
+        }
+        String bodyStr = new String(bytes);
+        LogUtils.d(TAG, "\r\n" + "mqtt->杩滅▼鍥炲鏁版嵁" + bodyStr);
+        /**
+         * 绾㈠瀹濊澶囬�氳繃/thing/topo/found涓婚  涓婃姤绾㈠瀹濊澶囧凡缁忓叆缃戜簡  鐒跺悗鐩存帴return  涓嶉渶瑕佸啀涓嬭浜�
+         */
+        if (topic.endsWith("/thing/topo/found")) {
+            /**
+             * {"id":"0000016E","time_stamp":"366574","objects":[{"sid":"010105D370451908110100000000",
+             * "name":"Mini鏅鸿兘閬ユ帶鍣�","spk":"ir.module","oid":"010105D370451908","omodel":"MIR01R-LK.10",
+             * "online":"true","attributes":[],"status":[],"from":"010105D370451908","src":"010105D370451908"}]}
+             */
+            if (bodyStr.contains("ir.module")) {
+                EventBus.getDefault().post(new EventBindMiniRemoteSuccessInfo());
+                return;
+            }
+        }
+        //Link浠庣綉鍏抽�忎紶鐗规畩澶勭悊
+        if (topic.contains("/native/a/") && topic.contains("/slaveoid/")) {
+            LinkMessageDecoder.getInstance().read(new Packet(bytes));
+        } else {
+            byte[] topBytes = new byte[3];
+            if (bytes.length > 2) {
+                topBytes[0] = bytes[0];
+                topBytes[1] = bytes[1];
+                topBytes[2] = bytes[2];
+            }
+            if (new String(topBytes).equals("hex")) {//link鍘熺敓鏁版嵁锛屽垽鏂槸鍚︽槸鏂囦欢澶勭悊閫氱煡
+                LinkMessageDecoder.getInstance().read(new Packet(bytes));
+            } else {
+                QueueUtils.getInstance().add(new LinkPacket(topic, bytes, true));
+            }
+        }
+    }
+
+    /**
+     * 妫�鏌ヤ富棰樻槸鍚﹀凡璁㈤槄锛屾病鏈夎闃呭氨璁㈤槄
+     *
+     * @param sendTopic 璇锋眰涓婚
+     */
+    public synchronized void checkAndsubscribeAllTopics(String sendTopic) {
+        if (null != sampleClient && sampleClient.isConnected() == false) {
+            return;
+        }
+        try {
+            for (String topic : nativeAndLinkTopic(sendTopic)) {
+                if (lastTopicFilters.contains(topic)) {
+                    continue;
+                }
+                LogUtils.d(TAG, "璁㈤槄涓婚" + topic);
+
+                sampleClient.subscribe(topic, 0, null, new IMqttActionListener() {
+                    @Override
+                    public void onSuccess(IMqttToken asyncActionToken) {
+                        if (!lastTopicFilters.contains(topic)) {
+                            lastTopicFilters.add(topic);
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
+
+                    }
+                });
+            }
+
+        } catch (MqttException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 璁㈤槄骞冲彴涓嬫潵鐨勪富棰�
+     *
+     * @return string鏁扮粍
+     */
+    private String[] nativeAndLinkTopic(String sendTopic) {
+        String[] topicArray = sendTopic.split("/");
+        //闈炲綋鍓嶄綇瀹呯綉鍏崇殑鏁版嵁杩斿洖
+        if (topicArray.length < 3) {
+            return new String[]{
+                    String.format("/user/%s/#", HDLLinkConfig.getInstance().getGatewayId()),
+                    String.format("/base/%s/#", HDLLinkConfig.getInstance().getGatewayId())};
+        }
+
+        String gatewayId = topicArray[2];//浜戠涓奊atewayId
+        String[] topics = {
+                String.format("/user/%s/#", HDLLinkConfig.getInstance().getGatewayId()),
+                String.format("/base/%s/#", HDLLinkConfig.getInstance().getGatewayId()),
+                String.format("/user/%s/#", gatewayId),
+                String.format("/base/%s/#", gatewayId),
+        };
+        return topics;
+    }
+
+
+    /**
+     * APP璁㈤槄浜戠涓婚瑙e瘑瀵嗘枃鐨勭閽�(杩欎釜瀵嗛挜鍙兘鐢ㄤ簬璁㈤槄浜戠涓婚锛岃闃呯綉鍏充富棰樺彟澶栦竴涓瘑閽�)
+     *
+     * @return -杩斿洖瑙e瘑瀵嗘枃鐨勭閽�
+     */
+    private String getHomeAES() {
+        String homeId = HDLLinkConfig.getInstance().getHomeId();
+        if (TextUtils.isEmpty(homeId)) {
+            return null;
+        }
+        //瑙e瘑瀵嗛挜瑙勫垯锛氬凡鐜版湁鐨勪綇瀹匢D涓哄熀鍑�,浠庡彸杈逛竴涓�鑾峰彇鍊�,鏈�鍚庡鏋滀笉澶�16浣�,鍒欏線鍙宠ˉ闆�
+        StringBuilder aesKey = new StringBuilder();
+        for (int i = homeId.length() - 1; i >= 0; i--) {
+            aesKey.append(homeId.charAt(i));
+            if (aesKey.length() == 16) {
+                break;
+            }
+        }
+        return this.PadRight(aesKey.toString(), 16, "0");
+    }
+
+    /**
+     * 浠庡彸杈规坊鍔犵┖鏍兼垨鍏跺畠瀛楃
+     *
+     * @param currentValueStr 褰撳墠鍊�
+     * @param count           鎬婚暱鏁�
+     * @param others          鍏跺畠瀛楃(鑷畾涔�)
+     * @return 鎬婚暱搴�
+     */
+    private String PadRight(String currentValueStr, int count, String others) {
+        if (count > currentValueStr.length()) {
+            StringBuilder stringBuilder = new StringBuilder();
+            int subLen = count - currentValueStr.length();
+            for (int i = 0; i < subLen; i++) {
+                stringBuilder.append(others);
+            }
+            currentValueStr = currentValueStr + stringBuilder;
+        }
+        return currentValueStr;
+
+    }
+
+    /**
+     * 鍒囨崲浣忓畢鐨勬椂鍊欒闃呰鍏ㄩ儴鍙栨秷
+     */
+    public void removeAllTopic() {
+        if (null != sampleClient && sampleClient.isConnected() == false) {
+            return;
+        }
+        try {
+            if (lastTopicFilters.size() == 0) {
+                return;
+            }
+            LogUtils.d(TAG, "绉婚櫎涓婚:\r\n" + JSON.toJSONString(lastTopicFilters));
+            sampleClient.unsubscribe(lastTopicFilters.toArray(new String[lastTopicFilters.size()]));
+            lastTopicFilters.clear();
+        } catch (MqttException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void stop() {
+        if (mqttRecvClient != null) {
+            try {
+                if (mqttRecvClient.sampleClient != null) {
+                    mqttRecvClient.sampleClient.disconnect();
+                    mqttRecvClient.sampleClient.close();
+                    mqttRecvClient = null;
+                }
+            } catch (MqttException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+
+}

--
Gitblit v1.8.0