package com.hdl.sdk.link.core.utils; import android.util.Log; import com.hdl.sdk.link.common.event.EventDispatcher; import com.hdl.sdk.link.common.utils.LogUtils; import com.hdl.sdk.link.common.utils.ThreadToolUtils; import com.hdl.sdk.link.core.bean.LinkPacket; import com.hdl.sdk.link.core.bean.LinkResponse; import com.hdl.sdk.link.core.config.HDLLinkConfig; import com.hdl.sdk.link.enums.NativeType; import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.ExecutorService; /** * Created by hxb on 2022/8/4. */ public class QueueUtils { private static final String TAG = "QueueUtils"; private static final String busProTAG = "BusPro"; private static final String zigbeeTAG = "Zigbee"; private static final String inverterTAG = "Inverter"; private static final String knxTAG = "KNX"; private static final String linkTAG = "Link"; //还没解密的数据包 private final Queue linkPackets; private final ExecutorService executorService; //是否已经启动 private boolean started; /** * instance */ private volatile static QueueUtils instance; private QueueUtils() { linkPackets = new LinkedList<>(); executorService = ThreadToolUtils.getInstance().newFixedThreadPool(1); } /** * getInstance * * @return AuthenticateConfig */ public static synchronized QueueUtils getInstance() { if (instance == null) { synchronized (QueueUtils.class) { if (instance == null) { instance = new QueueUtils(); } } } return instance; } /** * 增加接收到的每条数据 * * @param linkPacket */ public void add(LinkPacket linkPacket) { synchronized (linkPackets) { linkPackets.add(linkPacket); } } public LinkPacket poll() { synchronized (linkPackets) { return linkPackets.poll(); } } /** * 启动处理接收到的数据 */ public synchronized void start() { if (started) { return; } started = true; executorService.execute(new Runnable() { @Override public void run() { while (true) { try { LinkPacket linkPacket = poll(); if (linkPacket == null) { Thread.sleep(10); continue; } manager(linkPacket); } catch (Exception e) { LogUtils.e(TAG, "处理接收到的数据异常\r\n" + e.getMessage()); } } } }); } private void manager(LinkPacket linkPacket) throws UnsupportedEncodingException { // LogUtils.e(TAG, "===11"); LinkResponse response = new LinkResponse(); String topic = linkPacket.getTopic(); response.setTopic(topic); byte[] bodyBytes = linkPacket.getBody(); //云端数据已经外部打印,这里不用再打印 // if(!linkPacket.isCloudPacket()) { // LogUtils.i(TAG, "接收原始数据,Topic:" + response.getTopic() + "\r\nPayload:" + ByteUtils.encodeHexString(bodyBytes)); // } //非云端数据,尝试解密。云端数据已经在外部解密 if (!linkPacket.isCloudPacket()) { //需要解密 byte[] decryptBytes = AesUtil.aesDecrypt(bodyBytes, HDLLinkConfig.getInstance().getLocalSecret()); if (decryptBytes == null) { //如果是明文,或者zigbee的数据可以不用打印 if (!JsonUtils.isJson(bodyBytes) && !topic.contains("/custom/native/zigbee/up")) { LogUtils.e(TAG, "本地接收数据解密失败,Topic:" + topic + "\r\nPayload:" + ByteUtils.encodeHexString(bodyBytes) + "\r\nSecret:" + HDLLinkConfig.getInstance().getLocalSecret()); } } else { bodyBytes = decryptBytes; } } // LogUtils.e(TAG, "===12"); response.setByteData(bodyBytes); //zigbee数据比较特殊,是json,但前面有其它数据 if (JsonUtils.isJson(response.getByteData()) || topic.contains("/custom/native/zigbee/up")) { response.setData(new String(response.getByteData(), StandardCharsets.UTF_8)); } else { response.setData(ByteUtils.encodeHexString(response.getByteData())); } // LogUtils.e(TAG, "===13"); //云端数据已经外部打印,这里不用再打印 if (!linkPacket.isCloudPacket()) { // if (response.getTopic().endsWith("/custom/native/buspro/up")) { // LogUtils.i(TAG, "本地接收数据,Topic:" + response.getTopic() + "\r\nPayload:" + response.getData()); // } String tag; if (topic.contains("/native/zigbee/")) { tag = zigbeeTAG; } else if (topic.contains("/native/buspro/")) { tag = busProTAG; } else if (topic.contains("/native/inverter/")) { tag = inverterTAG; } else if (topic.contains("/native/knx/")) { tag = knxTAG; } else { tag = linkTAG; } LogUtils.i(tag, "本地接收数据,Topic:" + response.getTopic() + "\r\nPayload:" + response.getData()); } // LogUtils.e(TAG, "===14"); //解析完成,topic发送一次 EventDispatcher.getInstance().post(response.getTopic(), response); } }