package com.hdl.sdk.link.core.utils.mqtt;

import android.os.SystemClock;
import android.text.TextUtils;
import android.util.Log;

import com.alibaba.fastjson.JSON;
import com.hdl.sdk.link.common.utils.LogUtils;
import com.hdl.sdk.link.common.utils.ThreadToolUtils;
import com.hdl.sdk.link.core.bean.LinkPacket;
import com.hdl.sdk.link.core.bean.eventbus.BaseEventBus;
import com.hdl.sdk.link.core.bean.eventbus.EventBindMiniRemoteSuccessInfo;
import com.hdl.sdk.link.core.bean.eventbus.EventNotifyRefreshGatewayAesKeyInfo;
import com.hdl.sdk.link.core.bean.gateway.GatewayBean;
import com.hdl.sdk.link.core.config.HDLLinkConfig;
import com.hdl.sdk.link.core.connect.HDLConnectHelper;
import com.hdl.sdk.link.core.protocol.LinkMessageDecoder;
import com.hdl.sdk.link.core.utils.AesUtil;
import com.hdl.sdk.link.core.utils.ByteUtils;
import com.hdl.sdk.link.core.utils.JsonUtils;
import com.hdl.sdk.link.core.utils.QueueUtils;
import com.hdl.sdk.link.gateway.HDLLinkLocalGateway;
import com.hdl.sdk.link.socket.bean.Packet;

import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.greenrobot.eventbus.EventBus;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;

/**
 * Remote (cloud) MQTT client used to publish requests and receive gateway data.
 *
 * <p>Created by Zoro on 2018/8/1.
 * Note (2019/7/24): topics are subscribed only after the connection has been established;
 * the connection is configured with a keep-alive interval and automatic reconnect, so this
 * class neither sends heartbeats nor schedules reconnects itself.
 *
 * <p>The instance is shared through {@link #getInstance()}. The underlying client reference is
 * {@code volatile} and the list of subscribed filters is a {@link CopyOnWriteArrayList} because
 * both are touched from the MQTT callbacks as well as from the callers of this class.
 */
public class MqttRecvClient {

    private static final String TAG = "MqttRecvClient";

    /** Topic suffixes that are dropped while {@link HDLConnectHelper#isLocal()} is true. */
    private static final String[] IGNORE_TOPICS = {"/thing/topo/found", "/ota/device/progress/up"};

    /** Connection timeout handed to {@link MqttConnectOptions#setConnectionTimeout(int)} (seconds). */
    private static final int CONNECTION_TIMEOUT = 8;

    /** Keep-alive interval handed to {@link MqttConnectOptions#setKeepAliveInterval(int)} (seconds). */
    private static final int KEEP_ALIVE_INTERVAL = 10;

    /** Topics containing this fragment are published with QoS 2 instead of QoS 0. */
    private static final String SLAVE_ZIGBEE_DOWN_FRAGMENT = "/custom/native/zigbee/down/slaveoid/";

    /** Topic fragment announcing that a gateway changed its MQTT secret. */
    private static final String SECRET_CHANGE_FRAGMENT = "/custom/mqtt/secret/change";

    /** How long {@link #publish(String, byte[])} waits after triggering a reconnect (milliseconds). */
    private static final long RECONNECT_WAIT_MILLIS = 500;

    /** Length of the AES key derived from the home id. */
    private static final int HOME_AES_KEY_LENGTH = 16;

    private static volatile MqttRecvClient mqttRecvClient;

    /** Current Paho client; {@code null} until {@link #connect()} and after {@link #disConnect()}. */
    private volatile MqttAsyncClient sampleClient;

    private String mBroker;
    private String mClientId;
    private String mUserName;
    private String mPassWord;

    /**
     * Topic filters that have been subscribed successfully. They are remembered so that they are
     * not subscribed twice and so that they can be unsubscribed when the home is switched.
     */
    private final CopyOnWriteArrayList<String> lastTopicFilters = new CopyOnWriteArrayList<>();

    /**
     * Sets the parameters used by the next {@link #connect()}.
     *
     * @param broker   broker address
     * @param clientId client id
     * @param userName user name
     * @param pwd      password
     */
    public void setConnectParam(String broker, String clientId, String userName, String pwd) {
        mBroker = broker;
        mClientId = clientId;
        mUserName = userName;
        mPassWord = pwd;
    }

