hxb
2023-09-18 5873127e4249693d11d9c321599011ea6b12ba1f
处理接收数据优化
1个文件已添加
3个文件已修改
314 ■■■■ 已修改文件
HDLSDK/com.hdl.sdk/build.gradle 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/common/HDLSdk.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/protocol/LinkMessageDecoder.java 271 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/utils/ByteBufferUtils.java 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
HDLSDK/com.hdl.sdk/build.gradle
@@ -8,8 +8,8 @@
    defaultConfig {
        minSdkVersion rootProject.minSdkVersion
        targetSdkVersion rootProject.targetSdkVersion
        versionCode 115
        versionName "1.1.5"
        versionCode 116
        versionName "1.1.6"
    }
    buildTypes {
        debug {
HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/common/HDLSdk.java
@@ -35,7 +35,7 @@
        return version;
    }
    private String version = "1.1.5";
    private String version = "1.1.6";
    private HDLSdk() {
    }
HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/protocol/LinkMessageDecoder.java
@@ -25,8 +25,11 @@
import com.hdl.sdk.connect.socket.HDLAuthSocket;
import com.hdl.sdk.connect.socket.HDLSocket;
import com.hdl.sdk.connect.utils.AesUtil;
import com.hdl.sdk.connect.utils.ByteBufferUtils;
import com.hdl.sdk.socket.codec.ByteToMessageDecoder;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@@ -40,13 +43,31 @@
 */
public class LinkMessageDecoder extends ByteToMessageDecoder<LinkResponse> {
    private final List<Byte> bytes;
    private static final String TAG = LinkMessageDecoder.class.getName();
    //instance
    private volatile static LinkMessageDecoder instance;
    //getInstance
    public static synchronized LinkMessageDecoder getInstance() {
        if (instance == null) {
            synchronized (LinkMessageDecoder.class) {
                if (instance == null) {
                    instance = new LinkMessageDecoder();
                }
            }
        }
        return instance;
    }
    /**
     * 接收数据缓冲区
     */
    private final ByteBuffer byteBuffer;
    private final byte[] head = "Topic:".getBytes();
//    private final byte[] body = "\r\n\r\n".getBytes();
    public LinkMessageDecoder() {
        this.bytes = new ArrayList<>();
        byteBuffer = ByteBuffer.allocate(1024 * 200);//100K
    }
    /// <summary>
@@ -55,11 +76,16 @@
    /// <param name="topMsgs"></param>
    /// <returns></returns>
    int getLenght(String[] topMsgs) {
        for (int i = 0; i < topMsgs.length; i++) {
            String topMsg = topMsgs[i].trim();
            if (topMsg.startsWith("Length:")) {
                return Integer.parseInt(topMsg.replace("Length:", ""));
        try {
            for (int i = 0; i < topMsgs.length; i++) {
                String topMsg = topMsgs[i].trim();
                if (topMsg.startsWith("Length:")) {
                    return Integer.parseInt(topMsg.replace("Length:", "").trim());
                }
            }
        } catch (Exception e) {
            LogUtils.e("异常数据:" + topMsgs[0] + "\r\n" + topMsgs[1]);
            return -1;
        }
        //找不到长度
        return -1;
@@ -84,15 +110,14 @@
    /**
     * 获取数据的开始位置
     *
     * @param arrayList 接收到的所有数据
     * @return 数据位的开始索引
     */
    int getDataIndex(List<Byte> arrayList) {
    int getBodyIndex() {
        byte r = (byte) '\r';
        byte n = (byte) '\n';
        for (int i = 0; i < arrayList.size(); i++) {
        for (int i = 0; i < byteBuffer.position(); i++) {
            //找出数据内容前面的两个换行
            if (3 <= i && arrayList.get(i - 3) == r && arrayList.get(i - 2) == n && arrayList.get(i - 1) == r && arrayList.get(i) == n) {
            if (3 <= i && byteBuffer.get(i - 3) == r && byteBuffer.get(i - 2) == n && byteBuffer.get(i - 1) == r && byteBuffer.get(i) == n) {
                //剩余的数据
                return i + 1;
            }
@@ -100,14 +125,73 @@
        return -1;
    }
    void initReceiveData(List<Byte> list) {
    /**
     * 获取头部数据
     *
     * @return
     */
    String getHeader() {
        int bodyIndex = getBodyIndex();
        if (bodyIndex < 0) {
            //没有找到头部数据
            return null;
        } else {
            byte bodyBytes[] = ByteBufferUtils.copyBytes(byteBuffer, bodyIndex);
            return new String(bodyBytes);
        }
    }
    /**
     * 获取数据内容
     *
     * @param lenght
     * @return
     */
    byte[] getBody(int index, int lenght) {
        //是否已经获取完整所有的数据
        byte[] bodyBytes = new byte[lenght];
        if (index < 0 || byteBuffer.position() < index + lenght) {
            //当前数据还没有接收完成
            return null;
        }
        for (int i = 0; i < bodyBytes.length; i++) {
            bodyBytes[i] = byteBuffer.get(index + i);
        }
        return bodyBytes;
    }
    /**
     * 这边处理了缓存数据粘包的情况,每次请求都需要吧当前完整的文件除去   以便于下次的返回
     * tempList用于存储多余的数据
     * contentList用于本次数据的存储(发送给订阅的数据)
     */
    byte[] geBody() {
        int len = 3 + 4 + 4 + ((byteBuffer.get(7) & 0xFF) * 256 * 256 * 256) + ((byteBuffer.get(8) & 0xFF) * 256 * 256) + ((byteBuffer.get(9) * 256) & 0xFF) + (byteBuffer.get(10) & 0xFf);
        byte[] bodyBytes = new byte[len];
        for (int i = 0; i < len; i++) {
            bodyBytes[i] = byteBuffer.get(i);
        }
        int endIndex = byteBuffer.position();
        byteBuffer.clear();
        for (int i = len; i < endIndex; i++) {
            byteBuffer.put(byteBuffer.get(i));
        }
        return bodyBytes;
    }
    /**
     * 移除可能存在的无效数据
     */
    void removeInVoidBytes() {
        int index = 0;
        boolean isMatch = false;
        for (; index < list.size() - head.length; index++) {
        for (; index < byteBuffer.position() - head.length; index++) {
            isMatch = true;
            for (int j = 0, k = 0; j < head.length; j++, k++) {
                if (head[j] != list.get(index + k)) {
                if (head[j] != byteBuffer.get(index + k)) {
                    isMatch = false;
                    break;
                }
@@ -118,97 +202,148 @@
        }
        if (0 < index && isMatch) {
            List<Byte> tempList = new ArrayList<Byte>();
            for (int i = index; i < list.size(); i++) {
                tempList.add(list.get(i));
            }
            list.clear();
            for (int i = 0; i < tempList.size(); i++) {
                list.add(tempList.get(i));
            int endIndex = byteBuffer.position();
            byteBuffer.clear();
            for (int i = index; i < endIndex; i++) {
                byteBuffer.put(byteBuffer.get(i));
            }
        }
    }
    /**
     * 移除到指定位置前面的数据
     *
     * @param position 指定位置
     */
    void remove(int position) {
        int endIndex = byteBuffer.position();
        byteBuffer.clear();
        for (int i = position; i < endIndex; i++) {
            byteBuffer.put(byteBuffer.get(i));
        }
    }
    int bytes2int(byte[] bytes) {
        int result = 0;
        if (bytes.length == 2) {
            int c = (bytes[0] & 0xff) << 8;
            int d = (bytes[1] & 0xff);
            result = c | d;
        } else if (bytes.length == 4) {
            return bytes[3] & 0xFF | //
                    (bytes[2] & 0xFF) << 8 | //
                    (bytes[1] & 0xFF) << 16 | //
                    (bytes[0] & 0xFF) << 24; //
        }
        return result;
    }
    public String byte2hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        String tmp = null;
        for (byte b : bytes) {
            //将每个字节与0xFF进行与运算,然后转化为10进制,然后借助于Integer再转化为16进制
            tmp = Integer.toHexString(0xFF & b);
            if (tmp.length() == 1) {
                tmp = "0" + tmp;
            }
            sb.append(tmp + " ");
        }
        return sb.toString();
    }
    @RequiresApi(api = Build.VERSION_CODES.O)
    @Override
    protected synchronized LinkResponse decoder(Object msg, String ipaddress) throws Exception {
        if (msg instanceof byte[]) {
            bytes.addAll(ByteUtils.toByteList((byte[]) msg));
        if (msg == null || !(msg instanceof byte[])) {
            return null;
        }
        byte[] bytes = (byte[]) msg;
        try {
            byteBuffer.put(bytes);
        } catch (Exception e) {
            LogUtils.e("接收到数据异常:\r\n" + e.getMessage());
            byteBuffer.flip();
            byteBuffer.clear();
        }
        try {
            //如果多条命令打包在一条数据中,都需要处理完
            while (true) {
                initReceiveData(bytes);
                byte[] recevieBytes = ByteUtils.toByteArray(bytes);
                String data = new String(recevieBytes);
                removeInVoidBytes();//移除可能存在的无效数据
                String[] topMsgs = data.split("\r\n");
                //头部数据
                String header = getHeader();
                if (header == null) {
                    break;
                }
                String[] topMsgs = header.split("\r\n");
                String topic = getTopic(topMsgs);
                if (topic == null) {
                    break;
                }
                int lenght = getLenght(topMsgs);
                if (lenght <= 0) {
                    //头部数据还没有接收完成
                if (topic == null || lenght <= 0) {
                    //获取不到主题或者头部数据还没有接收完成
                    break;
                }
                int bodyIndex = getBodyIndex();
                //是否已经获取完整所有的数据
                byte[] body = new byte[lenght];
                int index = getDataIndex(bytes);
                if (bytes.size() < index + lenght) {
                byte[] body = getBody(bodyIndex, lenght);
                if (body == null) {
                    //当前数据还没有接收完成
                    break;
                }
                //复制出body数据
                System.arraycopy(recevieBytes, index, body, 0, body.length);
                //保留剩余的数据,以次用
                bytes.clear();
                for (int i = index + lenght; i < recevieBytes.length; i++) {
                    bytes.add(recevieBytes[i]);
                remove(bodyIndex + lenght);
                if (topic.contains("heartbeat_reply")) {
//                    if (packet.getSocket() != null) {
//                        packet.getSocket().setSoTimeout(10 * 1000);
//                    }
                    continue;
                }
                LinkResponse response = new LinkResponse();
                response.setSource_ipAddress(ipaddress);
                Log.d("panlili", "----->source_ipAddress= " + ipaddress);
                response.setTopic(topic);
                if (HDLLinkConfig.getInstance().ifNeedEncrypt(response.getTopic())) {
                if (encrypt(body)) {
                    //需要解密
                    byte[] bodyBytes = AesUtil.aesDecrypt(body, HDLLinkConfig.getInstance().getLocalSecret());
                    if (bodyBytes != null) {
                        body = bodyBytes;
                        response.setData(new String(bodyBytes, StandardCharsets.UTF_8));
                    } else {
                        try {
                            //之前的版本这块是明文的
                            if (!topic.contains("heartbeat_reply")) {
                                if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O)
                                    LogUtils.e("解密失败,数据内容是:\r\n" + Base64.encodeToString(body, Base64.NO_WRAP));
                                else {
                                    LogUtils.e("解密失败,数据内容是:\r\n" + new String(body, "utf-8"));
                                }
                            }
                        } catch (Exception e) {
                        }
                        LogUtils.e("解密失败\r\n" + topic);
                        response.setData(new String(body, "utf-8"));
                        continue;
                    }
                } else {
                    response.setData(new String(body, "utf-8"));
                }
                String bodyString = new String(body, "utf-8");
                response.setData(bodyString);
                LogUtils.i("接收到数据:" + response.getTopic() + "\r\n" + response.getData());
                LogUtils.i("本地接收到数据:\r\n" + response.getTopic() + "\r\n" + response.getData() + "\r\n" + response.getData().length());
                //非正常数据,返回
                if (!((bodyString.startsWith("{") && bodyString.endsWith("}"))
                        || (bodyString.startsWith("[") && bodyString.endsWith("]")))) {
                    continue;
                }
                //解析完成,topic发送一次
                EventDispatcher.getInstance().post(response.getTopic(), response);
            }
            return null;
        } catch (Exception ee) {
            LogUtils.e("处理接收的数据异常:\r\n" + ee.getMessage());
        }
        return null;
    }
    //是否加密
    private boolean encrypt(byte[] bytes) {
        if (bytes[0] == '{' && bytes[bytes.length - 1] == '}' || (bytes[0] == '[' && bytes[bytes.length - 1] == ']')) {
            return false;
        }
        return true;
    }
}
HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/utils/ByteBufferUtils.java
New file
@@ -0,0 +1,37 @@
package com.hdl.sdk.connect.utils;
import java.nio.ByteBuffer;
/**
 * Created by hxb on 2022/8/3.
 *
 */
public class ByteBufferUtils {
    /**
     * 获取ByteBuffer指定位置数据
     *
     * @param byteBuffer 源对象
     * @param length 指定长度
     * @return 根据长度生成的数组
     */
    public static byte[] copyBytes(ByteBuffer byteBuffer, int length) {
        return copyBytes(byteBuffer,0,length);
    }
    /**
     * 复制指定位置的数据
     * @param byteBuffer
     * @param index
     * @param length
     * @return
     */
    public static byte[] copyBytes(ByteBuffer byteBuffer, int index,int length) {
        byte[] bytes = new byte[length];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = byteBuffer.get(index + i);
        }
        return bytes;
    }
}