package com.hdl.sdk.link.core.utils.mqtt;  
 | 
  
 | 
import android.content.Context;
import android.text.TextUtils;

import com.alibaba.fastjson.JSON;

import com.hdl.sdk.link.common.utils.LogUtils;
import com.hdl.sdk.link.core.bean.LinkPacket;
import com.hdl.sdk.link.core.bean.eventbus.BaseEventBus;
import com.hdl.sdk.link.core.bean.eventbus.EventBindMiniRemoteSuccessInfo;
import com.hdl.sdk.link.core.bean.eventbus.EventNotifyRefreshGatewayAesKeyInfo;
import com.hdl.sdk.link.core.bean.gateway.GatewayBean;
import com.hdl.sdk.link.core.config.HDLLinkConfig;
import com.hdl.sdk.link.core.connect.HDLConnectHelper;
import com.hdl.sdk.link.core.protocol.LinkMessageDecoder;
import com.hdl.sdk.link.core.utils.AesUtil;
import com.hdl.sdk.link.core.utils.QueueUtils;
import com.hdl.sdk.link.gateway.HDLLinkLocalGateway;
import com.hdl.sdk.link.socket.bean.Packet;

import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.greenrobot.eventbus.EventBus;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
 | 
  
 | 
/**  
 | 
 * Created by Zoro on 2018/8/1.  
 | 
 * desc:2019/7/24  连接成功以后再订阅   不需要发送心跳  不需要自己处理重连  
 | 
 */  
 | 
  
 | 
public class MqttRecvClient {  
 | 
    private static String mBroker;  
 | 
    private MemoryPersistence persistence = new MemoryPersistence();  
 | 
    private MqttAsyncClient sampleClient;  
 | 
    private MqttConnectOptions connOpts = new MqttConnectOptions();  
 | 
    private MqttThread mqttThread;  
 | 
    private static volatile MqttRecvClient mqttRecvClient;  
 | 
    private final String TAG = "MqttRecvClient";  
 | 
    private static final String[] ignoreTopics = new String[]{"/thing/topo/found", "/ota/device/progress/up"};  
 | 
    /**  
 | 
     * 上次的主题需要记录  更改主题的时候需要取消订阅  
 | 
     */  
 | 
    private static List<String> lastTopicFilters = new ArrayList<>();  
 | 
    private static String mClientId;  
 | 
    private static String mUserName;  
 | 
    private static String mPassWord;  
 | 
    private final int[] qos = {0};  
 | 
  
 | 
    private MqttRecvClient() {  
 | 
        if (mqttThread == null) {  
 | 
            mqttThread = new MqttThread();  
 | 
        }  
 | 
        if (TextUtils.isEmpty(mUserName) || TextUtils.isEmpty(mPassWord)) {  
 | 
            return;  
 | 
        }  
 | 
        mqttThread.start();  
 | 
    }  
 | 
  
 | 
    public static void init(Context context, String broker1, String deviceId, String userName, String pwd) {  
 | 
        mClientId = deviceId;  
 | 
        mBroker = broker1;  
 | 
        mUserName = userName;  
 | 
        mPassWord = pwd;  
 | 
        MqttRecvClient.create();  
 | 
    }  
 | 
  
 | 
    public void send(String topic, byte[] bytes) {  
 | 
        try {  
 | 
            if (TextUtils.isEmpty(topic)) {  
 | 
                LogUtils.e("请求主题为null");  
 | 
                return;  
 | 
            }  
 | 
            checkAndsubscribeAllTopics(topic);  
 | 
            publish(topic, bytes);  
 | 
        } catch (MqttException e) {  
 | 
            e.printStackTrace();  
 | 
        }  
 | 
    }  
 | 
  
 | 
    /**  
 | 
     * 发布  
 | 
     *  
 | 
     * @param topic 主题  
 | 
     * @param bytes 内容  
 | 
     * @throws MqttException  
 | 
     */  
 | 
    public void publish(String topic, byte[] bytes) throws MqttException {  
 | 
        //回复时,mqtt主题中的方向要变化,要做方向替换  
 | 
        mqttRecvClient.sampleClient.publish(topic, bytes, 1, false, null, new IMqttActionListener() {  
 | 
            @Override  
 | 
            public void onSuccess(IMqttToken asyncActionToken) {  
 | 
                LogUtils.d(TAG, topic);  
 | 
            }  
 | 
  
 | 
            @Override  
 | 
            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {  
 | 
            }  
 | 
        });  
 | 
    }  
 | 
  
 | 
    public static void create() {  
 | 
        if (mqttRecvClient == null) {  
 | 
            mqttRecvClient = new MqttRecvClient();  
 | 
        }  
 | 
    }  
 | 
  
