package com.hdl.sdk.connect.protocol;
|
|
|
import android.util.Log;
|
|
import com.hdl.sdk.common.config.AuthenticateConfig;
|
import com.hdl.sdk.common.event.EventDispatcher;
|
import com.hdl.sdk.common.utils.ByteUtils;
|
import com.hdl.sdk.connect.bean.LinkResponse;
|
import com.hdl.sdk.connect.utils.AesUtil;
|
import com.hdl.sdk.connect.utils.ProtocolParse;
|
import com.hdl.sdk.socket.codec.ByteToMessageDecoder;
|
|
import java.util.ArrayList;
|
import java.util.List;
|
|
/**
 * Created by Tong on 2021/9/22.
 * Handles sticky-packet reassembly and packet splitting for the link protocol.
 */
|
public class LinkMessageDecoder extends ByteToMessageDecoder<LinkResponse> {
|
|
private final List<Byte> bytes;
|
|
private final byte[] head = "Topic:".getBytes();
|
private final byte[] body = "\r\n\r\n".getBytes();
|
|
public LinkMessageDecoder() {
|
this.bytes = new ArrayList<>();
|
}
|
|
@Override
|
protected LinkResponse decoder(Object msg) throws Exception {
|
LinkResponse response = new LinkResponse();
|
if (msg instanceof byte[]) {
|
//解析流
|
byte[] data = (byte[]) msg;
|
bytes.addAll(ByteUtils.toByteList(data));
|
|
byte[] byteArray = ByteUtils.toByteArray(bytes);
|
int headIndex = ByteUtils.getByteIndexOf(byteArray, head);
|
if (headIndex > 0) {
|
//移动到head 开始位置
|
bytes.subList(0, headIndex).clear();
|
byteArray = ByteUtils.toByteArray(bytes);
|
}
|
|
int bodyIndex = ByteUtils.getByteIndexOf(byteArray, body);
|
if (bodyIndex < 0) {
|
//头部未获取完成
|
return null;
|
}
|
int bodyStartIndex = bodyIndex + body.length;
|
|
//解析头部
|
ProtocolParse parse = new ProtocolParse(byteArray);
|
response.setTopic(parse.getTopic());
|
|
int bodyLength = parse.getLength();
|
if (bodyLength > 0) {
|
if (byteArray.length >= bodyLength + bodyStartIndex) {
|
byte[] body = ByteUtils.getRangeBytes(bytes, bodyStartIndex, bodyStartIndex + bodyLength);
|
|
if(AuthenticateConfig.getInstance().ifNeedEncrypt(response.getTopic())){
|
//需要解密
|
byte[] bodyBytes = AesUtil.aesDecrypt(body,AuthenticateConfig.getInstance().getLocalSecret());
|
response.setData(new String(bodyBytes, "utf-8"));
|
Log.i("TAG", "decoder: ");
|
}else{
|
response.setData(new String(body, "utf-8"));
|
}
|
|
|
|
if (byteArray.length >= bodyLength + bodyStartIndex) {
|
//保存余留
|
byte[] remaining = ByteUtils.getRangeBytes(bytes, bodyStartIndex + bodyLength, byteArray.length);
|
bytes.clear();
|
for (byte b : remaining) {
|
bytes.add(b);
|
}
|
}
|
//解析完成,topic发送一次
|
EventDispatcher.getInstance().post(response.getTopic(), response);
|
return response;
|
}
|
} else if (bodyLength == 0) {
|
//body为空
|
return response;
|
}
|
|
}
|
return null;
|
}
|
}
|