package com.hdl.sdk.link.core.protocol;  
 | 
  
 | 
  
 | 
import com.hdl.sdk.link.common.utils.LogUtils;  
 | 
import com.hdl.sdk.link.common.event.EventDispatcher;  
 | 
import com.hdl.sdk.link.common.utils.ByteUtils;  
 | 
import com.hdl.sdk.link.core.bean.LinkPacket;  
 | 
import com.hdl.sdk.link.core.bean.LinkResponse;  
 | 
import com.hdl.sdk.link.core.utils.ByteBufferUtils;  
 | 
import com.hdl.sdk.link.core.utils.QueueUtils;  
 | 
import com.hdl.sdk.link.gateway.HDLLinkLocalGateway;  
 | 
import com.hdl.sdk.link.socket.bean.Packet;  
 | 
import com.hdl.sdk.link.socket.codec.ByteToMessageDecoder;  
 | 
  
 | 
import java.nio.ByteBuffer;  
 | 
  
 | 
/**  
 | 
 * Created by Tong on 2021/9/22.  
 | 
 * link协议粘包拆包  
 | 
 */  
 | 
public class LinkMessageDecoder extends ByteToMessageDecoder<LinkResponse> {  
 | 
  
 | 
    private static final String TAG=LinkMessageDecoder.class.getName();  
 | 
    //instance  
 | 
    private volatile static LinkMessageDecoder instance;  
 | 
  
 | 
    //getInstance  
 | 
    public static synchronized LinkMessageDecoder getInstance() {  
 | 
        if (instance == null) {  
 | 
            synchronized (LinkMessageDecoder.class) {  
 | 
                if (instance == null) {  
 | 
                    instance = new LinkMessageDecoder();  
 | 
                }  
 | 
            }  
 | 
        }  
 | 
        return instance;  
 | 
    }  
 | 
  
 | 
    /**  
 | 
     * 接收数据缓冲区  
 | 
     */  
 | 
    private final ByteBuffer byteBuffer;  
 | 
  
 | 
    private final byte[] head = "Topic:".getBytes();  
 | 
  
 | 
    public LinkMessageDecoder() {  
 | 
        byteBuffer = ByteBuffer.allocate(1024 * 200);//100K  
 | 
    }  
 | 
  
 | 
    /// <summary>  
 | 
    /// 获取内容长度  
 | 
    /// </summary>  
 | 
    /// <param name="topMsgs"></param>  
 | 
    /// <returns></returns>  
 | 
    int getLenght(String[] topMsgs) {  
 | 
        try {  
 | 
            for (int i = 0; i < topMsgs.length; i++) {  
 | 
                String topMsg = topMsgs[i].trim();  
 | 
                if (topMsg.startsWith("Length:")) {  
 | 
                    return Integer.parseInt(topMsg.replace("Length:", "").trim());  
 | 
                }  
 | 
            }  
 | 
        } catch (Exception e) {  
 | 
            LogUtils.e("异常数据:" + topMsgs[0] + "\r\n" + topMsgs[1]);  
 | 
            return -1;  
 | 
        }  
 | 
        //找不到长度  
 | 
        return -1;  
 | 
    }  
 | 
  
 | 
    /// <summary>  
 | 
    /// 获取主题  
 | 
    /// </summary>  
 | 
    /// <param name="topMsgs"></param>  
 | 
    /// <returns></returns>  
 | 
    private String getTopic(String[] topMsgs) {  
 | 
        for (int i = 0; i < topMsgs.length; i++) {  
 | 
            String topMsg = topMsgs[i].trim();  
 | 
            if (topMsg.startsWith("Topic:")) {  
 | 
                return topMsg.replace("Topic:", "");  
 | 
            }  
 | 
        }  
 | 
        //找不到主题  
 | 
        return null;  
 | 
    }  
 | 
  
 | 
    /**  
 | 
     * 获取数据的开始位置  
 | 
     *  
 | 
     * @return 数据位的开始索引  
 | 
     */  
 | 
    int getBodyIndex() {  
 | 
        byte r = (byte) '\r';  
 | 
        byte n = (byte) '\n';  
 | 
        for (int i = 0; i < byteBuffer.position(); i++) {  
 | 
            //找出数据内容前面的两个换行  
 | 
            if (3 <= i && byteBuffer.get(i - 3) == r && byteBuffer.get(i - 2) == n && byteBuffer.get(i - 1) == r && byteBuffer.get(i) == n) {  
 | 
                //剩余的数据  
 | 
                return i + 1;  
 | 
            }  
 | 
        }  
 | 
        return -1;  
 | 
    }  
 | 
  
