package com.hdl.sdk.link.core.utils;
|
|
import android.util.Log;
|
|
import com.hdl.sdk.link.common.event.EventDispatcher;
|
import com.hdl.sdk.link.common.utils.LogUtils;
|
import com.hdl.sdk.link.common.utils.ThreadToolUtils;
|
import com.hdl.sdk.link.core.bean.LinkPacket;
|
import com.hdl.sdk.link.core.bean.LinkResponse;
|
import com.hdl.sdk.link.core.config.HDLLinkConfig;
|
import com.hdl.sdk.link.enums.NativeType;
|
|
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
|
|
/**
|
* Created by hxb on 2022/8/4.
|
*/
|
public class QueueUtils {
|
|
private static final String TAG = "QueueUtils";
|
private static final String busProTAG = "BusPro";
|
private static final String zigbeeTAG = "Zigbee";
|
private static final String inverterTAG = "Inverter";
|
private static final String knxTAG = "KNX";
|
private static final String linkTAG = "Link";
|
|
//还没解密的数据包
|
private final Queue<LinkPacket> linkPackets;
|
private final ExecutorService executorService;
|
//是否已经启动
|
private boolean started;
|
/**
|
* instance
|
*/
|
private volatile static QueueUtils instance;
|
|
private QueueUtils() {
|
linkPackets = new LinkedList<>();
|
executorService = ThreadToolUtils.getInstance().newFixedThreadPool(1);
|
}
|
|
/**
|
* getInstance
|
*
|
* @return AuthenticateConfig
|
*/
|
public static synchronized QueueUtils getInstance() {
|
if (instance == null) {
|
synchronized (QueueUtils.class) {
|
if (instance == null) {
|
instance = new QueueUtils();
|
}
|
}
|
}
|
return instance;
|
}
|
|
/**
|
* 增加接收到的每条数据
|
*
|
* @param linkPacket
|
*/
|
public void add(LinkPacket linkPacket) {
|
synchronized (linkPackets) {
|
linkPackets.add(linkPacket);
|
}
|
}
|
|
public LinkPacket poll() {
|
synchronized (linkPackets) {
|
return linkPackets.poll();
|
}
|
}
|
|
/**
|
* 启动处理接收到的数据
|
*/
|
public synchronized void start() {
|
if (started) {
|
return;
|
}
|
started = true;
|
executorService.execute(new Runnable() {
|
@Override
|
public void run() {
|
while (true) {
|
try {
|
LinkPacket linkPacket = poll();
|
if (linkPacket == null) {
|
Thread.sleep(10);
|
continue;
|
}
|
manager(linkPacket);
|
} catch (Exception e) {
|
LogUtils.e(TAG, "处理接收到的数据异常\r\n" + e.getMessage());
|
}
|
}
|
}
|
});
|
}
|
|
private void manager(LinkPacket linkPacket) throws UnsupportedEncodingException {
|
// LogUtils.e(TAG, "===11");
|
LinkResponse response = new LinkResponse();
|
String topic = linkPacket.getTopic();
|
response.setTopic(topic);
|
|
byte[] bodyBytes = linkPacket.getBody();
|
|
//云端数据已经外部打印,这里不用再打印
|
// if(!linkPacket.isCloudPacket()) {
|
// LogUtils.i(TAG, "接收原始数据,Topic:" + response.getTopic() + "\r\nPayload:" + ByteUtils.encodeHexString(bodyBytes));
|
// }
|
//非云端数据,尝试解密。云端数据已经在外部解密
|
if (!linkPacket.isCloudPacket()) {
|
//需要解密
|
byte[] decryptBytes = AesUtil.aesDecrypt(bodyBytes, HDLLinkConfig.getInstance().getLocalSecret());
|
if (decryptBytes == null) {
|
//如果是明文,或者zigbee的数据可以不用打印
|
if (!JsonUtils.isJson(bodyBytes) && !topic.contains("/custom/native/zigbee/up")) {
|
LogUtils.e(TAG, "本地接收数据解密失败,Topic:" + topic + "\r\nPayload:" + ByteUtils.encodeHexString(bodyBytes) + "\r\nSecret:" + HDLLinkConfig.getInstance().getLocalSecret());
|
}
|
} else {
|
bodyBytes = decryptBytes;
|
}
|
}
|
// LogUtils.e(TAG, "===12");
|
response.setByteData(bodyBytes);
|
|
//zigbee数据比较特殊,是json,但前面有其它数据
|
if (JsonUtils.isJson(response.getByteData()) || topic.contains("/custom/native/zigbee/up")) {
|
response.setData(new String(response.getByteData(), StandardCharsets.UTF_8));
|
} else {
|
response.setData(ByteUtils.encodeHexString(response.getByteData()));
|
}
|
// LogUtils.e(TAG, "===13");
|
//云端数据已经外部打印,这里不用再打印
|
if (!linkPacket.isCloudPacket()) {
|
// if (response.getTopic().endsWith("/custom/native/buspro/up")) {
|
// LogUtils.i(TAG, "本地接收数据,Topic:" + response.getTopic() + "\r\nPayload:" + response.getData());
|
// }
|
String tag;
|
if (topic.contains("/native/zigbee/")) {
|
tag = zigbeeTAG;
|
} else if (topic.contains("/native/buspro/")) {
|
tag = busProTAG;
|
} else if (topic.contains("/native/inverter/")) {
|
tag = inverterTAG;
|
} else if (topic.contains("/native/knx/")) {
|
tag = knxTAG;
|
} else {
|
tag = linkTAG;
|
}
|
LogUtils.i(tag, "本地接收数据,Topic:" + response.getTopic() + "\r\nPayload:" + response.getData());
|
|
}
|
// LogUtils.e(TAG, "===14");
|
//解析完成,topic发送一次
|
EventDispatcher.getInstance().post(response.getTopic(), response);
|
}
|
}
|