wjc
2023-06-28 14de918a79943e4961b09fa01ed320c6cad41f2e
HDLLinkLocalSdk/src/main/java/com/hdl/sdk/link/core/utils/mqtt/MqttRecvClient.java
New file
@@ -0,0 +1,412 @@
package com.hdl.sdk.link.core.utils.mqtt;
import android.content.Context;
import android.text.TextUtils;
import com.alibaba.fastjson.JSON;
import com.hdl.sdk.link.common.utils.LogUtils;
import com.hdl.sdk.link.core.bean.LinkPacket;
import com.hdl.sdk.link.core.bean.eventbus.EventBindMiniRemoteSuccessInfo;
import com.hdl.sdk.link.core.bean.eventbus.EventNotifyRefreshGatewayAesKeyInfo;
import com.hdl.sdk.link.core.bean.gateway.GatewayBean;
import com.hdl.sdk.link.core.config.HDLLinkConfig;
import com.hdl.sdk.link.core.connect.HDLConnectHelper;
import com.hdl.sdk.link.core.protocol.LinkMessageDecoder;
import com.hdl.sdk.link.core.utils.AesUtil;
import com.hdl.sdk.link.core.utils.QueueUtils;
import com.hdl.sdk.link.gateway.HDLLinkLocalGateway;
import com.hdl.sdk.link.socket.bean.Packet;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.greenrobot.eventbus.EventBus;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/**
 * Created by Zoro on 2018/8/1.
 * desc:2019/7/24  连接成功以后再订阅   不需要发送心跳  不需要自己处理重连
 */