 | 
    /**  
 | 
     * 获取头部数据  
 | 
     *  
 | 
     * @return  
 | 
     */  
 | 
    String getHeader() {  
 | 
        int bodyIndex = getBodyIndex();  
 | 
        if (bodyIndex < 0) {  
 | 
            //没有找到头部数据  
 | 
            return null;  
 | 
        } else {  
 | 
            byte bodyBytes[] = ByteBufferUtils.copyBytes(byteBuffer, bodyIndex);  
 | 
            return new String(bodyBytes);  
 | 
        }  
 | 
    }  
 | 
  
 | 
    /**  
 | 
     * 获取数据内容  
 | 
     *  
 | 
     * @param lenght  
 | 
     * @return  
 | 
     */  
 | 
    byte[] getBody(int index, int lenght) {  
 | 
        //是否已经获取完整所有的数据  
 | 
        byte[] bodyBytes = new byte[lenght];  
 | 
        if (index < 0 || byteBuffer.position() < index + lenght) {  
 | 
            //当前数据还没有接收完成  
 | 
            return null;  
 | 
        }  
 | 
  
 | 
        for (int i = 0; i < bodyBytes.length; i++) {  
 | 
            bodyBytes[i] = byteBuffer.get(index + i);  
 | 
        }  
 | 
        return bodyBytes;  
 | 
    }  
 | 
  
 | 
  
 | 
    /**  
 | 
     * 这边处理了缓存数据粘包的情况,每次请求都需要吧当前完整的文件除去   以便于下次的返回  
 | 
     * tempList用于存储多余的数据  
 | 
     * contentList用于本次数据的存储(发送给订阅的数据)  
 | 
     */  
 | 
    byte[] geBody() {  
 | 
        int len = 3 + 4 + 4 + ((byteBuffer.get(7) & 0xFF) * 256 * 256 * 256) + ((byteBuffer.get(8) & 0xFF) * 256 * 256) + ((byteBuffer.get(9) * 256) & 0xFF) + (byteBuffer.get(10) & 0xFf);  
 | 
        byte[] bodyBytes = new byte[len];  
 | 
        for (int i = 0; i < len; i++) {  
 | 
            bodyBytes[i] = byteBuffer.get(i);  
 | 
        }  
 | 
  
 | 
        int endIndex = byteBuffer.position();  
 | 
        byteBuffer.clear();  
 | 
        for (int i = len; i < endIndex; i++) {  
 | 
            byteBuffer.put(byteBuffer.get(i));  
 | 
        }  
 | 
        return bodyBytes;  
 | 
    }  
 | 
  
 | 
    /**  
 | 
     * 移除可能存在的无效数据  
 | 
     */  
 | 
    void removeInVoidBytes() {  
 | 
        int index = 0;  
 | 
        boolean isMatch = false;  
 | 
        for (; index < byteBuffer.position() - head.length; index++) {  
 | 
            isMatch = true;  
 | 
            for (int j = 0, k = 0; j < head.length; j++, k++) {  
 | 
                if (head[j] != byteBuffer.get(index + k)) {  
 | 
                    isMatch = false;  
 | 
                    break;  
 | 
                }  
 | 
            }  
 | 
            if (isMatch) {  
 | 
                break;  
 | 
            }  
 | 
        }  
 | 
  
 | 
        if (0 < index && isMatch) {  
 | 
            int endIndex = byteBuffer.position();  
 | 
            byteBuffer.clear();  
 | 
            for (int i = index; i < endIndex; i++) {  
 | 
                byteBuffer.put(byteBuffer.get(i));  
 | 
            }  
 | 
        }  
 | 
    }  
 | 
  
 | 
    /**  
 | 
     * 移除到指定位置前面的数据  
 | 
     *  
 | 
     * @param position 指定位置  
 | 
     */  
 | 
    void remove(int position) {  
 | 
        int endIndex = byteBuffer.position();  
 | 
        byteBuffer.clear();  
 | 
        for (int i = position; i < endIndex; i++) {  
 | 
            byteBuffer.put(byteBuffer.get(i));  
 | 
        }  
 | 
    }  
 | 
  
 | 
    void fileManger(int commandAck, byte[] recevieBytes) {  
 | 
        String topic = "65531_" + commandAck;  
 | 
        LinkResponse response = new LinkResponse();  
 | 
        response.setTopic(topic);  
 | 
        response.setByteData(recevieBytes);  
 | 
        EventDispatcher.getInstance().post(response.getTopic(), response);  
 | 
    }  
 | 
  
 | 
    int bytes2int(byte[] bytes) {  
 | 
        int result = 0;  
 | 
        if (bytes.length == 2) {  
 | 
            int c = (bytes[0] & 0xff) << 8;  
 | 
            int d = (bytes[1] & 0xff);  
 | 
            result = c | d;  
 | 
        } else if (bytes.length == 4) {  
 | 
            return bytes[3] & 0xFF | //  
 | 
                    (bytes[2] & 0xFF) << 8 | //  
 | 
                    (bytes[1] & 0xFF) << 16 | //  
 | 
                    (bytes[0] & 0xFF) << 24; //  
 | 
        }  
 | 
        return result;  
 | 
    }  
 | 
  
