package com.hdl.sdk.link.core.utils.mqtt;
|
|
import android.content.Context;
|
import android.text.TextUtils;
|
|
import com.alibaba.fastjson.JSON;
|
|
import com.hdl.sdk.link.common.utils.LogUtils;
|
import com.hdl.sdk.link.core.bean.LinkPacket;
|
import com.hdl.sdk.link.core.bean.eventbus.EventBindMiniRemoteSuccessInfo;
|
import com.hdl.sdk.link.core.bean.eventbus.EventNotifyRefreshGatewayAesKeyInfo;
|
import com.hdl.sdk.link.core.bean.gateway.GatewayBean;
|
import com.hdl.sdk.link.core.config.HDLLinkConfig;
|
import com.hdl.sdk.link.core.connect.HDLConnectHelper;
|
import com.hdl.sdk.link.core.protocol.LinkMessageDecoder;
|
import com.hdl.sdk.link.core.utils.AesUtil;
|
import com.hdl.sdk.link.core.utils.QueueUtils;
|
import com.hdl.sdk.link.gateway.HDLLinkLocalGateway;
|
import com.hdl.sdk.link.socket.bean.Packet;
|
|
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
import org.eclipse.paho.client.mqttv3.IMqttToken;
|
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
|
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
import org.greenrobot.eventbus.EventBus;
|
|
import java.util.ArrayList;
|
import java.util.List;
|
|
/**
|
* Created by Zoro on 2018/8/1.
|
* desc:2019/7/24 连接成功以后再订阅 不需要发送心跳 不需要自己处理重连
|
*/
|
|
public class MqttRecvClient {
|
private static String mBroker;
|
private MemoryPersistence persistence = new MemoryPersistence();
|
private MqttAsyncClient sampleClient;
|
private MqttConnectOptions connOpts = new MqttConnectOptions();
|
private MqttThread mqttThread;
|
private static volatile MqttRecvClient mqttRecvClient;
|
private final String TAG = "MqttRecvClient";
|
private static final String[] ignoreTopics = new String[]{"/thing/topo/found", "/ota/device/progress/up"};
|
/**
|
* 上次的主题需要记录 更改主题的时候需要取消订阅
|
*/
|
private static List<String> lastTopicFilters = new ArrayList<>();
|
private static String mClientId;
|
private static String mUserName;
|
private static String mPassWord;
|
private final int[] qos = {0};
|
|
private MqttRecvClient() {
|
if (mqttThread == null) {
|
mqttThread = new MqttThread();
|
}
|
if (TextUtils.isEmpty(mUserName) || TextUtils.isEmpty(mPassWord)) {
|
return;
|
}
|
mqttThread.start();
|
}
|
|
public static void init(Context context, String broker1, String deviceId, String userName, String pwd) {
|
mClientId = deviceId;
|
mBroker = broker1;
|
mUserName = userName;
|
mPassWord = pwd;
|
MqttRecvClient.create();
|
}
|
|
public void send(String topic, byte[] bytes) {
|
try {
|
if (TextUtils.isEmpty(topic)) {
|
LogUtils.e("请求主题为null");
|
return;
|
}
|
checkAndsubscribeAllTopics(topic);
|
publish(topic, bytes);
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 发布
|
*
|
* @param topic 主题
|
* @param bytes 内容
|
* @throws MqttException
|
*/
|
public void publish(String topic, byte[] bytes) throws MqttException {
|
//回复时,mqtt主题中的方向要变化,要做方向替换
|
mqttRecvClient.sampleClient.publish(topic, bytes, 1, false, null, new IMqttActionListener() {
|
@Override
|
public void onSuccess(IMqttToken asyncActionToken) {
|
LogUtils.d(TAG, topic);
|
}
|
|
@Override
|
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
|
}
|
});
|
}
|
|
public static void create() {
|
if (mqttRecvClient == null) {
|
mqttRecvClient = new MqttRecvClient();
|
}
|
}
|
|
/**
|
* 使用的时候需要判断非空
|
*/
|
public static MqttRecvClient getInstance() {
|
if (null == mqttRecvClient) {
|
synchronized (MqttRecvClient.class) {
|
if (null == mqttRecvClient) {
|
mqttRecvClient = new MqttRecvClient();
|
}
|
return mqttRecvClient;
|
}
|
}
|
return mqttRecvClient;
|
}
|
|
class MqttThread extends Thread {
|
@Override
|
public void run() {
|
super.run();
|
connect();
|
}
|
}
|
|
private void connect() {
|
try {
|
if (sampleClient != null) {
|
sampleClient.close();
|
}
|
sampleClient = new MqttAsyncClient(mBroker, mClientId, persistence);
|
connOpts.setUserName(mUserName);
|
// connOpts.setServerURIs(new String[]{mBroker});
|
connOpts.setPassword(mPassWord.toCharArray());
|
connOpts.setCleanSession(true);
|
connOpts.setKeepAliveInterval(10);
|
connOpts.setAutomaticReconnect(true);
|
connOpts.setConnectionTimeout(10);
|
connOpts.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
|
sampleClient.setCallback(new MqttCallbackExtended() {
|
public void connectComplete(boolean reconnect, String serverURI) {
|
LogUtils.d(TAG, "mqtt连接成功");
|
checkAndsubscribeAllTopics("");
|
}
|
|
public void connectionLost(Throwable throwable) {
|
LogUtils.d(TAG, "mqtt连接断开");
|
lastTopicFilters.clear();
|
}
|
|
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
|
managerMqttMsg(topic, mqttMessage);
|
}
|
|
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
}
|
});
|
sampleClient.connect(connOpts);
|
|
} catch (Exception me) {
|
me.printStackTrace();
|
}
|
}
|
|
/**
|
* 处理接收的mqtt数据
|
*
|
* @param topic 接收主题
|
* @param mqttMessage 接收数据
|
* @throws Exception
|
*/
|
public void managerMqttMsg(String topic, MqttMessage mqttMessage) throws Exception {
|
LogUtils.d(TAG, "\r\n" + "mqtt->远程回复主题" + topic);
|
if (HDLConnectHelper.isLocal()) {
|
boolean needReturn = true;
|
//如果是本地模式,云端下来的网关数据部分数据不接收(如:ota升级反馈进度)
|
for (String ignoreTopic : ignoreTopics) {
|
if (topic.endsWith(ignoreTopic)) {
|
needReturn = false;
|
break;
|
}
|
}
|
if (needReturn) {
|
return;
|
}
|
}
|
if (topic.contains("/custom/mqtt/secret/change")) {
|
/**
|
* 网关重连mqtt 需要更换aesKey 不然网关无法解密 通知刷新网关列表并且更新主网关的aesKey
|
*/
|
String[] topics = topic.split("/");
|
//非当前住宅网关的数据返回
|
if (topics.length < 3) {
|
return;
|
}
|
LogUtils.d(TAG, "网关重连mqtt秘钥更新通知->" + topic);
|
EventNotifyRefreshGatewayAesKeyInfo eventNotifyRefreshGatewayAesKeyInfo = new EventNotifyRefreshGatewayAesKeyInfo();
|
eventNotifyRefreshGatewayAesKeyInfo.setGatewayId(topics[2]);
|
EventBus.getDefault().post(eventNotifyRefreshGatewayAesKeyInfo);
|
return;
|
}
|
|
String[] topics = topic.split("/");
|
//非当前住宅网关的数据返回
|
if (topics.length < 3) {
|
return;
|
}
|
String aes = null;
|
String cloudsGatewayId = topics[2];//云端上GatewayId
|
GatewayBean gatewayBean = HDLLinkLocalGateway.getInstance().getGatewayByOidOrGatewayId(cloudsGatewayId);
|
if (cloudsGatewayId.equals(HDLLinkConfig.getInstance().getHomeId())) {
|
aes = getHomeAES();
|
} else if (gatewayBean != null && HDLConnectHelper.getGatewayTypeList().contains(gatewayBean.getGatewayType())) {
|
//逆变器mqtt专用秘钥
|
aes = gatewayBean.getAesKey();
|
} else {
|
aes = HDLLinkConfig.getInstance().getAesKey();
|
}
|
if (TextUtils.isEmpty(aes)) {
|
return;
|
}
|
|
byte[] bytes = AesUtil.aesDecrypt(mqttMessage.getPayload(), aes);
|
if (null == bytes) {
|
LogUtils.d(TAG, "\r\n" + "mqtt->远程回复数据 密钥解密失败");
|
return;
|
}
|
String bodyStr = new String(bytes);
|
LogUtils.d(TAG, "\r\n" + "mqtt->远程回复数据" + bodyStr);
|
/**
|
* 红外宝设备通过/thing/topo/found主题 上报红外宝设备已经入网了 然后直接return 不需要再下行了
|
*/
|
if (topic.endsWith("/thing/topo/found")) {
|
/**
|
* {"id":"0000016E","time_stamp":"366574","objects":[{"sid":"010105D370451908110100000000",
|
* "name":"Mini智能遥控器","spk":"ir.module","oid":"010105D370451908","omodel":"MIR01R-LK.10",
|
* "online":"true","attributes":[],"status":[],"from":"010105D370451908","src":"010105D370451908"}]}
|
*/
|
if (bodyStr.contains("ir.module")) {
|
EventBus.getDefault().post(new EventBindMiniRemoteSuccessInfo());
|
return;
|
}
|
}
|
//Link从网关透传特殊处理
|
if (topic.contains("/native/a/") && topic.contains("/slaveoid/")) {
|
LinkMessageDecoder.getInstance().read(new Packet(bytes));
|
} else {
|
byte[] topBytes = new byte[3];
|
if (bytes.length > 2) {
|
topBytes[0] = bytes[0];
|
topBytes[1] = bytes[1];
|
topBytes[2] = bytes[2];
|
}
|
if (new String(topBytes).equals("hex")) {//link原生数据,判断是否是文件处理通知
|
LinkMessageDecoder.getInstance().read(new Packet(bytes));
|
} else {
|
QueueUtils.getInstance().add(new LinkPacket(topic, bytes, true));
|
}
|
}
|
}
|
|
/**
|
* 检查主题是否已订阅,没有订阅就订阅
|
*
|
* @param sendTopic 请求主题
|
*/
|
public synchronized void checkAndsubscribeAllTopics(String sendTopic) {
|
if (null != sampleClient && sampleClient.isConnected() == false) {
|
return;
|
}
|
try {
|
for (String topic : nativeAndLinkTopic(sendTopic)) {
|
if (lastTopicFilters.contains(topic)) {
|
continue;
|
}
|
LogUtils.d(TAG, "订阅主题" + topic);
|
|
sampleClient.subscribe(topic, 0, null, new IMqttActionListener() {
|
@Override
|
public void onSuccess(IMqttToken asyncActionToken) {
|
if (!lastTopicFilters.contains(topic)) {
|
lastTopicFilters.add(topic);
|
}
|
}
|
|
@Override
|
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
|
|
}
|
});
|
}
|
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 订阅平台下来的主题
|
*
|
* @return string数组
|
*/
|
private String[] nativeAndLinkTopic(String sendTopic) {
|
String[] topicArray = sendTopic.split("/");
|
//非当前住宅网关的数据返回
|
if (topicArray.length < 3) {
|
return new String[]{
|
String.format("/user/%s/#", HDLLinkConfig.getInstance().getGatewayId()),
|
String.format("/base/%s/#", HDLLinkConfig.getInstance().getGatewayId())};
|
}
|
|
String gatewayId = topicArray[2];//云端上GatewayId
|
String[] topics = {
|
String.format("/user/%s/#", HDLLinkConfig.getInstance().getGatewayId()),
|
String.format("/base/%s/#", HDLLinkConfig.getInstance().getGatewayId()),
|
String.format("/user/%s/#", gatewayId),
|
String.format("/base/%s/#", gatewayId),
|
};
|
return topics;
|
}
|
|
|
/**
|
* APP订阅云端主题解密密文的秘钥(这个密钥只能用于订阅云端主题,订阅网关主题另外一个密钥)
|
*
|
* @return -返回解密密文的秘钥
|
*/
|
private String getHomeAES() {
|
String homeId = HDLLinkConfig.getInstance().getHomeId();
|
if (TextUtils.isEmpty(homeId)) {
|
return null;
|
}
|
//解密密钥规则:已现有的住宅ID为基准,从右边一一获取值,最后如果不够16位,则往右补零
|
StringBuilder aesKey = new StringBuilder();
|
for (int i = homeId.length() - 1; i >= 0; i--) {
|
aesKey.append(homeId.charAt(i));
|
if (aesKey.length() == 16) {
|
break;
|
}
|
}
|
return this.PadRight(aesKey.toString(), 16, "0");
|
}
|
|
/**
|
* 从右边添加空格或其它字符
|
*
|
* @param currentValueStr 当前值
|
* @param count 总长数
|
* @param others 其它字符(自定义)
|
* @return 总长度
|
*/
|
private String PadRight(String currentValueStr, int count, String others) {
|
if (count > currentValueStr.length()) {
|
StringBuilder stringBuilder = new StringBuilder();
|
int subLen = count - currentValueStr.length();
|
for (int i = 0; i < subLen; i++) {
|
stringBuilder.append(others);
|
}
|
currentValueStr = currentValueStr + stringBuilder;
|
}
|
return currentValueStr;
|
|
}
|
|
/**
|
* 切换住宅的时候订阅要全部取消
|
*/
|
public void removeAllTopic() {
|
if (null != sampleClient && sampleClient.isConnected() == false) {
|
return;
|
}
|
try {
|
if (lastTopicFilters.size() == 0) {
|
return;
|
}
|
LogUtils.d(TAG, "移除主题:\r\n" + JSON.toJSONString(lastTopicFilters));
|
sampleClient.unsubscribe(lastTopicFilters.toArray(new String[lastTopicFilters.size()]));
|
lastTopicFilters.clear();
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
public void stop() {
|
if (mqttRecvClient != null) {
|
try {
|
if (mqttRecvClient.sampleClient != null) {
|
mqttRecvClient.sampleClient.disconnect();
|
mqttRecvClient.sampleClient.close();
|
mqttRecvClient = null;
|
}
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
}
|
|
|
}
|