package com.hdl.sdk.connect.protocol; import com.hdl.sdk.common.config.AuthenticateConfig; import com.hdl.sdk.common.event.EventDispatcher; import com.hdl.sdk.common.utils.ByteUtils; import com.hdl.sdk.connect.bean.LinkResponse; import com.hdl.sdk.connect.utils.AESUtils; import com.hdl.sdk.connect.utils.AesUtil; import com.hdl.sdk.connect.utils.ProtocolParse; import com.hdl.sdk.socket.codec.ByteToMessageDecoder; import java.util.ArrayList; import java.util.List; /** * Created by Tong on 2021/9/22. * link协议粘包拆包 */ public class LinkMessageDecoder extends ByteToMessageDecoder { private final List bytes; private final byte[] head = "Topic:".getBytes(); private final byte[] body = "\r\n\r\n".getBytes(); public LinkMessageDecoder() { this.bytes = new ArrayList<>(); } @Override protected LinkResponse decoder(Object msg) throws Exception { LinkResponse response = new LinkResponse(); if (msg instanceof byte[]) { //解析流 byte[] data = (byte[]) msg; bytes.addAll(ByteUtils.toByteList(data)); byte[] byteArray = ByteUtils.toByteArray(bytes); int headIndex = ByteUtils.getByteIndexOf(byteArray, head); if (headIndex > 0) { //移动到head 开始位置 bytes.subList(0, headIndex).clear(); byteArray = ByteUtils.toByteArray(bytes); } int bodyIndex = ByteUtils.getByteIndexOf(byteArray, body); if (bodyIndex < 0) { //头部未获取完成 return null; } int bodyStartIndex = bodyIndex + body.length; //解析头部 ProtocolParse parse = new ProtocolParse(byteArray); response.setTopic(parse.getTopic()); int bodyLength = parse.getLength(); if (bodyLength > 0) { if (byteArray.length >= bodyLength + bodyStartIndex) { byte[] body = ByteUtils.getRangeBytes(bytes, bodyStartIndex, bodyStartIndex + bodyLength); if(AuthenticateConfig.getInstance().ifNeedEncrypt(response.getTopic())){ //需要解密 byte[] bodyBytes = AesUtil.aesDecrypt(body,AuthenticateConfig.getInstance().getLocalSecret()); // byte[] bodyBytes = AESUtils.decryptAES(body,AuthenticateConfig.getInstance().getLocalSecret()); response.setData(new String(bodyBytes, "utf-8")); // Log.i("TAG", "解密 主题:"+response.getTopic()+ " body: "+response.getData()); }else{ response.setData(new String(body, "utf-8")); } if (byteArray.length >= bodyLength + bodyStartIndex) { //保存余留 byte[] remaining = ByteUtils.getRangeBytes(bytes, bodyStartIndex + bodyLength, byteArray.length); bytes.clear(); for (byte b : remaining) { bytes.add(b); } } //解析完成,topic发送一次 EventDispatcher.getInstance().post(response.getTopic(), response); return response; } } else if (bodyLength == 0) { //body为空 return response; } } return null; } }