 | 
    public String byte2hex(byte[] bytes) {  
 | 
        StringBuilder sb = new StringBuilder();  
 | 
        String tmp = null;  
 | 
        for (byte b : bytes) {  
 | 
            //将每个字节与0xFF进行与运算,然后转化为10进制,然后借助于Integer再转化为16进制  
 | 
            tmp = Integer.toHexString(0xFF & b);  
 | 
            if (tmp.length() == 1) {  
 | 
                tmp = "0" + tmp;  
 | 
            }  
 | 
            sb.append(tmp + " ");  
 | 
        }  
 | 
        return sb.toString();  
 | 
    }  
 | 
  
 | 
    @Override  
 | 
    protected synchronized LinkResponse decoder(Packet packet) {  
 | 
        try {  
 | 
            if (null == packet) {  
 | 
                return null;  
 | 
            }  
 | 
            byteBuffer.put(packet.getBytes());  
 | 
        } catch (Exception e) {  
 | 
            LogUtils.e("接收到数据异常:\r\n" + e.getMessage());  
 | 
            byteBuffer.flip();  
 | 
            byteBuffer.clear();  
 | 
        }  
 | 
        try {  
 | 
            //如果多条命令打包在一条数据中,都需要处理完  
 | 
            while (true) {  
 | 
                if (byteBuffer.position() > 2) {//判断是否是文件处理通知 wxr  
 | 
                    byte[] topBytes = new byte[3];  
 | 
                    topBytes[0] = byteBuffer.get(0);  
 | 
                    topBytes[1] = byteBuffer.get(1);  
 | 
                    topBytes[2] = byteBuffer.get(2);  
 | 
                    if (new String(topBytes).equals("hex")) {  
 | 
                        //TODO 这块代码统一移出其它地方处理  
 | 
                        byte[] commandBytes = ByteBufferUtils.copyBytes(byteBuffer, 5, 2);  
 | 
                        int command = bytes2int(commandBytes);  
 | 
                        byte[] submitBytes = geBody();  
 | 
                        if (command == 258 || command == 260 || command == 261) {  
 | 
                            //读取驱动列表响应 ||驱动安装申请响应  
 | 
                            if (submitBytes.length > 11) {  
 | 
                                byte[] rangeBytes = ByteUtils.copyBytes(submitBytes, 11, submitBytes.length - 11);  
 | 
                                fileManger(command, rangeBytes);  
 | 
                            } else {  
 | 
                                //方便问题排查  
 | 
                                fileManger(command, submitBytes);  
 | 
                            }  
 | 
                        } else {  
 | 
                            //给秀桡使用  后面的业务最好都在这边处理 不然会造成业务分散  
 | 
                            fileManger(command, submitBytes);  
 | 
                        }  
 | 
                        continue;  
 | 
                    }  
 | 
                }  
 | 
                removeInVoidBytes();//移除可能存在的无效数据  
 | 
  
 | 
                //头部数据  
 | 
                String header = getHeader();  
 | 
  
 | 
                if (header == null) {  
 | 
                    break;  
 | 
                }  
 | 
                String[] topMsgs = header.split("\r\n");  
 | 
  
 | 
                String topic = getTopic(topMsgs);  
 | 
                int lenght = getLenght(topMsgs);  
 | 
                if (topic == null || lenght <= 0) {  
 | 
                    //获取不到主题或者头部数据还没有接收完成  
 | 
                    break;  
 | 
                }  
 | 
  
 | 
                int bodyIndex = getBodyIndex();  
 | 
                //是否已经获取完整所有的数据  
 | 
                byte[] body = getBody(bodyIndex, lenght);  
 | 
  
 | 
                if (body == null) {  
 | 
                    //当前数据还没有接收完成  
 | 
                    break;  
 | 
                }  
 | 
  
 | 
                remove(bodyIndex + lenght);  
 | 
  
 | 
                if (topic.contains("heartbeat_reply")) {  
 | 
                    if (packet.getSocket() != null) {  
 | 
                        packet.getSocket().setSoTimeout(10 * 1000);  
 | 
                    }  
 | 
                    continue;  
 | 
                }  
 | 
  
 | 
                QueueUtils.getInstance().add(new LinkPacket(topic, body));  
 | 
            }  
 | 
        } catch (Exception ee) {  
 | 
            LogUtils.e("处理接收的数据异常:\r\n" + ee.getMessage());  
 | 
        }  
 | 
        return null;  
 | 
    }  
 | 
}  
 |