wjc
2023-06-28 14de918a79943e4961b09fa01ed320c6cad41f2e
HDLLinkLocalSdk/src/main/java/com/hdl/sdk/link/core/utils/QueueUtils.java
New file
@@ -0,0 +1,134 @@
package com.hdl.sdk.link.core.utils;
import com.hdl.sdk.link.common.event.EventDispatcher;
import com.hdl.sdk.link.common.utils.LogUtils;
import com.hdl.sdk.link.common.utils.ThreadToolUtils;
import com.hdl.sdk.link.core.bean.LinkPacket;
import com.hdl.sdk.link.core.bean.LinkResponse;
import com.hdl.sdk.link.core.config.HDLLinkConfig;
import com.hdl.sdk.link.core.connect.HDLConnectHelper;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
/**
 * Created by hxb on 2022/8/4.
 */
public class QueueUtils {
    //还没解密的数据包
    private final Queue<LinkPacket> linkPackets;
    private final ExecutorService executorService;
    //是否已经启动
    private boolean started;
    /**
     * instance
     */
    private volatile static QueueUtils instance;
    private QueueUtils() {
        linkPackets = new LinkedList<>();
        executorService = ThreadToolUtils.getInstance().newFixedThreadPool(1);
    }
    /**
     * getInstance
     *
     * @return AuthenticateConfig
     */
    public static synchronized QueueUtils getInstance() {
        if (instance == null) {
            synchronized (QueueUtils.class) {
                if (instance == null) {
                    instance = new QueueUtils();
                }
            }
        }
        return instance;
    }
    /**
     * 增加接收到的每条数据
     *
     * @param linkPacket
     */
    public void add(LinkPacket linkPacket) {
        synchronized (linkPackets) {
            linkPackets.add(linkPacket);
        }
    }
    public LinkPacket poll() {
        synchronized (linkPackets) {
            return linkPackets.poll();
        }
    }
    /**
     * 启动处理接收到的数据
     */
    public synchronized void start() {
        if (started) {
            return;
        }
        started = true;
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        LinkPacket linkPacket = poll();
                        if (linkPacket == null) {
                            Thread.sleep(10);
                            continue;
                        }
                        manager(linkPacket);
                    } catch (Exception e) {
                        LogUtils.e("处理接收到的数据异常:\r\n" + e.getMessage());
                    }
                }
            }
        });
    }
    private void manager(LinkPacket linkPacket) throws UnsupportedEncodingException {
        LinkResponse response = new LinkResponse();
        response.setTopic(linkPacket.getTopic());
//        GatewayBean gatewayBean = HDLLinkLocalGateway.getInstance().getGatewayByIpAddress(linkPacket.isIpAddress());
//        boolean isEncrypt = false;
//        if (gatewayBean != null) {
//            isEncrypt = gatewayBean.getIsLocalEncrypt();
//        }
//        if (EncryptUtil.ifNeedEncrypt(response.getTopic(), isEncrypt)) {
//        if (gatewayBean != null && gatewayBean.getIsLocalEncrypt() && encrypt(linkPacket.getBody())) {
        if (!linkPacket.isCloudPacket() && encrypt(linkPacket.getBody())) {
            //需要解密
            byte[] bodyBytes = AesUtil.aesDecrypt(linkPacket.getBody(), HDLLinkConfig.getInstance().getLocalSecret());
            if (bodyBytes != null) {
                response.setData(new String(bodyBytes, StandardCharsets.UTF_8));
            } else {
                LogUtils.e("解密失败\r\n" + linkPacket.getTopic() + "\r\n" + ByteUtils.encodeHexString(linkPacket.getBody()));
                response.setData(new String(linkPacket.getBody(), "utf-8"));
            }
        } else {
            response.setData(new String(linkPacket.getBody(), "utf-8"));
        }
        if (HDLConnectHelper.isLocal()) {
            LogUtils.i("本地接收到数据:\r\n" + response.getTopic() + "\r\n" + response.getData());
        }
        //解析完成,topic发送一次
        EventDispatcher.getInstance().post(response.getTopic(), response);
    }
    //是否加密
    private boolean encrypt(byte[] bytes) {
        if (bytes[0] == '{' && bytes[bytes.length - 1] == '}' || (bytes[0] == '[' && bytes[bytes.length - 1] == ']')) {
            return false;
        }
        return true;
    }
}