package com.hdl.sdk.link.core.protocol; import com.hdl.sdk.link.common.utils.LogUtils; import com.hdl.sdk.link.common.event.EventDispatcher; import com.hdl.sdk.link.common.utils.ByteUtils; import com.hdl.sdk.link.core.bean.LinkPacket; import com.hdl.sdk.link.core.bean.LinkResponse; import com.hdl.sdk.link.core.utils.ByteBufferUtils; import com.hdl.sdk.link.core.utils.QueueUtils; import com.hdl.sdk.link.gateway.HDLLinkLocalGateway; import com.hdl.sdk.link.socket.bean.Packet; import com.hdl.sdk.link.socket.codec.ByteToMessageDecoder; import java.nio.ByteBuffer; /** * Created by Tong on 2021/9/22. * link协议粘包拆包 */ public class LinkMessageDecoder extends ByteToMessageDecoder { private static final String TAG=LinkMessageDecoder.class.getName(); //instance private volatile static LinkMessageDecoder instance; //getInstance public static synchronized LinkMessageDecoder getInstance() { if (instance == null) { synchronized (LinkMessageDecoder.class) { if (instance == null) { instance = new LinkMessageDecoder(); } } } return instance; } /** * 接收数据缓冲区 */ private final ByteBuffer byteBuffer; private final byte[] head = "Topic:".getBytes(); public LinkMessageDecoder() { byteBuffer = ByteBuffer.allocate(1024 * 200);//100K } /// /// 获取内容长度 /// /// /// int getLenght(String[] topMsgs) { try { for (int i = 0; i < topMsgs.length; i++) { String topMsg = topMsgs[i].trim(); if (topMsg.startsWith("Length:")) { return Integer.parseInt(topMsg.replace("Length:", "").trim()); } } } catch (Exception e) { LogUtils.e("异常数据:" + topMsgs[0] + "\r\n" + topMsgs[1]); return -1; } //找不到长度 return -1; } /// /// 获取主题 /// /// /// private String getTopic(String[] topMsgs) { for (int i = 0; i < topMsgs.length; i++) { String topMsg = topMsgs[i].trim(); if (topMsg.startsWith("Topic:")) { return topMsg.replace("Topic:", ""); } } //找不到主题 return null; } /** * 获取数据的开始位置 * * @return 数据位的开始索引 */ int getBodyIndex() { byte r = (byte) '\r'; byte n = (byte) '\n'; for (int i = 0; i < byteBuffer.position(); i++) { //找出数据内容前面的两个换行 if (3 <= i && byteBuffer.get(i - 3) == r && byteBuffer.get(i - 2) == n && byteBuffer.get(i - 1) == r && byteBuffer.get(i) == n) { //剩余的数据 return i + 1; } } return -1; } /** * 获取头部数据 * * @return */ String getHeader() { int bodyIndex = getBodyIndex(); if (bodyIndex < 0) { //没有找到头部数据 return null; } else { byte bodyBytes[] = ByteBufferUtils.copyBytes(byteBuffer, bodyIndex); return new String(bodyBytes); } } /** * 获取数据内容 * * @param lenght * @return */ byte[] getBody(int index, int lenght) { //是否已经获取完整所有的数据 byte[] bodyBytes = new byte[lenght]; if (index < 0 || byteBuffer.position() < index + lenght) { //当前数据还没有接收完成 return null; } for (int i = 0; i < bodyBytes.length; i++) { bodyBytes[i] = byteBuffer.get(index + i); } return bodyBytes; } /** * 这边处理了缓存数据粘包的情况,每次请求都需要吧当前完整的文件除去 以便于下次的返回 * tempList用于存储多余的数据 * contentList用于本次数据的存储(发送给订阅的数据) */ byte[] geBody() { int len = 3 + 2 + 2 + 4 + ((byteBuffer.get(7) & 0xFF) * 256 * 256 * 256) + ((byteBuffer.get(8) & 0xFF) * 256 * 256) + ((byteBuffer.get(9) & 0xFF) * 256) + (byteBuffer.get(10) & 0xFF); if (byteBuffer.position() < len) { return null; } byte[] bodyBytes = new byte[len]; for (int i = 0; i < len; i++) { bodyBytes[i] = byteBuffer.get(i); } int endIndex = byteBuffer.position(); byteBuffer.clear(); for (int i = len; i < endIndex; i++) { byteBuffer.put(byteBuffer.get(i)); } return bodyBytes; } /** * 移除可能存在的无效数据 */ void removeInVoidBytes() { int index = 0; boolean isMatch = false; for (; index < byteBuffer.position() - head.length; index++) { isMatch = true; for (int j = 0, k = 0; j < head.length; j++, k++) { if (head[j] != byteBuffer.get(index + k)) { isMatch = false; break; } } if (isMatch) { break; } } if (0 < index && isMatch) { int endIndex = byteBuffer.position(); byteBuffer.clear(); for (int i = index; i < endIndex; i++) { byteBuffer.put(byteBuffer.get(i)); } } } /** * 移除到指定位置前面的数据 * * @param position 指定位置 */ void remove(int position) { int endIndex = byteBuffer.position(); byteBuffer.clear(); for (int i = position; i < endIndex; i++) { byteBuffer.put(byteBuffer.get(i)); } } void fileManger(int commandAck, byte[] recevieBytes) { String topic = "65531_" + commandAck; LinkResponse response = new LinkResponse(); response.setTopic(topic); response.setByteData(recevieBytes); EventDispatcher.getInstance().post(response.getTopic(), response); } int bytes2int(byte[] bytes) { int result = 0; if (bytes.length == 2) { int c = (bytes[0] & 0xff) << 8; int d = (bytes[1] & 0xff); result = c | d; } else if (bytes.length == 4) { return bytes[3] & 0xFF | // (bytes[2] & 0xFF) << 8 | // (bytes[1] & 0xFF) << 16 | // (bytes[0] & 0xFF) << 24; // } return result; } public String byte2hex(byte[] bytes) { StringBuilder sb = new StringBuilder(); String tmp = null; for (byte b : bytes) { //将每个字节与0xFF进行与运算,然后转化为10进制,然后借助于Integer再转化为16进制 tmp = Integer.toHexString(0xFF & b); if (tmp.length() == 1) { tmp = "0" + tmp; } sb.append(tmp + " "); } return sb.toString(); } @Override protected synchronized LinkResponse decoder(Packet packet) { try { if (null == packet) { return null; } byteBuffer.put(packet.getBytes()); } catch (Exception e) { LogUtils.e("接收到数据异常:\r\n" + e.getMessage()); byteBuffer.flip(); byteBuffer.clear(); } try { //如果多条命令打包在一条数据中,都需要处理完 while (true) { if (byteBuffer.position() > 2) {//判断是否是文件处理通知 wxr byte[] topBytes = new byte[3]; topBytes[0] = byteBuffer.get(0); topBytes[1] = byteBuffer.get(1); topBytes[2] = byteBuffer.get(2); if (new String(topBytes).equals("hex")) { //TODO 这块代码统一移出其它地方处理 byte[] commandBytes = ByteBufferUtils.copyBytes(byteBuffer, 5, 2); int command = bytes2int(commandBytes); byte[] submitBytes = geBody(); if(submitBytes==null) { //还没有接收完成 continue; } if (command == 258 || command == 260 || command == 261) { //读取驱动列表响应 ||驱动安装申请响应 if (submitBytes.length > 11) { byte[] rangeBytes = ByteUtils.copyBytes(submitBytes, 11, submitBytes.length - 11); fileManger(command, rangeBytes); } else { //方便问题排查 fileManger(command, submitBytes); } } else { //给秀桡使用 后面的业务最好都在这边处理 不然会造成业务分散 fileManger(command, submitBytes); } continue; } } removeInVoidBytes();//移除可能存在的无效数据 //头部数据 String header = getHeader(); if (header == null) { break; } String[] topMsgs = header.split("\r\n"); String topic = getTopic(topMsgs); int lenght = getLenght(topMsgs); if (topic == null || lenght <= 0) { //获取不到主题或者头部数据还没有接收完成 break; } int bodyIndex = getBodyIndex(); //是否已经获取完整所有的数据 byte[] body = getBody(bodyIndex, lenght); if (body == null) { //当前数据还没有接收完成 break; } remove(bodyIndex + lenght); if (topic.contains("heartbeat_reply")) { if (packet.getSocket() != null) { packet.getSocket().setSoTimeout(10 * 1000); } continue; } QueueUtils.getInstance().add(new LinkPacket(topic, body)); } } catch (Exception ee) { LogUtils.e("处理接收的数据异常:\r\n" + ee.getMessage()); } return null; } }