From 14de918a79943e4961b09fa01ed320c6cad41f2e Mon Sep 17 00:00:00 2001 From: wjc <1243177876@qq.com> Date: 星期三, 28 六月 2023 17:14:51 +0800 Subject: [PATCH] Revert "Revert "Merge branch 'hxb' into wjc"" --- HDLLinkLocalSdk/src/main/java/com/hdl/sdk/link/core/utils/mqtt/MqttRecvClient.java | 412 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 412 insertions(+), 0 deletions(-) diff --git a/HDLLinkLocalSdk/src/main/java/com/hdl/sdk/link/core/utils/mqtt/MqttRecvClient.java b/HDLLinkLocalSdk/src/main/java/com/hdl/sdk/link/core/utils/mqtt/MqttRecvClient.java new file mode 100644 index 0000000..6e8ef74 --- /dev/null +++ b/HDLLinkLocalSdk/src/main/java/com/hdl/sdk/link/core/utils/mqtt/MqttRecvClient.java @@ -0,0 +1,412 @@ +package com.hdl.sdk.link.core.utils.mqtt; + +import android.content.Context; +import android.text.TextUtils; + +import com.alibaba.fastjson.JSON; + +import com.hdl.sdk.link.common.utils.LogUtils; +import com.hdl.sdk.link.core.bean.LinkPacket; +import com.hdl.sdk.link.core.bean.eventbus.EventBindMiniRemoteSuccessInfo; +import com.hdl.sdk.link.core.bean.eventbus.EventNotifyRefreshGatewayAesKeyInfo; +import com.hdl.sdk.link.core.bean.gateway.GatewayBean; +import com.hdl.sdk.link.core.config.HDLLinkConfig; +import com.hdl.sdk.link.core.connect.HDLConnectHelper; +import com.hdl.sdk.link.core.protocol.LinkMessageDecoder; +import com.hdl.sdk.link.core.utils.AesUtil; +import com.hdl.sdk.link.core.utils.QueueUtils; +import com.hdl.sdk.link.gateway.HDLLinkLocalGateway; +import com.hdl.sdk.link.socket.bean.Packet; + +import org.eclipse.paho.client.mqttv3.IMqttActionListener; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.IMqttToken; +import org.eclipse.paho.client.mqttv3.MqttAsyncClient; +import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.greenrobot.eventbus.EventBus; + +import java.util.ArrayList; +import java.util.List; + +/** + * Created by Zoro on 2018/8/1. + * desc:2019/7/24 杩炴帴鎴愬姛浠ュ悗鍐嶈闃� 涓嶉渶瑕佸彂閫佸績璺� 涓嶉渶瑕佽嚜宸卞鐞嗛噸杩� + */ + +public class MqttRecvClient { + private static String mBroker; + private MemoryPersistence persistence = new MemoryPersistence(); + private MqttAsyncClient sampleClient; + private MqttConnectOptions connOpts = new MqttConnectOptions(); + private MqttThread mqttThread; + private static volatile MqttRecvClient mqttRecvClient; + private final String TAG = "MqttRecvClient"; + private static final String[] ignoreTopics = new String[]{"/thing/topo/found", "/ota/device/progress/up"}; + /** + * 涓婃鐨勪富棰橀渶瑕佽褰� 鏇存敼涓婚鐨勬椂鍊欓渶瑕佸彇娑堣闃� + */ + private static List<String> lastTopicFilters = new ArrayList<>(); + private static String mClientId; + private static String mUserName; + private static String mPassWord; + private final int[] qos = {0}; + + private MqttRecvClient() { + if (mqttThread == null) { + mqttThread = new MqttThread(); + } + if (TextUtils.isEmpty(mUserName) || TextUtils.isEmpty(mPassWord)) { + return; + } + mqttThread.start(); + } + + public static void init(Context context, String broker1, String deviceId, String userName, String pwd) { + mClientId = deviceId; + mBroker = broker1; + mUserName = userName; + mPassWord = pwd; + MqttRecvClient.create(); + } + + public void send(String topic, byte[] bytes) { + try { + if (TextUtils.isEmpty(topic)) { + LogUtils.e("璇锋眰涓婚涓簄ull"); + return; + } + checkAndsubscribeAllTopics(topic); + publish(topic, bytes); + } catch (MqttException e) { + e.printStackTrace(); + } + } + + /** + * 鍙戝竷 + * + * @param topic 涓婚 + * @param bytes 鍐呭 + * @throws MqttException + */ + public void publish(String topic, byte[] bytes) throws MqttException { + //鍥炲鏃讹紝mqtt涓婚涓殑鏂瑰悜瑕佸彉鍖栵紝瑕佸仛鏂瑰悜鏇挎崲 + mqttRecvClient.sampleClient.publish(topic, bytes, 1, false, null, new IMqttActionListener() { + @Override + public void onSuccess(IMqttToken asyncActionToken) { + LogUtils.d(TAG, topic); + } + + @Override + public void onFailure(IMqttToken asyncActionToken, Throwable exception) { + } + }); + } + + public static void create() { + if (mqttRecvClient == null) { + mqttRecvClient = new MqttRecvClient(); + } + } + + /** + * 浣跨敤鐨勬椂鍊欓渶瑕佸垽鏂潪绌� + */ + public static MqttRecvClient getInstance() { + if (null == mqttRecvClient) { + synchronized (MqttRecvClient.class) { + if (null == mqttRecvClient) { + mqttRecvClient = new MqttRecvClient(); + } + return mqttRecvClient; + } + } + return mqttRecvClient; + } + + class MqttThread extends Thread { + @Override + public void run() { + super.run(); + connect(); + } + } + + private void connect() { + try { + if (sampleClient != null) { + sampleClient.close(); + } + sampleClient = new MqttAsyncClient(mBroker, mClientId, persistence); + connOpts.setUserName(mUserName); +// connOpts.setServerURIs(new String[]{mBroker}); + connOpts.setPassword(mPassWord.toCharArray()); + connOpts.setCleanSession(true); + connOpts.setKeepAliveInterval(10); + connOpts.setAutomaticReconnect(true); + connOpts.setConnectionTimeout(10); + connOpts.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); + sampleClient.setCallback(new MqttCallbackExtended() { + public void connectComplete(boolean reconnect, String serverURI) { + LogUtils.d(TAG, "connect success"); + checkAndsubscribeAllTopics(""); + } + + public void connectionLost(Throwable throwable) { + LogUtils.d(TAG, "杩炴帴鏂紑"); + lastTopicFilters.clear(); + } + + public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { + managerMqttMsg(topic, mqttMessage); + } + + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + } + }); + sampleClient.connect(connOpts); + + } catch (Exception me) { + me.printStackTrace(); + } + } + + /** + * 澶勭悊鎺ユ敹鐨刴qtt鏁版嵁 + * + * @param topic 鎺ユ敹涓婚 + * @param mqttMessage 鎺ユ敹鏁版嵁 + * @throws Exception + */ + public void managerMqttMsg(String topic, MqttMessage mqttMessage) throws Exception { + LogUtils.d(TAG, "\r\n" + "mqtt->杩滅▼鍥炲涓婚" + topic); + if (HDLConnectHelper.isLocal()) { + boolean needReturn = true; + //濡傛灉鏄湰鍦版ā寮忥紝浜戠涓嬫潵鐨勭綉鍏虫暟鎹儴鍒嗘暟鎹笉鎺ユ敹(濡傦細ota鍗囩骇鍙嶉杩涘害) + for (String ignoreTopic : ignoreTopics) { + if (topic.endsWith(ignoreTopic)) { + needReturn = false; + break; + } + } + if (needReturn) { + return; + } + } + if (topic.contains("/custom/mqtt/secret/change")) { + /** + * 缃戝叧閲嶈繛mqtt 闇�瑕佹洿鎹esKey 涓嶇劧缃戝叧鏃犳硶瑙e瘑 閫氱煡鍒锋柊缃戝叧鍒楄〃骞朵笖鏇存柊涓荤綉鍏崇殑aesKey + */ + String[] topics = topic.split("/"); + //闈炲綋鍓嶄綇瀹呯綉鍏崇殑鏁版嵁杩斿洖 + if (topics.length < 3) { + return; + } + LogUtils.d(TAG, "缃戝叧閲嶈繛mqtt绉橀挜鏇存柊閫氱煡->" + topic); + EventNotifyRefreshGatewayAesKeyInfo eventNotifyRefreshGatewayAesKeyInfo = new EventNotifyRefreshGatewayAesKeyInfo(); + eventNotifyRefreshGatewayAesKeyInfo.setGatewayId(topics[2]); + EventBus.getDefault().post(eventNotifyRefreshGatewayAesKeyInfo); + return; + } + + String[] topics = topic.split("/"); + //闈炲綋鍓嶄綇瀹呯綉鍏崇殑鏁版嵁杩斿洖 + if (topics.length < 3) { + return; + } + String aes = null; + String cloudsGatewayId = topics[2];//浜戠涓奊atewayId + GatewayBean gatewayBean = HDLLinkLocalGateway.getInstance().getGatewayByOidOrGatewayId(cloudsGatewayId); + if (cloudsGatewayId.equals(HDLLinkConfig.getInstance().getHomeId())) { + aes = getHomeAES(); + } else if (gatewayBean != null && HDLConnectHelper.getGatewayTypeList().contains(gatewayBean.getGatewayType())) { + //姣背娉qtt涓撶敤绉橀挜 + aes = gatewayBean.getAesKey(); + } else { + aes = HDLLinkConfig.getInstance().getAesKey(); + } + if (TextUtils.isEmpty(aes)) { + return; + } + + byte[] bytes = AesUtil.aesDecrypt(mqttMessage.getPayload(), aes); + if (null == bytes) { + LogUtils.d(TAG, "\r\n" + "mqtt->杩滅▼鍥炲鏁版嵁 瀵嗛挜瑙e瘑澶辫触"); + return; + } + String bodyStr = new String(bytes); + LogUtils.d(TAG, "\r\n" + "mqtt->杩滅▼鍥炲鏁版嵁" + bodyStr); + /** + * 绾㈠瀹濊澶囬�氳繃/thing/topo/found涓婚 涓婃姤绾㈠瀹濊澶囧凡缁忓叆缃戜簡 鐒跺悗鐩存帴return 涓嶉渶瑕佸啀涓嬭浜� + */ + if (topic.endsWith("/thing/topo/found")) { + /** + * {"id":"0000016E","time_stamp":"366574","objects":[{"sid":"010105D370451908110100000000", + * "name":"Mini鏅鸿兘閬ユ帶鍣�","spk":"ir.module","oid":"010105D370451908","omodel":"MIR01R-LK.10", + * "online":"true","attributes":[],"status":[],"from":"010105D370451908","src":"010105D370451908"}]} + */ + if (bodyStr.contains("ir.module")) { + EventBus.getDefault().post(new EventBindMiniRemoteSuccessInfo()); + return; + } + } + //Link浠庣綉鍏抽�忎紶鐗规畩澶勭悊 + if (topic.contains("/native/a/") && topic.contains("/slaveoid/")) { + LinkMessageDecoder.getInstance().read(new Packet(bytes)); + } else { + byte[] topBytes = new byte[3]; + if (bytes.length > 2) { + topBytes[0] = bytes[0]; + topBytes[1] = bytes[1]; + topBytes[2] = bytes[2]; + } + if (new String(topBytes).equals("hex")) {//link鍘熺敓鏁版嵁锛屽垽鏂槸鍚︽槸鏂囦欢澶勭悊閫氱煡 + LinkMessageDecoder.getInstance().read(new Packet(bytes)); + } else { + QueueUtils.getInstance().add(new LinkPacket(topic, bytes, true)); + } + } + } + + /** + * 妫�鏌ヤ富棰樻槸鍚﹀凡璁㈤槄锛屾病鏈夎闃呭氨璁㈤槄 + * + * @param sendTopic 璇锋眰涓婚 + */ + public synchronized void checkAndsubscribeAllTopics(String sendTopic) { + if (null != sampleClient && sampleClient.isConnected() == false) { + return; + } + try { + for (String topic : nativeAndLinkTopic(sendTopic)) { + if (lastTopicFilters.contains(topic)) { + continue; + } + LogUtils.d(TAG, "璁㈤槄涓婚" + topic); + + sampleClient.subscribe(topic, 0, null, new IMqttActionListener() { + @Override + public void onSuccess(IMqttToken asyncActionToken) { + if (!lastTopicFilters.contains(topic)) { + lastTopicFilters.add(topic); + } + } + + @Override + public void onFailure(IMqttToken asyncActionToken, Throwable exception) { + + } + }); + } + + } catch (MqttException e) { + e.printStackTrace(); + } + } + + /** + * 璁㈤槄骞冲彴涓嬫潵鐨勪富棰� + * + * @return string鏁扮粍 + */ + private String[] nativeAndLinkTopic(String sendTopic) { + String[] topicArray = sendTopic.split("/"); + //闈炲綋鍓嶄綇瀹呯綉鍏崇殑鏁版嵁杩斿洖 + if (topicArray.length < 3) { + return new String[]{ + String.format("/user/%s/#", HDLLinkConfig.getInstance().getGatewayId()), + String.format("/base/%s/#", HDLLinkConfig.getInstance().getGatewayId())}; + } + + String gatewayId = topicArray[2];//浜戠涓奊atewayId + String[] topics = { + String.format("/user/%s/#", HDLLinkConfig.getInstance().getGatewayId()), + String.format("/base/%s/#", HDLLinkConfig.getInstance().getGatewayId()), + String.format("/user/%s/#", gatewayId), + String.format("/base/%s/#", gatewayId), + }; + return topics; + } + + + /** + * APP璁㈤槄浜戠涓婚瑙e瘑瀵嗘枃鐨勭閽�(杩欎釜瀵嗛挜鍙兘鐢ㄤ簬璁㈤槄浜戠涓婚锛岃闃呯綉鍏充富棰樺彟澶栦竴涓瘑閽�) + * + * @return -杩斿洖瑙e瘑瀵嗘枃鐨勭閽� + */ + private String getHomeAES() { + String homeId = HDLLinkConfig.getInstance().getHomeId(); + if (TextUtils.isEmpty(homeId)) { + return null; + } + //瑙e瘑瀵嗛挜瑙勫垯锛氬凡鐜版湁鐨勪綇瀹匢D涓哄熀鍑�,浠庡彸杈逛竴涓�鑾峰彇鍊�,鏈�鍚庡鏋滀笉澶�16浣�,鍒欏線鍙宠ˉ闆� + StringBuilder aesKey = new StringBuilder(); + for (int i = homeId.length() - 1; i >= 0; i--) { + aesKey.append(homeId.charAt(i)); + if (aesKey.length() == 16) { + break; + } + } + return this.PadRight(aesKey.toString(), 16, "0"); + } + + /** + * 浠庡彸杈规坊鍔犵┖鏍兼垨鍏跺畠瀛楃 + * + * @param currentValueStr 褰撳墠鍊� + * @param count 鎬婚暱鏁� + * @param others 鍏跺畠瀛楃(鑷畾涔�) + * @return 鎬婚暱搴� + */ + private String PadRight(String currentValueStr, int count, String others) { + if (count > currentValueStr.length()) { + StringBuilder stringBuilder = new StringBuilder(); + int subLen = count - currentValueStr.length(); + for (int i = 0; i < subLen; i++) { + stringBuilder.append(others); + } + currentValueStr = currentValueStr + stringBuilder; + } + return currentValueStr; + + } + + /** + * 鍒囨崲浣忓畢鐨勬椂鍊欒闃呰鍏ㄩ儴鍙栨秷 + */ + public void removeAllTopic() { + if (null != sampleClient && sampleClient.isConnected() == false) { + return; + } + try { + if (lastTopicFilters.size() == 0) { + return; + } + LogUtils.d(TAG, "绉婚櫎涓婚:\r\n" + JSON.toJSONString(lastTopicFilters)); + sampleClient.unsubscribe(lastTopicFilters.toArray(new String[lastTopicFilters.size()])); + lastTopicFilters.clear(); + } catch (MqttException e) { + e.printStackTrace(); + } + } + + public void stop() { + if (mqttRecvClient != null) { + try { + if (mqttRecvClient.sampleClient != null) { + mqttRecvClient.sampleClient.disconnect(); + mqttRecvClient.sampleClient.close(); + mqttRecvClient = null; + } + } catch (MqttException e) { + e.printStackTrace(); + } + } + } + + +} -- Gitblit v1.8.0