 | 
    /**  
 | 
     * 使用的时候需要判断非空  
 | 
     */  
 | 
    public static MqttRecvClient getInstance() {  
 | 
        if (null == mqttRecvClient) {  
 | 
            synchronized (MqttRecvClient.class) {  
 | 
                if (null == mqttRecvClient) {  
 | 
                    mqttRecvClient = new MqttRecvClient();  
 | 
                }  
 | 
                return mqttRecvClient;  
 | 
            }  
 | 
        }  
 | 
        return mqttRecvClient;  
 | 
    }  
 | 
  
 | 
    class MqttThread extends Thread {  
 | 
        @Override  
 | 
        public void run() {  
 | 
            super.run();  
 | 
            connect();  
 | 
        }  
 | 
    }  
 | 
  
 | 
    private void connect() {  
 | 
        try {  
 | 
            if (sampleClient != null) {  
 | 
                sampleClient.close();  
 | 
            }  
 | 
            sampleClient = new MqttAsyncClient(mBroker, mClientId, persistence);  
 | 
            connOpts.setUserName(mUserName);  
 | 
//            connOpts.setServerURIs(new String[]{mBroker});  
 | 
            connOpts.setPassword(mPassWord.toCharArray());  
 | 
            connOpts.setCleanSession(true);  
 | 
            connOpts.setKeepAliveInterval(10);  
 | 
            connOpts.setAutomaticReconnect(true);  
 | 
            connOpts.setConnectionTimeout(10);  
 | 
            connOpts.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);  
 | 
            sampleClient.setCallback(new MqttCallbackExtended() {  
 | 
                public void connectComplete(boolean reconnect, String serverURI) {  
 | 
                    LogUtils.d(TAG, "mqtt连接成功");  
 | 
                    checkAndsubscribeAllTopics("");  
 | 
                }  
 | 
  
 | 
                public void connectionLost(Throwable throwable) {  
 | 
                    LogUtils.d(TAG, "mqtt连接断开");  
 | 
                    lastTopicFilters.clear();  
 | 
                }  
 | 
  
 | 
                public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {  
 | 
                    managerMqttMsg(topic, mqttMessage);  
 | 
                }  
 | 
  
 | 
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {  
 | 
                }  
 | 
            });  
 | 
            sampleClient.connect(connOpts);  
 | 
  
 | 
        } catch (Exception me) {  
 | 
            me.printStackTrace();  
 | 
        }  
 | 
    }  
 | 
  
 | 
    /**  
 | 
     * 处理接收的mqtt数据  
 | 
     *  
 | 
     * @param topic       接收主题  
 | 
     * @param mqttMessage 接收数据  
 | 
     * @throws Exception  
 | 
     */  
 | 
    public void managerMqttMsg(String topic, MqttMessage mqttMessage) throws Exception {  
 | 
        LogUtils.d(TAG, "\r\n" + "mqtt->远程回复主题" + topic);  
 | 
        if (HDLConnectHelper.isLocal()) {  
 | 
            boolean needReturn = true;  
 | 
            //如果是本地模式,云端下来的网关数据部分数据不接收(如:ota升级反馈进度)  
 | 
            for (String ignoreTopic : ignoreTopics) {  
 | 
                if (topic.endsWith(ignoreTopic)) {  
 | 
                    needReturn = false;  
 | 
                    break;  
 | 
                }  
 | 
            }  
 | 
            if (needReturn) {  
 | 
                return;  
 | 
            }  
 | 
        }  
 | 
        if (topic.contains("/custom/mqtt/secret/change")) {  
 | 
            /**  
 | 
             * 网关重连mqtt 需要更换aesKey 不然网关无法解密 通知刷新网关列表并且更新主网关的aesKey  
 | 
             */  
 | 
            String[] topics = topic.split("/");  
 | 
            //非当前住宅网关的数据返回  
 | 
            if (topics.length < 3) {  
 | 
                return;  
 | 
            }  
 | 
            LogUtils.d(TAG, "网关重连mqtt秘钥更新通知->" + topic);  
 | 
            BaseEventBus baseEventBus=new BaseEventBus();  
 | 
            baseEventBus.setTopic(topic);  
 | 
            EventBus.getDefault().post(baseEventBus);  
 | 
  
 | 
//            EventNotifyRefreshGatewayAesKeyInfo eventNotifyRefreshGatewayAesKeyInfo = new EventNotifyRefreshGatewayAesKeyInfo();  
 | 
//            eventNotifyRefreshGatewayAesKeyInfo.setGatewayId(topics[2]);  
 | 
//            EventBus.getDefault().post(eventNotifyRefreshGatewayAesKeyInfo);  
 | 
            return;  
 | 
        }  
 | 
  
 | 
        String[] topics = topic.split("/");  
 | 
        //非当前住宅网关的数据返回  
 | 
        if (topics.length < 3) {  
 | 
            return;  
 | 
        }  
 | 
        String aes = null;  
 | 
        String cloudsGatewayId = topics[2];//云端上GatewayId  
 | 
        GatewayBean gatewayBean = HDLLinkLocalGateway.getInstance().getGatewayByOidOrGatewayId(cloudsGatewayId);  
 | 
        if (cloudsGatewayId.equals(HDLLinkConfig.getInstance().getHomeId())) {  
 | 
            aes = getHomeAES();  
 | 
        } else if (gatewayBean != null && HDLConnectHelper.getGatewayTypeList().contains(gatewayBean.getGatewayType())) {  
 | 
            //逆变器mqtt专用秘钥  
 | 
            aes = gatewayBean.getAesKey();  
 | 
        } else {  
 | 
            aes = HDLLinkConfig.getInstance().getAesKey();  
 | 
        }  
 | 
        if (TextUtils.isEmpty(aes)) {  
 | 
            return;  
 | 
        }  
 | 
  
 | 
        byte[] bytes = AesUtil.aesDecrypt(mqttMessage.getPayload(), aes);  
 | 
        if (null == bytes) {  
 | 
            LogUtils.d(TAG, "\r\n" + "mqtt->远程回复数据 密钥解密失败");  
 | 
            return;  
 | 
        }  
 | 
        String bodyStr = new String(bytes);  
 | 
        if (HDLConnectHelper.isInverterTopic(topic)) {  
 | 
            LogUtils.d(TAG, "\r\n" + "mqtt->远程回复数据" + Arrays.toString(HDLConnectHelper.byteArrayConvertIntArray(bytes)));  
 | 
        } else {  
 | 
            LogUtils.d(TAG, "\r\n" + "mqtt->远程回复数据" + bodyStr);  
 | 
        }  
 | 
        /**  
 | 
         * 红外宝设备通过/thing/topo/found主题  上报红外宝设备已经入网了  然后直接return  不需要再下行了  
 | 
         */  
 | 
        if (topic.endsWith("/thing/topo/found")) {  
 | 
            /**  
 | 
             * {"id":"0000016E","time_stamp":"366574","objects":[{"sid":"010105D370451908110100000000",  
 | 
             * "name":"Mini智能遥控器","spk":"ir.module","oid":"010105D370451908","omodel":"MIR01R-LK.10",  
 | 
             * "online":"true","attributes":[],"status":[],"from":"010105D370451908","src":"010105D370451908"}]}  
 | 
             */  
 | 
            if (bodyStr.contains("ir.module")) {  
 | 
                EventBus.getDefault().post(new EventBindMiniRemoteSuccessInfo());  
 | 
                return;  
 | 
            }  
 | 
        }  
 | 
        //Link从网关透传特殊处理  
 | 
        if (topic.contains("/native/a/") && topic.contains("/slaveoid/")) {  
 | 
            LinkMessageDecoder.getInstance().read(new Packet(bytes));  
 | 
        } else {  
 | 
            byte[] topBytes = new byte[3];  
 | 
            if (bytes.length > 2) {  
 | 
                topBytes[0] = bytes[0];  
 | 
                topBytes[1] = bytes[1];  
 | 
                topBytes[2] = bytes[2];  
 | 
            }  
 | 
            if (new String(topBytes).equals("hex")) {//link原生数据,判断是否是文件处理通知  
 | 
                LinkMessageDecoder.getInstance().read(new Packet(bytes));  
 | 
            } else {  
 | 
                QueueUtils.getInstance().add(new LinkPacket(topic, bytes, true));  
 | 
            }  
 | 
        }  
 | 
    }  
 | 
  
 | 
    /**  
 | 
     * 检查主题是否已订阅,没有订阅就订阅  
 | 
     *  
 | 
     * @param sendTopic 请求主题  
 | 
     */  
 | 
    public synchronized void checkAndsubscribeAllTopics(String sendTopic) {  
 | 
        if (null != sampleClient && !sampleClient.isConnected()) {  
 | 
            return;  
 | 
        }  
 | 
        try {  
 | 
            for (String topic : nativeAndLinkTopic(sendTopic)) {  
 | 
                if (lastTopicFilters.contains(topic)) {  
 | 
                    continue;  
 | 
                }  
 | 
                LogUtils.d(TAG, "订阅主题" + topic);  
 | 
  
 | 
                sampleClient.subscribe(topic, 0, null, new IMqttActionListener() {  
 | 
                    @Override  
 | 
                    public void onSuccess(IMqttToken asyncActionToken) {  
 | 
                        if (!lastTopicFilters.contains(topic)) {  
 | 
                            lastTopicFilters.add(topic);  
 | 
                        }  
 | 
                    }  
 | 
  
 | 
                    @Override  
 | 
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {  
 | 
  
 | 
                    }  
 | 
                });  
 | 
            }  
 | 
  
 | 
        } catch (MqttException e) {  
 | 
            e.printStackTrace();  
 | 
        }  
 | 
    }  
 | 
  
 | 
    /**  
 | 
     * 订阅平台下来的主题  
 | 
     *  
 | 
     * @return string数组  
 | 
     */  
 | 
    private String[] nativeAndLinkTopic(String sendTopic) {  
 | 
        String[] topicArray = sendTopic.split("/");  
 | 
        //非当前住宅网关的数据返回  
 | 
        if (topicArray.length < 3) {  
 | 
            return new String[]{  
 | 
                    String.format("/user/%s/#", HDLLinkConfig.getInstance().getGatewayId()),  
 | 
                    String.format("/base/%s/#", HDLLinkConfig.getInstance().getGatewayId())};  
 | 
        }  
 | 
  
 | 
        String gatewayId = topicArray[2];//云端上GatewayId  
 | 
        String[] topics = {  
 | 
                String.format("/user/%s/#", HDLLinkConfig.getInstance().getGatewayId()),  
 | 
                String.format("/base/%s/#", HDLLinkConfig.getInstance().getGatewayId()),  
 | 
                String.format("/user/%s/#", gatewayId),  
 | 
                String.format("/base/%s/#", gatewayId),  
 | 
        };  
 | 
        return topics;  
 | 
    }  
 | 
  
 | 
  
 | 
    /**  
 | 
     * APP订阅云端主题解密密文的秘钥(这个密钥只能用于订阅云端主题,订阅网关主题另外一个密钥)  
 | 
     *  
 | 
     * @return -返回解密密文的秘钥  
 | 
     */  
 | 
    private String getHomeAES() {  
 | 
        String homeId = HDLLinkConfig.getInstance().getHomeId();  
 | 
        if (TextUtils.isEmpty(homeId)) {  
 | 
            return null;  
 | 
        }  
 | 
        //解密密钥规则:已现有的住宅ID为基准,从右边一一获取值,最后如果不够16位,则往右补零  
 | 
        StringBuilder aesKey = new StringBuilder();  
 | 
        for (int i = homeId.length() - 1; i >= 0; i--) {  
 | 
            aesKey.append(homeId.charAt(i));  
 | 
            if (aesKey.length() == 16) {  
 | 
                break;  
 | 
            }  
 | 
        }  
 | 
        return this.PadRight(aesKey.toString(), 16, "0");  
 | 
    }  
 | 
  
 | 
    /**  
 | 
     * 从右边添加空格或其它字符  
 | 
     *  
 | 
     * @param currentValueStr 当前值  
 | 
     * @param count           总长数  
 | 
     * @param others          其它字符(自定义)  
 | 
     * @return 总长度  
 | 
     */  
 | 
    private String PadRight(String currentValueStr, int count, String others) {  
 | 
        if (count > currentValueStr.length()) {  
 | 
            StringBuilder stringBuilder = new StringBuilder();  
 | 
            int subLen = count - currentValueStr.length();  
 | 
            for (int i = 0; i < subLen; i++) {  
 | 
                stringBuilder.append(others);  
 | 
            }  
 | 
            currentValueStr = currentValueStr + stringBuilder;  
 | 
        }  
 | 
        return currentValueStr;  
 | 
  
 | 
    }  
 | 
  
 | 
    /**  
 | 
     * 切换住宅的时候订阅要全部取消  
 | 
     */  
 | 
    public void removeAllTopic() {  
 | 
        if (null != sampleClient && sampleClient.isConnected() == false) {  
 | 
            return;  
 | 
        }  
 | 
        try {  
 | 
            if (lastTopicFilters.size() == 0) {  
 | 
                return;  
 | 
            }  
 | 
            LogUtils.d(TAG, "移除主题:\r\n" + JSON.toJSONString(lastTopicFilters));  
 | 
            sampleClient.unsubscribe(lastTopicFilters.toArray(new String[lastTopicFilters.size()]));  
 | 
            lastTopicFilters.clear();  
 | 
        } catch (MqttException e) {  
 | 
            e.printStackTrace();  
 | 
        }  
 | 
    }  
 | 
  
 | 
    public void stop() {  
 | 
        if (mqttRecvClient != null) {  
 | 
            try {  
 | 
                if (mqttRecvClient.sampleClient != null) {  
 | 
                    mqttRecvClient.sampleClient.disconnect();  
 | 
                    mqttRecvClient.sampleClient.close();  
 | 
                    mqttRecvClient = null;  
 | 
                    lastTopicFilters.clear();  
 | 
                }  
 | 
            } catch (MqttException e) {  
 | 
                e.printStackTrace();  
 | 
            }  
 | 
        }  
 | 
    }  
 | 
  
 | 
  
 | 
}  
 |