    /**
     * Makes sure the topics related to {@code topic} are subscribed and then publishes the data.
     * Nothing is sent when the topic is empty or the payload is {@code null}.
     *
     * @param topic topic to publish to
     * @param bytes payload
     */
    public void send(String topic, byte[] bytes) {
        if (TextUtils.isEmpty(topic) || bytes == null) {
            // Log the payload size rather than the array identity, which carries no information.
            LogUtils.e(TAG, "数据不发送,Topic:" + topic + " Bytes:" + (bytes == null ? "null" : String.valueOf(bytes.length)));
            return;
        }
        try {
            checkAndsubscribeAllTopics(topic);
            publish(topic, bytes);
        } catch (MqttException e) {
            logError("mqtt发送异常,Topic:" + topic, e);
        }
    }

    /**
     * Publishes a payload. When the client is not connected a reconnect is triggered and the
     * calling thread sleeps briefly before the publish is attempted.
     *
     * @param topic topic to publish to
     * @param bytes payload
     * @throws MqttException if the underlying client rejects the publish
     */
    public void publish(String topic, byte[] bytes) throws MqttException {
        final MqttAsyncClient client = sampleClient;
        if (client == null) {
            LogUtils.i(TAG, "Mqtt未初始化");
            return;
        }
        // The connection may have just been started and data is sent right after; give it a moment.
        if (!client.isConnected()) {
            reConnect();
            SystemClock.sleep(RECONNECT_WAIT_MILLIS);
        }
        // Publishing to slave gateways was observed to fail with QoS 0, so QoS 2 is used for them.
        final int qos = topic.contains(SLAVE_ZIGBEE_DOWN_FRAGMENT) ? 2 : 0;
        client.publish(topic, bytes, qos, false, null, new IMqttActionListener() {
            @Override
            public void onSuccess(IMqttToken asyncActionToken) {
                LogUtils.d(TAG, "mqtt发送成功,Topic:" + topic);
            }

            @Override
            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                logError("mqtt发送失败,Topic:" + topic, exception);
            }
        });
    }

    /**
     * Returns the shared instance, creating it on first use.
     *
     * @return the shared {@code MqttRecvClient}
     */
    public static MqttRecvClient getInstance() {
        if (null == mqttRecvClient) {
            synchronized (MqttRecvClient.class) {
                if (null == mqttRecvClient) {
                    mqttRecvClient = new MqttRecvClient();
                }
            }
        }
        return mqttRecvClient;
    }

    /**
     * Asks the current client to reconnect. This is best effort: failures are logged, not thrown.
     */
    public void reConnect() {
        final MqttAsyncClient client = sampleClient;
        if (client == null) {
            return;
        }
        try {
            LogUtils.i(TAG, "mqtt重新连接");
            client.reconnect();
        } catch (Exception e) {
            // Deliberately not rethrown; the failure used to be swallowed silently.
            logError("mqtt重新连接失败", e);
        }
    }

    /**
     * Closes any previous connection and connects to the configured broker. Nothing happens
     * (apart from the disconnect) when one of the connection parameters is empty.
     */
    public void connect() {
        try {
            // Close the previous connection first.
            disConnect();
            if (TextUtils.isEmpty(mBroker) || TextUtils.isEmpty(mClientId)
                    || TextUtils.isEmpty(mUserName) || TextUtils.isEmpty(mPassWord)) {
                LogUtils.i(TAG, "连接参数为空 " + describeConnectParams());
                return;
            }

            final MqttAsyncClient client = new MqttAsyncClient(mBroker, mClientId, new MemoryPersistence());
            sampleClient = client;

            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName(mUserName);
            connOpts.setPassword(mPassWord.toCharArray());
            connOpts.setCleanSession(true);
            connOpts.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
            connOpts.setAutomaticReconnect(true);
            connOpts.setConnectionTimeout(CONNECTION_TIMEOUT);
            connOpts.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
            connOpts.setMaxInflight(1000);

            client.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    LogUtils.d(TAG, "连接成功");
                    // An empty topic subscribes the default (main gateway) filters.
                    checkAndsubscribeAllTopics("");
                }

                @Override
                public void connectionLost(Throwable throwable) {
                    LogUtils.d(TAG, "连接断开");
                    // Forget the filters so they are subscribed again after the next connect.
                    lastTopicFilters.clear();
                }

                @Override
                public void messageArrived(String topic, MqttMessage mqttMessage) {
                    try {
                        managerMqttMsg(topic, mqttMessage);
                    } catch (Exception e) {
                        // Do not let a bad message escape into the MQTT library's callback thread.
                        logError("处理mqtt消息失败,Topic:" + topic, e);
                    }
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    // Nothing to do.
                }
            });

            LogUtils.d(TAG, "开始连接,连接参数 " + describeConnectParams());
            client.connect(connOpts);
        } catch (Exception me) {
            logError("mqtt连接失败", me);
        }
    }

    /**
     * Handles one received MQTT message: filters it, decrypts the payload and forwards the
     * result to the event bus, the link decoder or the packet queue.
     *
     * @param topic       topic the message arrived on
     * @param mqttMessage received message
     * @throws Exception if decrypting or decoding the payload fails
     */
    private void managerMqttMsg(String topic, MqttMessage mqttMessage) throws Exception {
        LogUtils.d(TAG, "远程接收主题,Topic:" + topic);
        // In local mode part of the cloud data is not wanted (e.g. OTA progress feedback).
        if (HDLConnectHelper.isLocal() && isIgnoredTopic(topic)) {
            return;
        }

        String[] topicSegments = topic.split("/");
        // Topics without a gateway id segment are not handled.
        if (topicSegments.length < 3) {
            return;
        }
        // Gateway id as seen by the cloud.
        String cloudsGatewayId = topicSegments[2];

        if (topic.contains(SECRET_CHANGE_FRAGMENT)) {
            notifySecretChanged(topic, cloudsGatewayId);
            return;
        }

        String aes = resolveAesKey(cloudsGatewayId);
        if (TextUtils.isEmpty(aes)) {
            LogUtils.e(TAG, "找不到远程解密的密钥,这个问题要排期解决");
            return;
        }
        byte[] bytes = AesUtil.aesDecrypt(mqttMessage.getPayload(), aes);
        if (null == bytes) {
            LogUtils.i(TAG, "远程回复数据 密钥解密失败");
            return;
        }
        logPayload(topic, bytes);

        // An IR remote reports through /thing/topo/found that it has joined the network; that
        // report only raises an event and is not forwarded any further.
        if (topic.endsWith("/thing/topo/found")
                && new String(bytes, StandardCharsets.UTF_8).contains("ir.module")) {
            EventBus.getDefault().post(new EventBindMiniRemoteSuccessInfo());
            return;
        }

        // Link data passed through from a slave gateway, or native link data starting with "hex",
        // goes to the link decoder; everything else is queued as a LinkPacket.
        boolean slavePassThrough = topic.contains("/native/a/") && topic.contains("/slaveoid/");
        if (slavePassThrough || startsWithHex(bytes)) {
            LinkMessageDecoder.getInstance().read(new Packet(bytes));
        } else {
            QueueUtils.getInstance().add(new LinkPacket(topic, bytes, true));
        }
    }

    /** Returns whether the topic ends with one of the suffixes that are ignored in local mode. */
    private static boolean isIgnoredTopic(String topic) {
        for (String ignoreTopic : IGNORE_TOPICS) {
            if (topic.endsWith(ignoreTopic)) {
                return true;
            }
        }
        return false;
    }

    /** Posts the events announcing that the given gateway changed its MQTT secret. */
    private void notifySecretChanged(String topic, String gatewayId) {
        LogUtils.d(TAG, "网关重连mqtt秘钥更新通知" + topic);
        BaseEventBus baseEventBus = new BaseEventBus();
        baseEventBus.setTopic(topic);
        EventBus.getDefault().post(baseEventBus);

        EventNotifyRefreshGatewayAesKeyInfo refreshInfo = new EventNotifyRefreshGatewayAesKeyInfo();
        refreshInfo.setGatewayId(gatewayId);
        EventBus.getDefault().post(refreshInfo);
    }

    /**
     * Picks the AES key for a message: the home key when the id segment is the home id, the
     * gateway's own key when the gateway is known locally, otherwise the main gateway key.
     */
    private String resolveAesKey(String cloudsGatewayId) {
        if (cloudsGatewayId.equals(HDLLinkConfig.getInstance().getHomeId())) {
            return getHomeAES();
        }
        GatewayBean gatewayBean = HDLLinkLocalGateway.getInstance().getGatewayByOidOrGatewayId(cloudsGatewayId);
        if (gatewayBean != null) {
            return gatewayBean.getAesKey();
        }
        return HDLLinkConfig.getInstance().getAesKey();
    }

    /** Logs the decrypted payload as text when it is JSON or zigbee data, otherwise as hex. */
    private void logPayload(String topic, byte[] bytes) {
        // Zigbee data is JSON preceded by other data, so it is logged as text as well.
        if (JsonUtils.isJson(bytes) || topic.contains("/custom/native/zigbee/up")) {
            LogUtils.i(TAG, "远程接收数据,Payload:" + new String(bytes, StandardCharsets.UTF_8));
        } else {
            LogUtils.i(TAG, "远程接收数据,Payload:" + ByteUtils.encodeHexString(bytes));
        }
    }

    /** Returns whether the payload starts with the ASCII bytes {@code hex}. */
    private static boolean startsWithHex(byte[] bytes) {
        return bytes.length > 2 && bytes[0] == 'h' && bytes[1] == 'e' && bytes[2] == 'x';
    }

    /**
     * Subscribes the filters belonging to {@code sendTopic} that are not subscribed yet. Does
     * nothing while the client is missing or not connected.
     *
     * @param sendTopic request topic; an empty or {@code null} value selects the main gateway filters
     */
    public synchronized void checkAndsubscribeAllTopics(String sendTopic) {
        final MqttAsyncClient client = sampleClient;
        if (client == null || !client.isConnected()) {
            return;
        }
        try {
            for (final String topic : nativeAndLinkTopic(sendTopic)) {
                if (lastTopicFilters.contains(topic)) {
                    continue;
                }
                LogUtils.d(TAG, "订阅主题:" + topic);
                client.subscribe(topic, 0, null, new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        lastTopicFilters.addIfAbsent(topic);
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        logError("订阅主题失败:" + topic, exception);
                    }
                });
            }
        } catch (MqttException e) {
            logError("订阅主题异常", e);
        }
    }

    /**
     * Builds the topic filters to subscribe for a request topic: the {@code /user/<id>/#} and
     * {@code /base/<id>/#} filters of the gateway id found in the topic (third segment), followed
     * by those of the main gateway. Empty ids are skipped and filters are not repeated.
     *
     * @param sendTopic request topic, may be empty or {@code null}
     * @return the filters, possibly empty
     */
    private String[] nativeAndLinkTopic(String sendTopic) {
        String[] topicArray = sendTopic == null ? new String[0] : sendTopic.split("/");
        List<String> filters = new ArrayList<>(4);
        if (topicArray.length >= 3) {
            addGatewayFilters(filters, topicArray[2]);
        }
        // The main gateway is always subscribed as well.
        addGatewayFilters(filters, HDLLinkConfig.getInstance().getGatewayId());
        return filters.toArray(new String[0]);
    }

    /** Appends the user/base filters of a gateway id unless the id is empty or already present. */
    private static void addGatewayFilters(List<String> filters, String gatewayId) {
        if (TextUtils.isEmpty(gatewayId)) {
            return;
        }
        String userFilter = String.format("/user/%s/#", gatewayId);
        if (filters.contains(userFilter)) {
            return;
        }
        filters.add(userFilter);
        filters.add(String.format("/base/%s/#", gatewayId));
    }

    /**
     * Derives the key used to decrypt data published by the cloud on the home topic (gateway
     * topics use a different key).
     *
     * @return the key, or {@code null} when no home id is configured
     */
    private String getHomeAES() {
        String homeId = HDLLinkConfig.getInstance().getHomeId();
        if (TextUtils.isEmpty(homeId)) {
            return null;
        }
        // Key rule: take the characters of the home id from right to left, at most 16 of them,
        // and pad with zeros on the right when fewer than 16 are available.
        StringBuilder aesKey = new StringBuilder(HOME_AES_KEY_LENGTH);
        for (int i = homeId.length() - 1; i >= 0 && aesKey.length() < HOME_AES_KEY_LENGTH; i--) {
            aesKey.append(homeId.charAt(i));
        }
        return padRight(aesKey.toString(), HOME_AES_KEY_LENGTH, "0");
    }

    /**
     * Appends {@code others} once for every character that {@code currentValueStr} is shorter
     * than {@code count}.
     *
     * @param currentValueStr current value
     * @param count           target length
     * @param others          padding text
     * @return the padded value, or the value itself when it is already long enough
     */
    private String padRight(String currentValueStr, int count, String others) {
        int missing = count - currentValueStr.length();
        if (missing <= 0) {
            return currentValueStr;
        }
        StringBuilder padded = new StringBuilder(currentValueStr);
        for (int i = 0; i < missing; i++) {
            padded.append(others);
        }
        return padded.toString();
    }

    /**
     * Unsubscribes every remembered filter; used when the home is switched.
     */
    public void removeAllTopic() {
        final MqttAsyncClient client = sampleClient;
        if (client == null || !client.isConnected()) {
            return;
        }
        String[] filters = lastTopicFilters.toArray(new String[0]);
        if (filters.length == 0) {
            return;
        }
        try {
            LogUtils.d(TAG, "移除主题\r\n" + JSON.toJSONString(lastTopicFilters));
            client.unsubscribe(filters);
            lastTopicFilters.clear();
        } catch (MqttException e) {
            logError("移除主题异常", e);
        }
    }

    /**
     * Disconnects the current client and forgets it together with its subscriptions. The state
     * is reset even when the disconnect request itself fails, so a stale client is never reused.
     * Normally a single connection is enough and this does not need to be called.
     */
    public void disConnect() {
        final MqttAsyncClient client = sampleClient;
        if (client == null) {
            return;
        }
        LogUtils.i(TAG, "断开mqtt连接");
        try {
            client.disconnect();
        } catch (MqttException e) {
            logError("断开mqtt连接失败", e);
            // Best effort: the client could not be disconnected normally, try to release it.
            closeQuietly(client);
        } finally {
            // Only the wrapped Paho client is dropped, not the MqttRecvClient singleton.
            sampleClient = null;
            lastTopicFilters.clear();
        }
    }

    /** Closes a client, logging instead of propagating any failure. */
    private void closeQuietly(MqttAsyncClient client) {
        try {
            client.close();
        } catch (MqttException e) {
            logError("关闭mqtt客户端失败", e);
        }
    }

    /** Describes the connection parameters for logging; the password itself is never printed. */
    private String describeConnectParams() {
        return "mBroker:" + mBroker + " mClientId:" + mClientId + " mUserName:" + mUserName
                + " mPassWord:" + (TextUtils.isEmpty(mPassWord) ? "<empty>" : "******");
    }

    /** Logs a failure together with the stack trace of its cause. */
    private static void logError(String message, Throwable error) {
        LogUtils.e(TAG, message + "\n" + Log.getStackTraceString(error));
    }

    public String getmBroker() {
        return mBroker;
    }

    public void setmBroker(String mBroker) {
        this.mBroker = mBroker;
    }

    public String getmClientId() {
        return mClientId;
    }

    public void setmClientId(String mClientId) {
        this.mClientId = mClientId;
    }

    public String getmUserName() {
        return mUserName;
    }

    public void setmUserName(String mUserName) {
        this.mUserName = mUserName;
    }

    public String getmPassWord() {
        return mPassWord;
    }

    public void setmPassWord(String mPassWord) {
        this.mPassWord = mPassWord;
    }

    /**
     * Returns whether a client has been created by {@link #connect()} and not been dropped since.
     *
     * @return {@code true} when a client exists
     */
    public boolean isInit() {
        return sampleClient != null;
    }

    /**
     * Returns whether a client exists and reports itself as connected.
     *
     * @return {@code true} when connected
     */
    public boolean isConnected() {
        final MqttAsyncClient client = sampleClient;
        return client != null && client.isConnected();
    }
}