New file |
| | |
| | | package com.hdl.sdk.link.core.utils.mqtt;
|
| | |
|
| | | import android.content.Context;
|
| | | import android.text.TextUtils;
|
| | |
|
| | | import com.alibaba.fastjson.JSON;
|
| | |
|
| | | import com.hdl.sdk.link.common.utils.LogUtils;
|
| | | import com.hdl.sdk.link.core.bean.LinkPacket;
|
| | | import com.hdl.sdk.link.core.bean.eventbus.EventBindMiniRemoteSuccessInfo;
|
| | | import com.hdl.sdk.link.core.bean.eventbus.EventNotifyRefreshGatewayAesKeyInfo;
|
| | | import com.hdl.sdk.link.core.bean.gateway.GatewayBean;
|
| | | import com.hdl.sdk.link.core.config.HDLLinkConfig;
|
| | | import com.hdl.sdk.link.core.connect.HDLConnectHelper;
|
| | | import com.hdl.sdk.link.core.protocol.LinkMessageDecoder;
|
| | | import com.hdl.sdk.link.core.utils.AesUtil;
|
| | | import com.hdl.sdk.link.core.utils.QueueUtils;
|
| | | import com.hdl.sdk.link.gateway.HDLLinkLocalGateway;
|
| | | import com.hdl.sdk.link.socket.bean.Packet;
|
| | |
|
| | | import org.eclipse.paho.client.mqttv3.IMqttActionListener;
|
| | | import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
| | | import org.eclipse.paho.client.mqttv3.IMqttToken;
|
| | | import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
|
| | | import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
|
| | | import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
| | | import org.eclipse.paho.client.mqttv3.MqttException;
|
| | | import org.eclipse.paho.client.mqttv3.MqttMessage;
|
| | | import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
| | | import org.greenrobot.eventbus.EventBus;
|
| | |
|
| | | import java.util.ArrayList;
|
| | | import java.util.List;
|
| | |
|
| | | /**
|
| | | * Created by Zoro on 2018/8/1.
|
| | | * desc:2019/7/24 连接成功以后再订阅 不需要发送心跳 不需要自己处理重连
|
| | | */
|
| | |
|
| | | public class MqttRecvClient {
|
| | | private static String mBroker;
|
| | | private MemoryPersistence persistence = new MemoryPersistence();
|
| | | private MqttAsyncClient sampleClient;
|
| | | private MqttConnectOptions connOpts = new MqttConnectOptions();
|
| | | private MqttThread mqttThread;
|
| | | private static volatile MqttRecvClient mqttRecvClient;
|
| | | private final String TAG = "MqttRecvClient";
|
| | | private static final String[] ignoreTopics = new String[]{"/thing/topo/found", "/ota/device/progress/up"};
|
| | | /**
|
| | | * 上次的主题需要记录 更改主题的时候需要取消订阅
|
| | | */
|
| | | private static List<String> lastTopicFilters = new ArrayList<>();
|
| | | private static String mClientId;
|
| | | private static String mUserName;
|
| | | private static String mPassWord;
|
| | | private final int[] qos = {0};
|
| | |
|
| | | private MqttRecvClient() {
|
| | | if (mqttThread == null) {
|
| | | mqttThread = new MqttThread();
|
| | | }
|
| | | if (TextUtils.isEmpty(mUserName) || TextUtils.isEmpty(mPassWord)) {
|
| | | return;
|
| | | }
|
| | | mqttThread.start();
|
| | | }
|
| | |
|
| | | public static void init(Context context, String broker1, String deviceId, String userName, String pwd) {
|
| | | mClientId = deviceId;
|
| | | mBroker = broker1;
|
| | | mUserName = userName;
|
| | | mPassWord = pwd;
|
| | | MqttRecvClient.create();
|
| | | }
|
| | |
|
| | | public void send(String topic, byte[] bytes) {
|
| | | try {
|
| | | if (TextUtils.isEmpty(topic)) {
|
| | | LogUtils.e("请求主题为null");
|
| | | return;
|
| | | }
|
| | | checkAndsubscribeAllTopics(topic);
|
| | | publish(topic, bytes);
|
| | | } catch (MqttException e) {
|
| | | e.printStackTrace();
|
| | | }
|
| | | }
|
| | |
|
| | | /**
|
| | | * 发布
|
| | | *
|
| | | * @param topic 主题
|
| | | * @param bytes 内容
|
| | | * @throws MqttException
|
| | | */
|
| | | public void publish(String topic, byte[] bytes) throws MqttException {
|
| | | //回复时,mqtt主题中的方向要变化,要做方向替换
|
| | | mqttRecvClient.sampleClient.publish(topic, bytes, 1, false, null, new IMqttActionListener() {
|
| | | @Override
|
| | | public void onSuccess(IMqttToken asyncActionToken) {
|
| | | LogUtils.d(TAG, topic);
|
| | | }
|
| | |
|
| | | @Override
|
| | | public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
|
| | | }
|
| | | });
|
| | | }
|
| | |
|
| | | public static void create() {
|
| | | if (mqttRecvClient == null) {
|
| | | mqttRecvClient = new MqttRecvClient();
|
| | | }
|
| | | }
|
| | |
|
| | | /**
|
| | | * 使用的时候需要判断非空
|
| | | */
|
| | | public static MqttRecvClient getInstance() {
|
| | | if (null == mqttRecvClient) {
|
| | | synchronized (MqttRecvClient.class) {
|
| | | if (null == mqttRecvClient) {
|
| | | mqttRecvClient = new MqttRecvClient();
|
| | | }
|
| | | return mqttRecvClient;
|
| | | }
|
| | | }
|
| | | return mqttRecvClient;
|
| | | }
|
| | |
|
| | | class MqttThread extends Thread {
|
| | | @Override
|
| | | public void run() {
|
| | | super.run();
|
| | | connect();
|
| | | }
|
| | | }
|
| | |
|
| | | private void connect() {
|
| | | try {
|
| | | if (sampleClient != null) {
|
| | | sampleClient.close();
|
| | | }
|
| | | sampleClient = new MqttAsyncClient(mBroker, mClientId, persistence);
|
| | | connOpts.setUserName(mUserName);
|
| | | // connOpts.setServerURIs(new String[]{mBroker});
|
| | | connOpts.setPassword(mPassWord.toCharArray());
|
| | | connOpts.setCleanSession(true);
|
| | | connOpts.setKeepAliveInterval(10);
|
| | | connOpts.setAutomaticReconnect(true);
|
| | | connOpts.setConnectionTimeout(10);
|
| | | connOpts.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
|
| | | sampleClient.setCallback(new MqttCallbackExtended() {
|
| | | public void connectComplete(boolean reconnect, String serverURI) {
|
| | | LogUtils.d(TAG, "connect success");
|
| | | checkAndsubscribeAllTopics("");
|
| | | }
|
| | |
|
| | | public void connectionLost(Throwable throwable) {
|
| | | LogUtils.d(TAG, "连接断开");
|
| | | lastTopicFilters.clear();
|
| | | }
|
| | |
|
| | | public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
|
| | | managerMqttMsg(topic, mqttMessage);
|
| | | }
|
| | |
|
| | | public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
| | | }
|
| | | });
|
| | | sampleClient.connect(connOpts);
|
| | |
|
| | | } catch (Exception me) {
|
| | | me.printStackTrace();
|
| | | }
|
| | | }
|
| | |
|
| | | /**
|
| | | * 处理接收的mqtt数据
|
| | | *
|
| | | * @param topic 接收主题
|
| | | * @param mqttMessage 接收数据
|
| | | * @throws Exception
|
| | | */
|
| | | public void managerMqttMsg(String topic, MqttMessage mqttMessage) throws Exception {
|
| | | LogUtils.d(TAG, "\r\n" + "mqtt->远程回复主题" + topic);
|
| | | if (HDLConnectHelper.isLocal()) {
|
| | | boolean needReturn = true;
|
| | | //如果是本地模式,云端下来的网关数据部分数据不接收(如:ota升级反馈进度)
|
| | | for (String ignoreTopic : ignoreTopics) {
|
| | | if (topic.endsWith(ignoreTopic)) {
|
| | | needReturn = false;
|
| | | break;
|
| | | }
|
| | | }
|
| | | if (needReturn) {
|
| | | return;
|
| | | }
|
| | | }
|
| | | if (topic.contains("/custom/mqtt/secret/change")) {
|
| | | /**
|
| | | * 网关重连mqtt 需要更换aesKey 不然网关无法解密 通知刷新网关列表并且更新主网关的aesKey
|
| | | */
|
| | | String[] topics = topic.split("/");
|
| | | //非当前住宅网关的数据返回
|
| | | if (topics.length < 3) {
|
| | | return;
|
| | | }
|
| | | LogUtils.d(TAG, "网关重连mqtt秘钥更新通知->" + topic);
|
| | | EventNotifyRefreshGatewayAesKeyInfo eventNotifyRefreshGatewayAesKeyInfo = new EventNotifyRefreshGatewayAesKeyInfo();
|
| | | eventNotifyRefreshGatewayAesKeyInfo.setGatewayId(topics[2]);
|
| | | EventBus.getDefault().post(eventNotifyRefreshGatewayAesKeyInfo);
|
| | | return;
|
| | | }
|
| | |
|
| | | String[] topics = topic.split("/");
|
| | | //非当前住宅网关的数据返回
|
| | | if (topics.length < 3) {
|
| | | return;
|
| | | }
|
| | | String aes = null;
|
| | | String cloudsGatewayId = topics[2];//云端上GatewayId
|
| | | GatewayBean gatewayBean = HDLLinkLocalGateway.getInstance().getGatewayByOidOrGatewayId(cloudsGatewayId);
|
| | | if (cloudsGatewayId.equals(HDLLinkConfig.getInstance().getHomeId())) {
|
| | | aes = getHomeAES();
|
| | | } else if (gatewayBean != null && HDLConnectHelper.getGatewayTypeList().contains(gatewayBean.getGatewayType())) {
|
| | | //毫米波mqtt专用秘钥
|
| | | aes = gatewayBean.getAesKey();
|
| | | } else {
|
| | | aes = HDLLinkConfig.getInstance().getAesKey();
|
| | | }
|
| | | if (TextUtils.isEmpty(aes)) {
|
| | | return;
|
| | | }
|
| | |
|
| | | byte[] bytes = AesUtil.aesDecrypt(mqttMessage.getPayload(), aes);
|
| | | if (null == bytes) {
|
| | | LogUtils.d(TAG, "\r\n" + "mqtt->远程回复数据 密钥解密失败");
|
| | | return;
|
| | | }
|
| | | String bodyStr = new String(bytes);
|
| | | LogUtils.d(TAG, "\r\n" + "mqtt->远程回复数据" + bodyStr);
|
| | | /**
|
| | | * 红外宝设备通过/thing/topo/found主题 上报红外宝设备已经入网了 然后直接return 不需要再下行了
|
| | | */
|
| | | if (topic.endsWith("/thing/topo/found")) {
|
| | | /**
|
| | | * {"id":"0000016E","time_stamp":"366574","objects":[{"sid":"010105D370451908110100000000",
|
| | | * "name":"Mini智能遥控器","spk":"ir.module","oid":"010105D370451908","omodel":"MIR01R-LK.10",
|
| | | * "online":"true","attributes":[],"status":[],"from":"010105D370451908","src":"010105D370451908"}]}
|
| | | */
|
| | | if (bodyStr.contains("ir.module")) {
|
| | | EventBus.getDefault().post(new EventBindMiniRemoteSuccessInfo());
|
| | | return;
|
| | | }
|
| | | }
|
| | | //Link从网关透传特殊处理
|
| | | if (topic.contains("/native/a/") && topic.contains("/slaveoid/")) {
|
| | | LinkMessageDecoder.getInstance().read(new Packet(bytes));
|
| | | } else {
|
| | | byte[] topBytes = new byte[3];
|
| | | if (bytes.length > 2) {
|
| | | topBytes[0] = bytes[0];
|
| | | topBytes[1] = bytes[1];
|
| | | topBytes[2] = bytes[2];
|
| | | }
|
| | | if (new String(topBytes).equals("hex")) {//link原生数据,判断是否是文件处理通知
|
| | | LinkMessageDecoder.getInstance().read(new Packet(bytes));
|
| | | } else {
|
| | | QueueUtils.getInstance().add(new LinkPacket(topic, bytes, true));
|
| | | }
|
| | | }
|
| | | }
|
| | |
|
| | | /**
|
| | | * 检查主题是否已订阅,没有订阅就订阅
|
| | | *
|
| | | * @param sendTopic 请求主题
|
| | | */
|
| | | public synchronized void checkAndsubscribeAllTopics(String sendTopic) {
|
| | | if (null != sampleClient && sampleClient.isConnected() == false) {
|
| | | return;
|
| | | }
|
| | | try {
|
| | | for (String topic : nativeAndLinkTopic(sendTopic)) {
|
| | | if (lastTopicFilters.contains(topic)) {
|
| | | continue;
|
| | | }
|
| | | LogUtils.d(TAG, "订阅主题" + topic);
|
| | |
|
| | | sampleClient.subscribe(topic, 0, null, new IMqttActionListener() {
|
| | | @Override
|
| | | public void onSuccess(IMqttToken asyncActionToken) {
|
| | | if (!lastTopicFilters.contains(topic)) {
|
| | | lastTopicFilters.add(topic);
|
| | | }
|
| | | }
|
| | |
|
| | | @Override
|
| | | public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
|
| | |
|
| | | }
|
| | | });
|
| | | }
|
| | |
|
| | | } catch (MqttException e) {
|
| | | e.printStackTrace();
|
| | | }
|
| | | }
|
| | |
|
| | | /**
|
| | | * 订阅平台下来的主题
|
| | | *
|
| | | * @return string数组
|
| | | */
|
| | | private String[] nativeAndLinkTopic(String sendTopic) {
|
| | | String[] topicArray = sendTopic.split("/");
|
| | | //非当前住宅网关的数据返回
|
| | | if (topicArray.length < 3) {
|
| | | return new String[]{
|
| | | String.format("/user/%s/#", HDLLinkConfig.getInstance().getGatewayId()),
|
| | | String.format("/base/%s/#", HDLLinkConfig.getInstance().getGatewayId())};
|
| | | }
|
| | |
|
| | | String gatewayId = topicArray[2];//云端上GatewayId
|
| | | String[] topics = {
|
| | | String.format("/user/%s/#", HDLLinkConfig.getInstance().getGatewayId()),
|
| | | String.format("/base/%s/#", HDLLinkConfig.getInstance().getGatewayId()),
|
| | | String.format("/user/%s/#", gatewayId),
|
| | | String.format("/base/%s/#", gatewayId),
|
| | | };
|
| | | return topics;
|
| | | }
|
| | |
|
| | |
|
| | | /**
|
| | | * APP订阅云端主题解密密文的秘钥(这个密钥只能用于订阅云端主题,订阅网关主题另外一个密钥)
|
| | | *
|
| | | * @return -返回解密密文的秘钥
|
| | | */
|
| | | private String getHomeAES() {
|
| | | String homeId = HDLLinkConfig.getInstance().getHomeId();
|
| | | if (TextUtils.isEmpty(homeId)) {
|
| | | return null;
|
| | | }
|
| | | //解密密钥规则:已现有的住宅ID为基准,从右边一一获取值,最后如果不够16位,则往右补零
|
| | | StringBuilder aesKey = new StringBuilder();
|
| | | for (int i = homeId.length() - 1; i >= 0; i--) {
|
| | | aesKey.append(homeId.charAt(i));
|
| | | if (aesKey.length() == 16) {
|
| | | break;
|
| | | }
|
| | | }
|
| | | return this.PadRight(aesKey.toString(), 16, "0");
|
| | | }
|
| | |
|
| | | /**
|
| | | * 从右边添加空格或其它字符
|
| | | *
|
| | | * @param currentValueStr 当前值
|
| | | * @param count 总长数
|
| | | * @param others 其它字符(自定义)
|
| | | * @return 总长度
|
| | | */
|
| | | private String PadRight(String currentValueStr, int count, String others) {
|
| | | if (count > currentValueStr.length()) {
|
| | | StringBuilder stringBuilder = new StringBuilder();
|
| | | int subLen = count - currentValueStr.length();
|
| | | for (int i = 0; i < subLen; i++) {
|
| | | stringBuilder.append(others);
|
| | | }
|
| | | currentValueStr = currentValueStr + stringBuilder;
|
| | | }
|
| | | return currentValueStr;
|
| | |
|
| | | }
|
| | |
|
| | | /**
|
| | | * 切换住宅的时候订阅要全部取消
|
| | | */
|
| | | public void removeAllTopic() {
|
| | | if (null != sampleClient && sampleClient.isConnected() == false) {
|
| | | return;
|
| | | }
|
| | | try {
|
| | | if (lastTopicFilters.size() == 0) {
|
| | | return;
|
| | | }
|
| | | LogUtils.d(TAG, "移除主题:\r\n" + JSON.toJSONString(lastTopicFilters));
|
| | | sampleClient.unsubscribe(lastTopicFilters.toArray(new String[lastTopicFilters.size()]));
|
| | | lastTopicFilters.clear();
|
| | | } catch (MqttException e) {
|
| | | e.printStackTrace();
|
| | | }
|
| | | }
|
| | |
|
| | | public void stop() {
|
| | | if (mqttRecvClient != null) {
|
| | | try {
|
| | | if (mqttRecvClient.sampleClient != null) {
|
| | | mqttRecvClient.sampleClient.disconnect();
|
| | | mqttRecvClient.sampleClient.close();
|
| | | mqttRecvClient = null;
|
| | | }
|
| | | } catch (MqttException e) {
|
| | | e.printStackTrace();
|
| | | }
|
| | | }
|
| | | }
|
| | |
|
| | |
|
| | | }
|