public class MqttRecvClient {
    private static String mBroker;
    private MemoryPersistence persistence = new MemoryPersistence();
    private MqttAsyncClient sampleClient;
    private MqttConnectOptions connOpts = new MqttConnectOptions();
    private MqttThread mqttThread;
    private static volatile MqttRecvClient mqttRecvClient;
    private final String TAG = "MqttRecvClient";
    private static final String[] ignoreTopics = new String[]{"/thing/topo/found", "/ota/device/progress/up"};
    /**
     * 上次的主题需要记录  更改主题的时候需要取消订阅
     */
    private static List<String> lastTopicFilters = new ArrayList<>();
    private static String mClientId;
    private static String mUserName;
    private static String mPassWord;
    private final int[] qos = {0};
    private MqttRecvClient() {
        if (mqttThread == null) {
            mqttThread = new MqttThread();
        }
        if (TextUtils.isEmpty(mUserName) || TextUtils.isEmpty(mPassWord)) {
            return;
        }
        mqttThread.start();
    }
    public static void init(Context context, String broker1, String deviceId, String userName, String pwd) {
        mClientId = deviceId;
        mBroker = broker1;
        mUserName = userName;
        mPassWord = pwd;
        MqttRecvClient.create();
    }
    public void send(String topic, byte[] bytes) {
        try {
            if (TextUtils.isEmpty(topic)) {
                LogUtils.e("请求主题为null");
                return;
            }
            checkAndsubscribeAllTopics(topic);
            publish(topic, bytes);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * 发布
     *
     * @param topic 主题
     * @param bytes 内容
     * @throws MqttException
     */
    public void publish(String topic, byte[] bytes) throws MqttException {
        //回复时,mqtt主题中的方向要变化,要做方向替换
        mqttRecvClient.sampleClient.publish(topic, bytes, 1, false, null, new IMqttActionListener() {
            @Override
            public void onSuccess(IMqttToken asyncActionToken) {
                LogUtils.d(TAG, topic);
            }
            @Override
            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
            }
        });
    }
    public static void create() {
        if (mqttRecvClient == null) {
            mqttRecvClient = new MqttRecvClient();
        }
    }
    /**
     * 使用的时候需要判断非空
     */
    public static MqttRecvClient getInstance() {
        if (null == mqttRecvClient) {
            synchronized (MqttRecvClient.class) {
                if (null == mqttRecvClient) {
                    mqttRecvClient = new MqttRecvClient();
                }
                return mqttRecvClient;
            }
        }
        return mqttRecvClient;
    }
    class MqttThread extends Thread {
        @Override
        public void run() {
            super.run();
            connect();
        }
    }
    private void connect() {
        try {
            if (sampleClient != null) {
                sampleClient.close();
            }
            sampleClient = new MqttAsyncClient(mBroker, mClientId, persistence);
            connOpts.setUserName(mUserName);
//            connOpts.setServerURIs(new String[]{mBroker});
            connOpts.setPassword(mPassWord.toCharArray());
            connOpts.setCleanSession(true);
            connOpts.setKeepAliveInterval(10);
            connOpts.setAutomaticReconnect(true);
            connOpts.setConnectionTimeout(10);
            connOpts.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
            sampleClient.setCallback(new MqttCallbackExtended() {
                public void connectComplete(boolean reconnect, String serverURI) {
                    LogUtils.d(TAG, "connect success");
                    checkAndsubscribeAllTopics("");
                }
                public void connectionLost(Throwable throwable) {
                    LogUtils.d(TAG, "连接断开");
                    lastTopicFilters.clear();
                }
                public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                    managerMqttMsg(topic, mqttMessage);
                }
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                }
            });
            sampleClient.connect(connOpts);
        } catch (Exception me) {
            me.printStackTrace();
        }
    }
    /**
     * 处理接收的mqtt数据
     *
     * @param topic       接收主题
     * @param mqttMessage 接收数据
     * @throws Exception
     */
    public void managerMqttMsg(String topic, MqttMessage mqttMessage) throws Exception {
        LogUtils.d(TAG, "\r\n" + "mqtt->远程回复主题" + topic);
        if (HDLConnectHelper.isLocal()) {
            boolean needReturn = true;
            //如果是本地模式,云端下来的网关数据部分数据不接收(如:ota升级反馈进度)
            for (String ignoreTopic : ignoreTopics) {
                if (topic.endsWith(ignoreTopic)) {
                    needReturn = false;
                    break;
                }
            }
            if (needReturn) {
                return;
            }
        }
        if (topic.contains("/custom/mqtt/secret/change")) {
            /**
             * 网关重连mqtt 需要更换aesKey 不然网关无法解密 通知刷新网关列表并且更新主网关的aesKey
             */
            String[] topics = topic.split("/");
            //非当前住宅网关的数据返回
            if (topics.length < 3) {
                return;
            }
            LogUtils.d(TAG, "网关重连mqtt秘钥更新通知->" + topic);
            EventNotifyRefreshGatewayAesKeyInfo eventNotifyRefreshGatewayAesKeyInfo = new EventNotifyRefreshGatewayAesKeyInfo();
            eventNotifyRefreshGatewayAesKeyInfo.setGatewayId(topics[2]);
            EventBus.getDefault().post(eventNotifyRefreshGatewayAesKeyInfo);
            return;
        }
        String[] topics = topic.split("/");
        //非当前住宅网关的数据返回
        if (topics.length < 3) {
            return;
        }
        String aes = null;
        String cloudsGatewayId = topics[2];//云端上GatewayId
        GatewayBean gatewayBean = HDLLinkLocalGateway.getInstance().getGatewayByOidOrGatewayId(cloudsGatewayId);
        if (cloudsGatewayId.equals(HDLLinkConfig.getInstance().getHomeId())) {
            aes = getHomeAES();
        } else if (gatewayBean != null && HDLConnectHelper.getGatewayTypeList().contains(gatewayBean.getGatewayType())) {
            //毫米波mqtt专用秘钥
            aes = gatewayBean.getAesKey();
        } else {
            aes = HDLLinkConfig.getInstance().getAesKey();
        }
        if (TextUtils.isEmpty(aes)) {
            return;
        }
        byte[] bytes = AesUtil.aesDecrypt(mqttMessage.getPayload(), aes);
        if (null == bytes) {
            LogUtils.d(TAG, "\r\n" + "mqtt->远程回复数据 密钥解密失败");
            return;
        }
        String bodyStr = new String(bytes);
        LogUtils.d(TAG, "\r\n" + "mqtt->远程回复数据" + bodyStr);
        /**
         * 红外宝设备通过/thing/topo/found主题  上报红外宝设备已经入网了  然后直接return  不需要再下行了
         */
        if (topic.endsWith("/thing/topo/found")) {
            /**
             * {"id":"0000016E","time_stamp":"366574","objects":[{"sid":"010105D370451908110100000000",
             * "name":"Mini智能遥控器","spk":"ir.module","oid":"010105D370451908","omodel":"MIR01R-LK.10",
             * "online":"true","attributes":[],"status":[],"from":"010105D370451908","src":"010105D370451908"}]}
             */
            if (bodyStr.contains("ir.module")) {
                EventBus.getDefault().post(new EventBindMiniRemoteSuccessInfo());
                return;
            }
        }
        //Link从网关透传特殊处理
        if (topic.contains("/native/a/") && topic.contains("/slaveoid/")) {
            LinkMessageDecoder.getInstance().read(new Packet(bytes));
        } else {
            byte[] topBytes = new byte[3];
            if (bytes.length > 2) {
                topBytes[0] = bytes[0];
                topBytes[1] = bytes[1];
                topBytes[2] = bytes[2];
            }
            if (new String(topBytes).equals("hex")) {//link原生数据,判断是否是文件处理通知
                LinkMessageDecoder.getInstance().read(new Packet(bytes));
            } else {
                QueueUtils.getInstance().add(new LinkPacket(topic, bytes, true));
            }
        }
    }
    /**
     * 检查主题是否已订阅,没有订阅就订阅
     *
     * @param sendTopic 请求主题
     */
    public synchronized void checkAndsubscribeAllTopics(String sendTopic) {
        if (null != sampleClient && sampleClient.isConnected() == false) {
            return;
        }
        try {
            for (String topic : nativeAndLinkTopic(sendTopic)) {
                if (lastTopicFilters.contains(topic)) {
                    continue;
                }
                LogUtils.d(TAG, "订阅主题" + topic);
                sampleClient.subscribe(topic, 0, null, new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        if (!lastTopicFilters.contains(topic)) {
                            lastTopicFilters.add(topic);
                        }
                    }
                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    }
                });
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * 订阅平台下来的主题
     *
     * @return string数组
     */
    private String[] nativeAndLinkTopic(String sendTopic) {
        String[] topicArray = sendTopic.split("/");
        //非当前住宅网关的数据返回
        if (topicArray.length < 3) {
            return new String[]{
                    String.format("/user/%s/#", HDLLinkConfig.getInstance().getGatewayId()),
                    String.format("/base/%s/#", HDLLinkConfig.getInstance().getGatewayId())};
        }
        String gatewayId = topicArray[2];//云端上GatewayId
        String[] topics = {
                String.format("/user/%s/#", HDLLinkConfig.getInstance().getGatewayId()),
                String.format("/base/%s/#", HDLLinkConfig.getInstance().getGatewayId()),
                String.format("/user/%s/#", gatewayId),
                String.format("/base/%s/#", gatewayId),
        };
        return topics;
    }
    /**
     * APP订阅云端主题解密密文的秘钥(这个密钥只能用于订阅云端主题,订阅网关主题另外一个密钥)
     *
     * @return -返回解密密文的秘钥
     */
    private String getHomeAES() {
        String homeId = HDLLinkConfig.getInstance().getHomeId();
        if (TextUtils.isEmpty(homeId)) {
            return null;
        }
        //解密密钥规则:已现有的住宅ID为基准,从右边一一获取值,最后如果不够16位,则往右补零
        StringBuilder aesKey = new StringBuilder();
        for (int i = homeId.length() - 1; i >= 0; i--) {
            aesKey.append(homeId.charAt(i));
            if (aesKey.length() == 16) {
                break;
            }
        }
        return this.PadRight(aesKey.toString(), 16, "0");
    }
    /**
     * 从右边添加空格或其它字符
     *
     * @param currentValueStr 当前值
     * @param count           总长数
     * @param others          其它字符(自定义)
     * @return 总长度
     */
    private String PadRight(String currentValueStr, int count, String others) {
        if (count > currentValueStr.length()) {
            StringBuilder stringBuilder = new StringBuilder();
            int subLen = count - currentValueStr.length();
            for (int i = 0; i < subLen; i++) {
                stringBuilder.append(others);
            }
            currentValueStr = currentValueStr + stringBuilder;
        }
        return currentValueStr;
    }
    /**
     * 切换住宅的时候订阅要全部取消
     */
    public void removeAllTopic() {
        if (null != sampleClient && sampleClient.isConnected() == false) {
            return;
        }
        try {
            if (lastTopicFilters.size() == 0) {
                return;
            }
            LogUtils.d(TAG, "移除主题:\r\n" + JSON.toJSONString(lastTopicFilters));
            sampleClient.unsubscribe(lastTopicFilters.toArray(new String[lastTopicFilters.size()]));
            lastTopicFilters.clear();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    public void stop() {
        if (mqttRecvClient != null) {
            try {
                if (mqttRecvClient.sampleClient != null) {
                    mqttRecvClient.sampleClient.disconnect();
                    mqttRecvClient.sampleClient.close();
                    mqttRecvClient = null;
                }
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }
}