package com.hdl.sdk.link.core.utils.mqtt;

import android.content.Context;
import android.text.TextUtils;

import com.alibaba.fastjson.JSON;
import com.hdl.sdk.link.common.utils.LogUtils;
import com.hdl.sdk.link.core.bean.LinkPacket;
import com.hdl.sdk.link.core.bean.eventbus.BaseEventBus;
import com.hdl.sdk.link.core.bean.eventbus.EventBindMiniRemoteSuccessInfo;
import com.hdl.sdk.link.core.bean.eventbus.EventNotifyRefreshGatewayAesKeyInfo;
import com.hdl.sdk.link.core.bean.gateway.GatewayBean;
import com.hdl.sdk.link.core.config.HDLLinkConfig;
import com.hdl.sdk.link.core.connect.HDLConnectHelper;
import com.hdl.sdk.link.core.protocol.LinkMessageDecoder;
import com.hdl.sdk.link.core.utils.AesUtil;
import com.hdl.sdk.link.core.utils.QueueUtils;
import com.hdl.sdk.link.gateway.HDLLinkLocalGateway;
import com.hdl.sdk.link.socket.bean.Packet;

import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.greenrobot.eventbus.EventBus;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * Created by Zoro on 2018/8/1.
 * desc (2019/7/24): topics are subscribed only after the connection succeeds; no
 * application-level heartbeat is sent and reconnecting is left to the MQTT client
 * (automatic reconnect is enabled in {@link #connect()}).
 *
 * <p>Process-wide singleton wrapping a Paho {@link MqttAsyncClient}. Broker address,
 * client id and credentials are static and supplied through
 * {@link #init(Context, String, String, String, String)}.
 */
public class MqttRecvClient {

    private static final String TAG = "MqttRecvClient";

    /**
     * Topic suffixes that are still processed while {@link HDLConnectHelper#isLocal()}
     * is true. Every other cloud message is dropped in local mode
     * (see {@link #managerMqttMsg(String, MqttMessage)}).
     */
    private static final String[] ignoreTopics =
            new String[]{"/thing/topo/found", "/ota/device/progress/up"};

    /**
     * Topic filters that were subscribed successfully. They are remembered so that they
     * can be unsubscribed when the topic set changes. Written from MQTT callback threads
     * and read from caller threads, hence a concurrent list.
     */
    private static final List<String> lastTopicFilters = new CopyOnWriteArrayList<>();

    private static String mBroker;
    private static String mClientId;
    private static String mUserName;
    private static String mPassWord;

    private static volatile MqttRecvClient mqttRecvClient;

    private final MemoryPersistence persistence = new MemoryPersistence();
    private final MqttConnectOptions connOpts = new MqttConnectOptions();
    private final MqttThread mqttThread = new MqttThread();
    /** Currently unused; kept from the original implementation. */
    private final int[] qos = {0};

    /** Created on the connect thread, read from caller threads. */
    private volatile MqttAsyncClient sampleClient;
    /** True once {@link #mqttThread} has been started; guarded by {@code this}. */
    private boolean started;
    /** Set by {@link #stop()} so that a connect thread that has not run yet does nothing. */
    private volatile boolean stopped;

    private MqttRecvClient() {
        startIfConfigured();
    }

    /**
     * Starts the connect thread once, provided user name and password are available.
     * Safe to call repeatedly.
     */
    private synchronized void startIfConfigured() {
        if (started || stopped) {
            return;
        }
        if (TextUtils.isEmpty(mUserName) || TextUtils.isEmpty(mPassWord)) {
            return;
        }
        started = true;
        mqttThread.start();
    }

    /**
     * Stores the connection parameters and makes sure the singleton exists and is connecting.
     *
     * @param context  not used by this method
     * @param broker1  broker URI passed to {@link MqttAsyncClient}
     * @param deviceId MQTT client id
     * @param userName MQTT user name
     * @param pwd      MQTT password
     */
    public static void init(Context context, String broker1, String deviceId, String userName, String pwd) {
        mClientId = deviceId;
        mBroker = broker1;
        mUserName = userName;
        mPassWord = pwd;
        MqttRecvClient.create();
    }

    /**
     * Makes sure the subscriptions derived from {@code topic} exist, then publishes
     * {@code bytes} to it. An empty topic is logged and ignored; an {@link MqttException}
     * is logged and not propagated.
     *
     * @param topic request topic
     * @param bytes payload
     */
    public void send(String topic, byte[] bytes) {
        if (TextUtils.isEmpty(topic)) {
            LogUtils.e("请求主题为null");
            return;
        }
        try {
            checkAndsubscribeAllTopics(topic);
            publish(topic, bytes);
        } catch (MqttException e) {
            logError("send failed, topic=" + topic, e);
        }
    }

    /**
     * Publishes a payload with QoS 1, not retained.
     *
     * <p>NOTE: the original comment here says the direction segment of the topic has to be
     * swapped when replying. No such rewrite happens in this method; the topic is
     * published exactly as given.
     *
     * @param topic topic to publish to
     * @param bytes payload
     * @throws MqttException if the client has not been created yet (reason code
     *                       {@link MqttException#REASON_CODE_CLIENT_NOT_CONNECTED}) or the
     *                       underlying client rejects the publish
     */
    public void publish(final String topic, byte[] bytes) throws MqttException {
        final MqttAsyncClient client = sampleClient;
        if (client == null) {
            // Previously a NullPointerException escaped here, which send() does not catch.
            throw new MqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
        }
        client.publish(topic, bytes, 1, false, null, new IMqttActionListener() {
            @Override
            public void onSuccess(IMqttToken asyncActionToken) {
                LogUtils.d(TAG, topic);
            }

            @Override
            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                logError("publish failed, topic=" + topic, exception);
            }
        });
    }

    /**
     * Creates the singleton if necessary and starts connecting if it has not started yet.
     * The second part matters when {@link #getInstance()} was called before credentials
     * were supplied: that instance exists but never started its connect thread.
     */
    public static void create() {
        getInstance().startIfConfigured();
    }

    /**
     * Returns the singleton, creating it on first use. Never returns {@code null}.
     * The instance only connects once credentials have been supplied via
     * {@link #init(Context, String, String, String, String)}.
     */
    public static MqttRecvClient getInstance() {
        MqttRecvClient instance = mqttRecvClient;
        if (instance == null) {
            synchronized (MqttRecvClient.class) {
                instance = mqttRecvClient;
                if (instance == null) {
                    instance = new MqttRecvClient();
                    mqttRecvClient = instance;
                }
            }
        }
        return instance;
    }

    /** Thread whose only job is to run {@link #connect()} off the caller's thread. */
    class MqttThread extends Thread {
        @Override
        public void run() {
            super.run();
            connect();
        }
    }

    /**
     * Builds the MQTT client, installs the callback and issues the asynchronous connect.
     * Does nothing if {@link #stop()} has already been called on this instance.
     */
    private synchronized void connect() {
        if (stopped) {
            return;
        }
        try {
            if (sampleClient != null) {
                sampleClient.close();
            }
            final MqttAsyncClient client = new MqttAsyncClient(mBroker, mClientId, persistence);
            sampleClient = client;

            connOpts.setUserName(mUserName);
            connOpts.setPassword(mPassWord.toCharArray());
            connOpts.setCleanSession(true);
            connOpts.setKeepAliveInterval(10);
            connOpts.setAutomaticReconnect(true);
            connOpts.setConnectionTimeout(60);
            connOpts.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

            client.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    LogUtils.d(TAG, "connect success");
                    checkAndsubscribeAllTopics("");
                }

                @Override
                public void connectionLost(Throwable throwable) {
                    LogUtils.d(TAG, "连接断开");
                    // Clean session: subscriptions are gone, so forget them and
                    // re-subscribe in connectComplete.
                    lastTopicFilters.clear();
                }

                @Override
                public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                    managerMqttMsg(topic, mqttMessage);
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                }
            });
            client.connect(connOpts);
        } catch (Exception me) {
            logError("connect failed", me);
        }
    }

    /**
     * Handles one message received over MQTT: filters it, decrypts it and dispatches it.
     *
     * @param topic       topic the message arrived on
     * @param mqttMessage received message
     * @throws Exception propagated from decryption or the downstream decoders
     */
    public void managerMqttMsg(String topic, MqttMessage mqttMessage) throws Exception {
        LogUtils.d(TAG, "\r\n" + "mqtt->远程回复主题" + topic);

        // In local mode only the whitelisted cloud topics (e.g. OTA progress) are processed.
        if (HDLConnectHelper.isLocal() && !isProcessedInLocalMode(topic)) {
            return;
        }

        // The third segment of the topic carries the gateway id (or home id) on the cloud side.
        String[] topics = topic.split("/");
        if (topics.length < 3) {
            return;
        }
        String cloudsGatewayId = topics[2];

        if (topic.contains("/custom/mqtt/secret/change")) {
            // The gateway reconnected to MQTT and its AES key changed; without the new key
            // its data cannot be decrypted. Ask listeners to refresh the gateway list / key.
            LogUtils.d(TAG, "网关重连mqtt秘钥更新通知->" + topic);
            BaseEventBus baseEventBus = new BaseEventBus();
            baseEventBus.setTopic(topic);
            EventBus.getDefault().post(baseEventBus);

            EventNotifyRefreshGatewayAesKeyInfo refreshInfo = new EventNotifyRefreshGatewayAesKeyInfo();
            refreshInfo.setGatewayId(cloudsGatewayId);
            EventBus.getDefault().post(refreshInfo);
            return;
        }

        String aes = resolveAesKey(cloudsGatewayId);
        if (TextUtils.isEmpty(aes)) {
            return;
        }

        byte[] bytes = AesUtil.aesDecrypt(mqttMessage.getPayload(), aes);
        if (null == bytes) {
            LogUtils.d(TAG, "\r\n" + "mqtt->远程回复数据 密钥解密失败");
            return;
        }

        String bodyStr = new String(bytes);
        if (HDLConnectHelper.isInverterTopic(topic)) {
            LogUtils.d(TAG, "\r\n" + "mqtt->远程回复数据"
                    + Arrays.toString(HDLConnectHelper.byteArrayConvertIntArray(bytes)));
        } else {
            LogUtils.d(TAG, "\r\n" + "mqtt->远程回复数据" + bodyStr);
        }

        // An IR remote ("ir.module") reports that it joined the network via
        // /thing/topo/found; post the bind-success event and stop, nothing goes downstream.
        // Example payload:
        // {"id":"0000016E","time_stamp":"366574","objects":[{"sid":"010105D370451908110100000000",
        //  "name":"Mini智能遥控器","spk":"ir.module","oid":"010105D370451908","omodel":"MIR01R-LK.10",
        //  "online":"true","attributes":[],"status":[],"from":"010105D370451908","src":"010105D370451908"}]}
        if (topic.endsWith("/thing/topo/found") && bodyStr.contains("ir.module")) {
            EventBus.getDefault().post(new EventBindMiniRemoteSuccessInfo());
            return;
        }

        // Pass-through data from a Link slave gateway, and native Link frames (payload
        // starting with "hex"), go to the Link decoder; everything else is queued.
        boolean slavePassThrough = topic.contains("/native/a/") && topic.contains("/slaveoid/");
        if (slavePassThrough || startsWithHex(bytes)) {
            LinkMessageDecoder.getInstance().read(new Packet(bytes));
        } else {
            QueueUtils.getInstance().add(new LinkPacket(topic, bytes, true));
        }
    }

    /** Returns true if {@code topic} ends with one of the suffixes in {@link #ignoreTopics}. */
    private static boolean isProcessedInLocalMode(String topic) {
        for (String suffix : ignoreTopics) {
            if (topic.endsWith(suffix)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Picks the AES key used to decrypt a message for the given topic id: the home key when
     * the id is the current home id, the gateway's own key for gateway types listed by
     * {@link HDLConnectHelper#getGatewayTypeList()}, otherwise the configured default key.
     */
    private String resolveAesKey(String cloudsGatewayId) {
        GatewayBean gatewayBean =
                HDLLinkLocalGateway.getInstance().getGatewayByOidOrGatewayId(cloudsGatewayId);
        if (cloudsGatewayId.equals(HDLLinkConfig.getInstance().getHomeId())) {
            return getHomeAES();
        }
        if (gatewayBean != null
                && HDLConnectHelper.getGatewayTypeList().contains(gatewayBean.getGatewayType())) {
            // Dedicated MQTT key (per the original comment: millimetre-wave / inverter gateways).
            return gatewayBean.getAesKey();
        }
        return HDLLinkConfig.getInstance().getAesKey();
    }

    /** Returns true if the payload is at least three bytes long and starts with ASCII "hex". */
    private static boolean startsWithHex(byte[] bytes) {
        return bytes.length > 2 && bytes[0] == 'h' && bytes[1] == 'e' && bytes[2] == 'x';
    }

    /**
     * Subscribes to every topic filter derived from {@code sendTopic} that is not subscribed
     * yet. Does nothing while the client is missing or disconnected.
     *
     * @param sendTopic request topic; may be empty or {@code null}, in which case only the
     *                  filters of the configured gateway are used
     */
    public synchronized void checkAndsubscribeAllTopics(String sendTopic) {
        final MqttAsyncClient client = sampleClient;
        if (client == null || !client.isConnected()) {
            return;
        }
        // Filters requested during this call; success is reported asynchronously, so
        // lastTopicFilters alone cannot prevent a duplicate request within one call.
        List<String> requested = new ArrayList<>();
        try {
            for (final String topic : nativeAndLinkTopic(sendTopic)) {
                if (lastTopicFilters.contains(topic) || requested.contains(topic)) {
                    continue;
                }
                requested.add(topic);
                LogUtils.d(TAG, "订阅主题" + topic);
                client.subscribe(topic, 0, null, new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        if (!lastTopicFilters.contains(topic)) {
                            lastTopicFilters.add(topic);
                        }
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        logError("subscribe failed, topic=" + topic, exception);
                    }
                });
            }
        } catch (MqttException e) {
            logError("subscribe failed", e);
        }
    }

    /**
     * Builds the topic filters to subscribe to: {@code /user/<id>/#} and {@code /base/<id>/#}
     * for the configured gateway id and, when {@code sendTopic} has at least three
     * "/"-separated segments, for the id in its third segment as well.
     *
     * @return the filters, possibly containing duplicates
     */
    private String[] nativeAndLinkTopic(String sendTopic) {
        String userFilter = String.format("/user/%s/#", HDLLinkConfig.getInstance().getGatewayId());
        String baseFilter = String.format("/base/%s/#", HDLLinkConfig.getInstance().getGatewayId());

        String[] segments = sendTopic == null ? new String[0] : sendTopic.split("/");
        if (segments.length < 3) {
            return new String[]{userFilter, baseFilter};
        }
        String gatewayId = segments[2];
        return new String[]{
                userFilter,
                baseFilter,
                String.format("/user/%s/#", gatewayId),
                String.format("/base/%s/#", gatewayId),
        };
    }

    /**
     * Derives the key used to decrypt messages on cloud (home) topics. Gateway topics use
     * a different key.
     *
     * <p>Rule: take the characters of the home id from right to left, at most 16 of them,
     * and pad on the right with '0' up to 16 characters.
     *
     * @return the 16-character key, or {@code null} when no home id is configured
     */
    private String getHomeAES() {
        String homeId = HDLLinkConfig.getInstance().getHomeId();
        if (TextUtils.isEmpty(homeId)) {
            return null;
        }
        final int keyLength = 16;
        StringBuilder aesKey = new StringBuilder(keyLength);
        for (int i = homeId.length() - 1; i >= 0 && aesKey.length() < keyLength; i--) {
            aesKey.append(homeId.charAt(i));
        }
        while (aesKey.length() < keyLength) {
            aesKey.append('0');
        }
        return aesKey.toString();
    }

    /**
     * Unsubscribes from every remembered topic filter; used when switching homes.
     * Does nothing while the client is missing or disconnected.
     */
    public synchronized void removeAllTopic() {
        final MqttAsyncClient client = sampleClient;
        if (client == null || !client.isConnected()) {
            return;
        }
        String[] filters = lastTopicFilters.toArray(new String[0]);
        if (filters.length == 0) {
            return;
        }
        try {
            LogUtils.d(TAG, "移除主题:\r\n" + JSON.toJSONString(Arrays.asList(filters)));
            client.unsubscribe(filters);
            lastTopicFilters.clear();
        } catch (MqttException e) {
            logError("unsubscribe failed", e);
        }
    }

    /**
     * Shuts down the current singleton: disconnects and closes its client, forgets the
     * subscribed topics and clears the singleton reference so that a later
     * {@link #init(Context, String, String, String, String)} builds a fresh client.
     * A failure in one step is logged and does not prevent the remaining steps.
     */
    public void stop() {
        final MqttRecvClient instance;
        synchronized (MqttRecvClient.class) {
            instance = mqttRecvClient;
            mqttRecvClient = null;
        }
        if (instance == null) {
            return;
        }
        instance.shutdown();
        lastTopicFilters.clear();
    }

    /** Marks this instance as stopped and releases its MQTT client, if any. */
    private synchronized void shutdown() {
        stopped = true;
        final MqttAsyncClient client = sampleClient;
        sampleClient = null;
        if (client == null) {
            return;
        }
        try {
            // disconnect() throws when the client is not connected; skip it in that case.
            if (client.isConnected()) {
                client.disconnect();
            }
        } catch (MqttException e) {
            logError("disconnect failed", e);
        }
        try {
            client.close();
        } catch (MqttException e) {
            logError("close failed", e);
        }
    }

    /** Logs an error through the project logger, including the cause's description. */
    private static void logError(String message, Throwable cause) {
        LogUtils.e(TAG + ": " + message + " -> " + cause);
    }
}