package com.hdl.sdk.link.core.utils.mqtt;
|
|
import android.os.SystemClock;
|
import android.text.TextUtils;
|
|
import com.alibaba.fastjson.JSON;
|
import com.hdl.sdk.link.common.utils.LogUtils;
|
import com.hdl.sdk.link.common.utils.ThreadToolUtils;
|
import com.hdl.sdk.link.core.bean.LinkPacket;
|
import com.hdl.sdk.link.core.bean.eventbus.BaseEventBus;
|
import com.hdl.sdk.link.core.bean.eventbus.EventBindMiniRemoteSuccessInfo;
|
import com.hdl.sdk.link.core.bean.eventbus.EventNotifyRefreshGatewayAesKeyInfo;
|
import com.hdl.sdk.link.core.bean.gateway.GatewayBean;
|
import com.hdl.sdk.link.core.config.HDLLinkConfig;
|
import com.hdl.sdk.link.core.connect.HDLConnectHelper;
|
import com.hdl.sdk.link.core.protocol.LinkMessageDecoder;
|
import com.hdl.sdk.link.core.utils.AesUtil;
|
import com.hdl.sdk.link.core.utils.ByteUtils;
|
import com.hdl.sdk.link.core.utils.JsonUtils;
|
import com.hdl.sdk.link.core.utils.QueueUtils;
|
import com.hdl.sdk.link.gateway.HDLLinkLocalGateway;
|
import com.hdl.sdk.link.socket.bean.Packet;
|
|
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
import org.eclipse.paho.client.mqttv3.IMqttToken;
|
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
|
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
import org.greenrobot.eventbus.EventBus;
|
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.concurrent.ExecutorService;
|
|
/**
|
* Created by Zoro on 2018/8/1.
|
* desc:2019/7/24 连接成功以后再订阅 不需要发送心跳 不需要自己处理重连
|
*/
|
|
public class MqttRecvClient {
|
private final String TAG = "MqttRecvClient";
|
private static volatile MqttRecvClient mqttRecvClient;
|
private MqttAsyncClient sampleClient;
|
private String mBroker;
|
private String mClientId;
|
private String mUserName;
|
private String mPassWord;
|
private final String[] ignoreTopics = new String[]{"/thing/topo/found", "/ota/device/progress/up"};
|
/**
|
* 上次的主题需要记录 更改主题的时候需要取消订阅
|
*/
|
private List<String> lastTopicFilters = new ArrayList<>();
|
private final int connectionTimeout = 8;//秒
|
private final int keepAliveInterval = 10;
|
|
/**
|
* 设置远程连接参数
|
*
|
* @param broker 地址
|
* @param clientId 客户端Id
|
* @param userName 用户名
|
* @param pwd 密码
|
*/
|
public void setConnectParam(String broker, String clientId, String userName, String pwd) {
|
mBroker = broker;
|
mClientId = clientId;
|
mUserName = userName;
|
mPassWord = pwd;
|
}
|
|
/**
|
* 发送数据
|
*
|
* @param topic
|
* @param bytes
|
*/
|
public void send(String topic, byte[] bytes) {
|
try {
|
if (TextUtils.isEmpty(topic) || bytes == null) {
|
LogUtils.e(TAG, "数据不发送,Topic:" + topic + " Bytes:" + bytes);
|
return;
|
}
|
checkAndsubscribeAllTopics(topic);
|
publish(topic, bytes);
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 发布
|
*
|
* @param topic 主题
|
* @param bytes 内容
|
* @throws MqttException
|
*/
|
public void publish(String topic, byte[] bytes) throws MqttException {
|
if (sampleClient == null) {
|
LogUtils.i(TAG, "Mqtt未初始化");
|
return;
|
}
|
//如果还没有连接成功,可以试着休眠等一下。这种情况可能是刚开始连接,接着马上发送数据
|
if (!sampleClient.isConnected()) {
|
reConnect();
|
SystemClock.sleep(500);
|
}
|
//回复时,mqtt主题中的方向要变化,要做方向替换
|
/**
|
* 从网关会出现mqtt发出去失败 所以改成模式2
|
*/
|
int qos = 0;
|
if(topic.contains("/custom/native/zigbee/down/slaveoid/")){
|
qos = 2;
|
}
|
sampleClient.publish(topic, bytes, qos, false, null, new IMqttActionListener() {
|
@Override
|
public void onSuccess(IMqttToken asyncActionToken) {
|
LogUtils.d(TAG, "mqtt发送成功,Topic:" + topic);
|
}
|
|
@Override
|
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
|
LogUtils.d(TAG, "mqtt发送失败,Topic:" + topic);
|
}
|
});
|
}
|
|
/**
|
* 使用的时候需要判断非空
|
*/
|
public static MqttRecvClient getInstance() {
|
if (null == mqttRecvClient) {
|
synchronized (MqttRecvClient.class) {
|
if (null == mqttRecvClient) {
|
mqttRecvClient = new MqttRecvClient();
|
}
|
}
|
}
|
return mqttRecvClient;
|
}
|
|
/**
|
* 重新连接
|
*/
|
public void reConnect() {
|
try {
|
if (sampleClient == null) {
|
return;
|
}
|
LogUtils.i(TAG, "mqtt重新连接");
|
//重新连接
|
sampleClient.reconnect();
|
} catch (Exception e) {
|
}
|
}
|
|
/**
|
* 连接mqtt服务器
|
*/
|
public void connect() {
|
try {
|
//先关闭之前的连接
|
disConnect();
|
|
if (TextUtils.isEmpty(mBroker) || TextUtils.isEmpty(mClientId) || TextUtils.isEmpty(mUserName) || TextUtils.isEmpty(mPassWord)) {
|
LogUtils.i(TAG, "连接参数为空 mBroker:" + mBroker + " mClientId:" + mClientId + " mUserName:" + mUserName + " mPassWord:" + mPassWord);
|
return;
|
}
|
sampleClient = new MqttAsyncClient(mBroker, mClientId, new MemoryPersistence());
|
MqttConnectOptions connOpts = new MqttConnectOptions();
|
connOpts.setUserName(mUserName);
|
// connOpts.setServerURIs(new String[]{mBroker});
|
connOpts.setPassword(mPassWord.toCharArray());
|
connOpts.setCleanSession(true);
|
connOpts.setKeepAliveInterval(keepAliveInterval);
|
connOpts.setAutomaticReconnect(true);
|
connOpts.setConnectionTimeout(connectionTimeout);
|
connOpts.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
|
connOpts.setMaxInflight(1000);
|
sampleClient.setCallback(new MqttCallbackExtended() {
|
public void connectComplete(boolean reconnect, String serverURI) {
|
LogUtils.d(TAG, "连接成功");
|
checkAndsubscribeAllTopics("");
|
}
|
|
public void connectionLost(Throwable throwable) {
|
LogUtils.d(TAG, "连接断开");
|
lastTopicFilters.clear();
|
}
|
|
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
|
managerMqttMsg(topic, mqttMessage);
|
}
|
|
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
}
|
});
|
LogUtils.d(TAG, "开始连接,连接参数 mBroker:" + mBroker + " mClientId:" + mClientId + " mUserName:" + mUserName + " mPassWord:" + mPassWord);
|
sampleClient.connect(connOpts);
|
} catch (Exception me) {
|
me.printStackTrace();
|
}
|
}
|
|
/**
|
* 处理接收的mqtt数据
|
*
|
* @param topic 接收主题
|
* @param mqttMessage 接收数据
|
* @throws Exception
|
*/
|
private void managerMqttMsg(String topic, MqttMessage mqttMessage) throws Exception {
|
LogUtils.d(TAG, "远程接收主题,Topic:" + topic);
|
if (HDLConnectHelper.isLocal()) {
|
boolean needReturn = false;
|
//如果是本地模式,云端下来的网关数据部分数据不接收(如:ota升级反馈进度)
|
for (String ignoreTopic : ignoreTopics) {
|
if (topic.endsWith(ignoreTopic)) {
|
needReturn = true;
|
break;
|
}
|
}
|
if (needReturn) {
|
return;
|
}
|
}
|
if (topic.contains("/custom/mqtt/secret/change")) {
|
/**
|
* 网关重连mqtt 需要更换aesKey 不然网关无法解密 通知刷新网关列表并且更新主网关的aesKey
|
*/
|
String[] topics = topic.split("/");
|
//非当前住宅网关的数据返回
|
if (topics.length < 3) {
|
return;
|
}
|
LogUtils.d(TAG, "网关重连mqtt秘钥更新通知" + topic);
|
|
BaseEventBus baseEventBus = new BaseEventBus();
|
baseEventBus.setTopic(topic);
|
EventBus.getDefault().post(baseEventBus);
|
|
EventNotifyRefreshGatewayAesKeyInfo eventNotifyRefreshGatewayAesKeyInfo = new EventNotifyRefreshGatewayAesKeyInfo();
|
eventNotifyRefreshGatewayAesKeyInfo.setGatewayId(topics[2]);
|
EventBus.getDefault().post(eventNotifyRefreshGatewayAesKeyInfo);
|
return;
|
}
|
|
String[] topics = topic.split("/");
|
//非当前住宅网关的数据返回
|
if (topics.length < 3) {
|
return;
|
}
|
String aes = HDLLinkConfig.getInstance().getAesKey();//默认用主网关的密钥
|
String cloudsGatewayId = topics[2];//云端上GatewayId
|
GatewayBean gatewayBean = HDLLinkLocalGateway.getInstance().getGatewayByOidOrGatewayId(cloudsGatewayId);
|
//云端转换网关的数据,非网关直接过来的数据,这个用的是住宅密钥
|
if (cloudsGatewayId.equals(HDLLinkConfig.getInstance().getHomeId())) {
|
aes = getHomeAES();
|
} else {
|
//网关的数据
|
if (gatewayBean != null) {
|
//毫米波mqtt专用秘钥、逆变器mqtt专用秘钥
|
aes = gatewayBean.getAesKey();
|
}
|
}
|
|
if (TextUtils.isEmpty(aes)) {
|
LogUtils.e("找不到远程解密的密钥,这个问题要排期解决");
|
return;
|
}
|
|
byte[] bytes = AesUtil.aesDecrypt(mqttMessage.getPayload(), aes);
|
if (null == bytes) {
|
LogUtils.i(TAG, "远程回复数据 密钥解密失败");
|
return;
|
}
|
//zigbee数据比较特殊,是json,但前面有其它数据
|
if (JsonUtils.isJson(bytes) || topic.contains("/custom/native/zigbee/up")) {
|
LogUtils.i(TAG, "远程接收数据,Payload:" + new String(bytes));
|
} else {
|
LogUtils.i(TAG, "远程接收数据,Payload:" + ByteUtils.encodeHexString(bytes));
|
}
|
/**
|
* 红外宝设备通过/thing/topo/found主题 上报红外宝设备已经入网了 然后直接return 不需要再下行了
|
*/
|
if (topic.endsWith("/thing/topo/found")) {
|
/**
|
* {"id":"0000016E","time_stamp":"366574","objects":[{"sid":"010105D370451908110100000000",
|
* "name":"Mini智能遥控器","spk":"ir.module","oid":"010105D370451908","omodel":"MIR01R-LK.10",
|
* "online":"true","attributes":[],"status":[],"from":"010105D370451908","src":"010105D370451908"}]}
|
*/
|
if (new String(bytes).contains("ir.module")) {
|
EventBus.getDefault().post(new EventBindMiniRemoteSuccessInfo());
|
return;
|
}
|
}
|
//Link从网关透传特殊处理
|
if (topic.contains("/native/a/") && topic.contains("/slaveoid/")) {
|
LinkMessageDecoder.getInstance().read(new Packet(bytes));
|
} else {
|
byte[] topBytes = new byte[3];
|
if (bytes.length > 2) {
|
topBytes[0] = bytes[0];
|
topBytes[1] = bytes[1];
|
topBytes[2] = bytes[2];
|
}
|
if (new String(topBytes).equals("hex")) {//link原生数据,判断是否是文件处理通知
|
LinkMessageDecoder.getInstance().read(new Packet(bytes));
|
} else {
|
QueueUtils.getInstance().add(new LinkPacket(topic, bytes, true));
|
}
|
}
|
}
|
|
/**
|
* 检查主题是否已订阅,没有订阅就订阅
|
*
|
* @param sendTopic 请求主题
|
*/
|
public synchronized void checkAndsubscribeAllTopics(String sendTopic) {
|
// LogUtils.d(TAG, "收到订阅主题\r\n" + sendTopic);
|
if (null == sampleClient) {
|
return;
|
}
|
if (sampleClient.isConnected() == false) {
|
return;
|
}
|
try {
|
String topics[] = nativeAndLinkTopic(sendTopic);
|
for (String topic : topics) {
|
if (lastTopicFilters.contains(topic)) {
|
continue;
|
}
|
LogUtils.d(TAG, "订阅主题:" + topic);
|
sampleClient.subscribe(topic, 0, null, new IMqttActionListener() {
|
@Override
|
public void onSuccess(IMqttToken asyncActionToken) {
|
if (!lastTopicFilters.contains(topic)) {
|
lastTopicFilters.add(topic);
|
}
|
}
|
|
@Override
|
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
|
|
}
|
});
|
}
|
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 订阅平台下来的主题
|
*
|
* @return string数组
|
*/
|
private String[] nativeAndLinkTopic(String sendTopic) {
|
String[] topicArray = sendTopic.split("/");
|
//非当前住宅网关的数据返回,默认先订阅当前
|
if (topicArray.length < 3) {
|
if (TextUtils.isEmpty(HDLLinkConfig.getInstance().getGatewayId())) {
|
return new String[]{};
|
}
|
//默认订阅主网关的信息
|
return new String[]{
|
String.format("/user/%s/#", HDLLinkConfig.getInstance().getGatewayId()),
|
String.format("/base/%s/#", HDLLinkConfig.getInstance().getGatewayId())};
|
}
|
|
String gatewayId = topicArray[2];//云端上GatewayId
|
if (gatewayId.equals(HDLLinkConfig.getInstance().getGatewayId())) {
|
return new String[]{
|
String.format("/user/%s/#", gatewayId),
|
String.format("/base/%s/#", gatewayId)
|
};
|
} else {
|
return new String[]{
|
String.format("/user/%s/#", gatewayId),
|
String.format("/base/%s/#", gatewayId),
|
String.format("/user/%s/#", HDLLinkConfig.getInstance().getGatewayId()),
|
String.format("/base/%s/#", HDLLinkConfig.getInstance().getGatewayId())
|
};
|
}
|
}
|
|
|
/**
|
* APP订阅云端主题解密密文的秘钥(这个密钥只能用于订阅云端主题,订阅网关主题另外一个密钥)
|
*
|
* @return -返回解密密文的秘钥
|
*/
|
private String getHomeAES() {
|
String homeId = HDLLinkConfig.getInstance().getHomeId();
|
if (TextUtils.isEmpty(homeId)) {
|
return null;
|
}
|
//解密密钥规则:已现有的住宅ID为基准,从右边一一获取值,最后如果不够16位,则往右补零
|
StringBuilder aesKey = new StringBuilder();
|
for (int i = homeId.length() - 1; i >= 0; i--) {
|
aesKey.append(homeId.charAt(i));
|
if (aesKey.length() == 16) {
|
break;
|
}
|
}
|
return this.PadRight(aesKey.toString(), 16, "0");
|
}
|
|
/**
|
* 从右边添加空格或其它字符
|
*
|
* @param currentValueStr 当前值
|
* @param count 总长数
|
* @param others 其它字符(自定义)
|
* @return 总长度
|
*/
|
private String PadRight(String currentValueStr, int count, String others) {
|
if (count > currentValueStr.length()) {
|
StringBuilder stringBuilder = new StringBuilder();
|
int subLen = count - currentValueStr.length();
|
for (int i = 0; i < subLen; i++) {
|
stringBuilder.append(others);
|
}
|
currentValueStr = currentValueStr + stringBuilder;
|
}
|
return currentValueStr;
|
|
}
|
|
/**
|
* 切换住宅的时候订阅要全部取消
|
*/
|
public void removeAllTopic() {
|
if (null == sampleClient) {
|
return;
|
}
|
if (sampleClient.isConnected() == false) {
|
return;
|
}
|
try {
|
if (lastTopicFilters.size() == 0) {
|
return;
|
}
|
LogUtils.d(TAG, "移除主题\r\n" + JSON.toJSONString(lastTopicFilters));
|
sampleClient.unsubscribe(lastTopicFilters.toArray(new String[lastTopicFilters.size()]));
|
lastTopicFilters.clear();
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 关闭当前连接,一般不要用,只创建一个连接就好了
|
*/
|
public void disConnect() {
|
try {
|
if (sampleClient == null) {
|
return;
|
}
|
LogUtils.i(TAG, "断开mqtt连接");
|
sampleClient.disconnect();
|
sampleClient = null;//不是置空mqttRecvClient,而是mqttRecvClient.sampleClient
|
lastTopicFilters.clear();
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
}
|
|
public String getmBroker() {
|
return mBroker;
|
}
|
|
public void setmBroker(String mBroker) {
|
this.mBroker = mBroker;
|
}
|
|
public String getmClientId() {
|
return mClientId;
|
}
|
|
public void setmClientId(String mClientId) {
|
this.mClientId = mClientId;
|
}
|
|
public String getmUserName() {
|
return mUserName;
|
}
|
|
public void setmUserName(String mUserName) {
|
this.mUserName = mUserName;
|
}
|
|
public String getmPassWord() {
|
return mPassWord;
|
}
|
|
public void setmPassWord(String mPassWord) {
|
this.mPassWord = mPassWord;
|
}
|
|
/**
|
* 是否已经初始化及调用连接
|
*
|
* @return
|
*/
|
public boolean isInit() {
|
return sampleClient != null;
|
}
|
|
public boolean isConnected() {
|
if (sampleClient == null) {
|
return false;
|
}
|
return sampleClient.isConnected();
|
}
|
}
|