package com.hdl.sdk.link.core.utils;
|
|
import com.hdl.sdk.link.common.event.EventDispatcher;
|
import com.hdl.sdk.link.common.utils.LogUtils;
|
import com.hdl.sdk.link.common.utils.ThreadToolUtils;
|
import com.hdl.sdk.link.core.bean.LinkPacket;
|
import com.hdl.sdk.link.core.bean.LinkResponse;
|
import com.hdl.sdk.link.core.config.HDLLinkConfig;
|
import com.hdl.sdk.link.core.connect.HDLConnectHelper;
|
|
import java.io.UnsupportedEncodingException;
|
import java.nio.charset.StandardCharsets;
|
import java.util.LinkedList;
|
import java.util.Queue;
|
import java.util.concurrent.ExecutorService;
|
|
/**
|
* Created by hxb on 2022/8/4.
|
*/
|
public class QueueUtils {
|
|
//还没解密的数据包
|
private final Queue<LinkPacket> linkPackets;
|
private final ExecutorService executorService;
|
//是否已经启动
|
private boolean started;
|
/**
|
* instance
|
*/
|
private volatile static QueueUtils instance;
|
|
private QueueUtils() {
|
linkPackets = new LinkedList<>();
|
executorService = ThreadToolUtils.getInstance().newFixedThreadPool(1);
|
}
|
|
/**
|
* getInstance
|
*
|
* @return AuthenticateConfig
|
*/
|
public static synchronized QueueUtils getInstance() {
|
if (instance == null) {
|
synchronized (QueueUtils.class) {
|
if (instance == null) {
|
instance = new QueueUtils();
|
}
|
}
|
}
|
return instance;
|
}
|
|
/**
|
* 增加接收到的每条数据
|
*
|
* @param linkPacket
|
*/
|
public void add(LinkPacket linkPacket) {
|
synchronized (linkPackets) {
|
linkPackets.add(linkPacket);
|
}
|
}
|
|
public LinkPacket poll() {
|
synchronized (linkPackets) {
|
return linkPackets.poll();
|
}
|
}
|
|
/**
|
* 启动处理接收到的数据
|
*/
|
public synchronized void start() {
|
if (started) {
|
return;
|
}
|
started = true;
|
executorService.execute(new Runnable() {
|
@Override
|
public void run() {
|
while (true) {
|
try {
|
LinkPacket linkPacket = poll();
|
if (linkPacket == null) {
|
Thread.sleep(10);
|
continue;
|
}
|
manager(linkPacket);
|
} catch (Exception e) {
|
LogUtils.e("处理接收到的数据异常:\r\n" + e.getMessage());
|
}
|
}
|
}
|
});
|
}
|
|
private void manager(LinkPacket linkPacket) throws UnsupportedEncodingException {
|
|
LinkResponse response = new LinkResponse();
|
response.setTopic(linkPacket.getTopic());
|
// GatewayBean gatewayBean = HDLLinkLocalGateway.getInstance().getGatewayByIpAddress(linkPacket.isIpAddress());
|
// boolean isEncrypt = false;
|
// if (gatewayBean != null) {
|
// isEncrypt = gatewayBean.getIsLocalEncrypt();
|
// }
|
// if (EncryptUtil.ifNeedEncrypt(response.getTopic(), isEncrypt)) {
|
// if (gatewayBean != null && gatewayBean.getIsLocalEncrypt() && encrypt(linkPacket.getBody())) {
|
if (!linkPacket.isCloudPacket() && encrypt(linkPacket.getBody())) {
|
//需要解密
|
byte[] bodyBytes = AesUtil.aesDecrypt(linkPacket.getBody(), HDLLinkConfig.getInstance().getLocalSecret());
|
if (bodyBytes != null) {
|
response.setData(new String(bodyBytes, StandardCharsets.UTF_8));
|
} else {
|
LogUtils.e("解密失败\r\n" + linkPacket.getTopic() + "\r\n" + ByteUtils.encodeHexString(linkPacket.getBody()));
|
response.setData(new String(linkPacket.getBody(), "utf-8"));
|
}
|
} else {
|
response.setData(new String(linkPacket.getBody(), "utf-8"));
|
}
|
// if (HDLConnectHelper.isLocal()) {
|
LogUtils.i("本地接收到数据:\r\n" + response.getTopic() + "\r\n" + response.getData());
|
// }
|
//解析完成,topic发送一次
|
EventDispatcher.getInstance().post(response.getTopic(), response);
|
}
|
|
//是否加密
|
private boolean encrypt(byte[] bytes) {
|
if (bytes[0] == '{' && bytes[bytes.length - 1] == '}' || (bytes[0] == '[' && bytes[bytes.length - 1] == ']')) {
|
return false;
|
}
|
return true;
|
}
|
}
|