package com.hdl.sdk.link.core.protocol;
|
|
|
import com.hdl.sdk.link.common.utils.LogUtils;
|
import com.hdl.sdk.link.common.event.EventDispatcher;
|
import com.hdl.sdk.link.common.utils.ByteUtils;
|
import com.hdl.sdk.link.core.bean.LinkPacket;
|
import com.hdl.sdk.link.core.bean.LinkResponse;
|
import com.hdl.sdk.link.core.utils.ByteBufferUtils;
|
import com.hdl.sdk.link.core.utils.QueueUtils;
|
import com.hdl.sdk.link.gateway.HDLLinkLocalGateway;
|
import com.hdl.sdk.link.socket.bean.Packet;
|
import com.hdl.sdk.link.socket.codec.ByteToMessageDecoder;
|
|
import java.nio.ByteBuffer;
|
|
/**
|
* Created by Tong on 2021/9/22.
|
* link协议粘包拆包
|
*/
|
public class LinkMessageDecoder extends ByteToMessageDecoder<LinkResponse> {
|
|
private static final String TAG=LinkMessageDecoder.class.getName();
|
//instance
|
private volatile static LinkMessageDecoder instance;
|
|
//getInstance
|
public static synchronized LinkMessageDecoder getInstance() {
|
if (instance == null) {
|
synchronized (LinkMessageDecoder.class) {
|
if (instance == null) {
|
instance = new LinkMessageDecoder();
|
}
|
}
|
}
|
return instance;
|
}
|
|
/**
|
* 接收数据缓冲区
|
*/
|
private final ByteBuffer byteBuffer;
|
|
private final byte[] head = "Topic:".getBytes();
|
|
public LinkMessageDecoder() {
|
byteBuffer = ByteBuffer.allocate(1024 * 200);//100K
|
}
|
|
/// <summary>
|
/// 获取内容长度
|
/// </summary>
|
/// <param name="topMsgs"></param>
|
/// <returns></returns>
|
int getLenght(String[] topMsgs) {
|
try {
|
for (int i = 0; i < topMsgs.length; i++) {
|
String topMsg = topMsgs[i].trim();
|
if (topMsg.startsWith("Length:")) {
|
return Integer.parseInt(topMsg.replace("Length:", "").trim());
|
}
|
}
|
} catch (Exception e) {
|
LogUtils.e("异常数据:" + topMsgs[0] + "\r\n" + topMsgs[1]);
|
return -1;
|
}
|
//找不到长度
|
return -1;
|
}
|
|
/// <summary>
|
/// 获取主题
|
/// </summary>
|
/// <param name="topMsgs"></param>
|
/// <returns></returns>
|
private String getTopic(String[] topMsgs) {
|
for (int i = 0; i < topMsgs.length; i++) {
|
String topMsg = topMsgs[i].trim();
|
if (topMsg.startsWith("Topic:")) {
|
return topMsg.replace("Topic:", "");
|
}
|
}
|
//找不到主题
|
return null;
|
}
|
|
/**
|
* 获取数据的开始位置
|
*
|
* @return 数据位的开始索引
|
*/
|
int getBodyIndex() {
|
byte r = (byte) '\r';
|
byte n = (byte) '\n';
|
for (int i = 0; i < byteBuffer.position(); i++) {
|
//找出数据内容前面的两个换行
|
if (3 <= i && byteBuffer.get(i - 3) == r && byteBuffer.get(i - 2) == n && byteBuffer.get(i - 1) == r && byteBuffer.get(i) == n) {
|
//剩余的数据
|
return i + 1;
|
}
|
}
|
return -1;
|
}
|
|
/**
|
* 获取头部数据
|
*
|
* @return
|
*/
|
String getHeader() {
|
int bodyIndex = getBodyIndex();
|
if (bodyIndex < 0) {
|
//没有找到头部数据
|
return null;
|
} else {
|
byte bodyBytes[] = ByteBufferUtils.copyBytes(byteBuffer, bodyIndex);
|
return new String(bodyBytes);
|
}
|
}
|
|
/**
|
* 获取数据内容
|
*
|
* @param lenght
|
* @return
|
*/
|
byte[] getBody(int index, int lenght) {
|
//是否已经获取完整所有的数据
|
byte[] bodyBytes = new byte[lenght];
|
if (index < 0 || byteBuffer.position() < index + lenght) {
|
//当前数据还没有接收完成
|
return null;
|
}
|
|
for (int i = 0; i < bodyBytes.length; i++) {
|
bodyBytes[i] = byteBuffer.get(index + i);
|
}
|
return bodyBytes;
|
}
|
|
|
|
/**
|
* 这边处理了缓存数据粘包的情况,每次请求都需要吧当前完整的文件除去 以便于下次的返回
|
* tempList用于存储多余的数据
|
* contentList用于本次数据的存储(发送给订阅的数据)
|
*/
|
byte[] geBody() {
|
int len = 3 + 2 + 2 + 4 + ((byteBuffer.get(7) & 0xFF) * 256 * 256 * 256) + ((byteBuffer.get(8) & 0xFF) * 256 * 256) + ((byteBuffer.get(9) & 0xFF) * 256) + (byteBuffer.get(10) & 0xFF);
|
if (byteBuffer.position() < len) {
|
return null;
|
}
|
byte[] bodyBytes = new byte[len];
|
for (int i = 0; i < len; i++) {
|
bodyBytes[i] = byteBuffer.get(i);
|
}
|
|
int endIndex = byteBuffer.position();
|
byteBuffer.clear();
|
for (int i = len; i < endIndex; i++) {
|
byteBuffer.put(byteBuffer.get(i));
|
}
|
return bodyBytes;
|
}
|
|
/**
|
* 移除可能存在的无效数据
|
*/
|
void removeInVoidBytes() {
|
int index = 0;
|
boolean isMatch = false;
|
for (; index < byteBuffer.position() - head.length; index++) {
|
isMatch = true;
|
for (int j = 0, k = 0; j < head.length; j++, k++) {
|
if (head[j] != byteBuffer.get(index + k)) {
|
isMatch = false;
|
break;
|
}
|
}
|
if (isMatch) {
|
break;
|
}
|
}
|
|
if (0 < index && isMatch) {
|
int endIndex = byteBuffer.position();
|
byteBuffer.clear();
|
for (int i = index; i < endIndex; i++) {
|
byteBuffer.put(byteBuffer.get(i));
|
}
|
}
|
}
|
|
/**
|
* 移除到指定位置前面的数据
|
*
|
* @param position 指定位置
|
*/
|
void remove(int position) {
|
int endIndex = byteBuffer.position();
|
byteBuffer.clear();
|
for (int i = position; i < endIndex; i++) {
|
byteBuffer.put(byteBuffer.get(i));
|
}
|
}
|
|
void fileManger(int commandAck, byte[] recevieBytes) {
|
String topic = "65531_" + commandAck;
|
LinkResponse response = new LinkResponse();
|
response.setTopic(topic);
|
response.setByteData(recevieBytes);
|
EventDispatcher.getInstance().post(response.getTopic(), response);
|
}
|
|
int bytes2int(byte[] bytes) {
|
int result = 0;
|
if (bytes.length == 2) {
|
int c = (bytes[0] & 0xff) << 8;
|
int d = (bytes[1] & 0xff);
|
result = c | d;
|
} else if (bytes.length == 4) {
|
return bytes[3] & 0xFF | //
|
(bytes[2] & 0xFF) << 8 | //
|
(bytes[1] & 0xFF) << 16 | //
|
(bytes[0] & 0xFF) << 24; //
|
}
|
return result;
|
}
|
|
public String byte2hex(byte[] bytes) {
|
StringBuilder sb = new StringBuilder();
|
String tmp = null;
|
for (byte b : bytes) {
|
//将每个字节与0xFF进行与运算,然后转化为10进制,然后借助于Integer再转化为16进制
|
tmp = Integer.toHexString(0xFF & b);
|
if (tmp.length() == 1) {
|
tmp = "0" + tmp;
|
}
|
sb.append(tmp + " ");
|
}
|
return sb.toString();
|
}
|
|
@Override
|
protected synchronized LinkResponse decoder(Packet packet) {
|
try {
|
if (null == packet) {
|
return null;
|
}
|
byteBuffer.put(packet.getBytes());
|
} catch (Exception e) {
|
LogUtils.e("接收到数据异常:\r\n" + e.getMessage());
|
byteBuffer.flip();
|
byteBuffer.clear();
|
}
|
try {
|
//如果多条命令打包在一条数据中,都需要处理完
|
while (true) {
|
if (byteBuffer.position() > 2) {//判断是否是文件处理通知 wxr
|
byte[] topBytes = new byte[3];
|
topBytes[0] = byteBuffer.get(0);
|
topBytes[1] = byteBuffer.get(1);
|
topBytes[2] = byteBuffer.get(2);
|
if (new String(topBytes).equals("hex")) {
|
//TODO 这块代码统一移出其它地方处理
|
byte[] commandBytes = ByteBufferUtils.copyBytes(byteBuffer, 5, 2);
|
int command = bytes2int(commandBytes);
|
byte[] submitBytes = geBody();
|
if(submitBytes==null)
|
{
|
//还没有接收完成
|
continue;
|
}
|
if (command == 258 || command == 260 || command == 261) {
|
//读取驱动列表响应 ||驱动安装申请响应
|
if (submitBytes.length > 11) {
|
byte[] rangeBytes = ByteUtils.copyBytes(submitBytes, 11, submitBytes.length - 11);
|
fileManger(command, rangeBytes);
|
} else {
|
//方便问题排查
|
fileManger(command, submitBytes);
|
}
|
} else {
|
//给秀桡使用 后面的业务最好都在这边处理 不然会造成业务分散
|
fileManger(command, submitBytes);
|
}
|
continue;
|
}
|
}
|
removeInVoidBytes();//移除可能存在的无效数据
|
|
//头部数据
|
String header = getHeader();
|
|
if (header == null) {
|
break;
|
}
|
String[] topMsgs = header.split("\r\n");
|
|
String topic = getTopic(topMsgs);
|
int lenght = getLenght(topMsgs);
|
if (topic == null || lenght <= 0) {
|
//获取不到主题或者头部数据还没有接收完成
|
break;
|
}
|
|
int bodyIndex = getBodyIndex();
|
//是否已经获取完整所有的数据
|
byte[] body = getBody(bodyIndex, lenght);
|
|
if (body == null) {
|
//当前数据还没有接收完成
|
break;
|
}
|
|
remove(bodyIndex + lenght);
|
|
if (topic.contains("heartbeat_reply")) {
|
if (packet.getSocket() != null) {
|
packet.getSocket().setSoTimeout(10 * 1000);
|
}
|
continue;
|
}
|
|
QueueUtils.getInstance().add(new LinkPacket(topic, body));
|
}
|
} catch (Exception ee) {
|
LogUtils.e("处理接收的数据异常:\r\n" + ee.getMessage());
|
}
|
return null;
|
}
|
}
|