package com.hdl.sdk.connect.protocol; import android.os.Build; import android.text.TextUtils; import android.util.Log; import androidx.annotation.RequiresApi; import com.google.gson.reflect.TypeToken; import com.hdl.sdk.common.config.TopicConstant; import com.hdl.sdk.common.event.EventDispatcher; import com.hdl.sdk.common.exception.HDLLinkException; import com.hdl.sdk.common.utils.ByteUtils; import com.hdl.sdk.common.utils.LogUtils; import com.hdl.sdk.common.utils.SPUtils; import com.hdl.sdk.common.utils.gson.GsonConvert; import com.hdl.sdk.connect.HDLLink; import com.hdl.sdk.connect.bean.LinkResponse; import com.hdl.sdk.connect.bean.request.AuthenticateRequest; import com.hdl.sdk.connect.bean.response.DeviceDeleteResponse; import com.hdl.sdk.connect.bean.response.DeviceInfoResponse; import com.hdl.sdk.connect.callback.HDLLinkCallBack; import com.hdl.sdk.connect.config.HDLLinkConfig; import com.hdl.sdk.connect.socket.HDLAuthSocket; import com.hdl.sdk.connect.socket.HDLSocket; import com.hdl.sdk.connect.utils.AesUtil; import com.hdl.sdk.connect.utils.ByteBufferUtils; import com.hdl.sdk.socket.codec.ByteToMessageDecoder; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import android.util.Base64; import kotlin.ParameterName; /** * Created by Tong on 2021/9/22. * link协议粘包拆包 */ public class LinkMessageDecoder extends ByteToMessageDecoder { private static final String TAG = LinkMessageDecoder.class.getName(); //instance private volatile static LinkMessageDecoder instance; //getInstance public static synchronized LinkMessageDecoder getInstance() { if (instance == null) { synchronized (LinkMessageDecoder.class) { if (instance == null) { instance = new LinkMessageDecoder(); } } } return instance; } /** * 接收数据缓冲区 */ private final ByteBuffer byteBuffer; private final byte[] head = "Topic:".getBytes(); public LinkMessageDecoder() { byteBuffer = ByteBuffer.allocate(1024 * 200);//100K } /// /// 获取内容长度 /// /// /// int getLenght(String[] topMsgs) { try { for (int i = 0; i < topMsgs.length; i++) { String topMsg = topMsgs[i].trim(); if (topMsg.startsWith("Length:")) { return Integer.parseInt(topMsg.replace("Length:", "").trim()); } } } catch (Exception e) { LogUtils.e("异常数据:" + topMsgs[0] + "\r\n" + topMsgs[1]); return -1; } //找不到长度 return -1; } /// /// 获取主题 /// /// /// private String getTopic(String[] topMsgs) { for (int i = 0; i < topMsgs.length; i++) { String topMsg = topMsgs[i].trim(); if (topMsg.startsWith("Topic:")) { return topMsg.replace("Topic:", ""); } } //找不到主题 return null; } /** * 获取数据的开始位置 * * @return 数据位的开始索引 */ int getBodyIndex() { byte r = (byte) '\r'; byte n = (byte) '\n'; for (int i = 0; i < byteBuffer.position(); i++) { //找出数据内容前面的两个换行 if (3 <= i && byteBuffer.get(i - 3) == r && byteBuffer.get(i - 2) == n && byteBuffer.get(i - 1) == r && byteBuffer.get(i) == n) { //剩余的数据 return i + 1; } } return -1; } /** * 获取头部数据 * * @return */ String getHeader() { int bodyIndex = getBodyIndex(); if (bodyIndex < 0) { //没有找到头部数据 return null; } else { byte bodyBytes[] = ByteBufferUtils.copyBytes(byteBuffer, bodyIndex); return new String(bodyBytes); } } /** * 获取数据内容 * * @param lenght * @return */ byte[] getBody(int index, int lenght) { //是否已经获取完整所有的数据 byte[] bodyBytes = new byte[lenght]; if (index < 0 || byteBuffer.position() < index + lenght) { //当前数据还没有接收完成 return null; } for (int i = 0; i < bodyBytes.length; i++) { bodyBytes[i] = byteBuffer.get(index + i); } return bodyBytes; } /** * 这边处理了缓存数据粘包的情况,每次请求都需要吧当前完整的文件除去 以便于下次的返回 * tempList用于存储多余的数据 * contentList用于本次数据的存储(发送给订阅的数据) */ byte[] geBody() { int len = 3 + 4 + 4 + ((byteBuffer.get(7) & 0xFF) * 256 * 256 * 256) + ((byteBuffer.get(8) & 0xFF) * 256 * 256) + ((byteBuffer.get(9) * 256) & 0xFF) + (byteBuffer.get(10) & 0xFf); byte[] bodyBytes = new byte[len]; for (int i = 0; i < len; i++) { bodyBytes[i] = byteBuffer.get(i); } int endIndex = byteBuffer.position(); byteBuffer.clear(); for (int i = len; i < endIndex; i++) { byteBuffer.put(byteBuffer.get(i)); } return bodyBytes; } /** * 移除可能存在的无效数据 */ void removeInVoidBytes() { int index = 0; boolean isMatch = false; for (; index < byteBuffer.position() - head.length; index++) { isMatch = true; for (int j = 0, k = 0; j < head.length; j++, k++) { if (head[j] != byteBuffer.get(index + k)) { isMatch = false; break; } } if (isMatch) { break; } } if (0 < index && isMatch) { int endIndex = byteBuffer.position(); byteBuffer.clear(); for (int i = index; i < endIndex; i++) { byteBuffer.put(byteBuffer.get(i)); } } } /** * 移除到指定位置前面的数据 * * @param position 指定位置 */ void remove(int position) { int endIndex = byteBuffer.position(); byteBuffer.clear(); for (int i = position; i < endIndex; i++) { byteBuffer.put(byteBuffer.get(i)); } } int bytes2int(byte[] bytes) { int result = 0; if (bytes.length == 2) { int c = (bytes[0] & 0xff) << 8; int d = (bytes[1] & 0xff); result = c | d; } else if (bytes.length == 4) { return bytes[3] & 0xFF | // (bytes[2] & 0xFF) << 8 | // (bytes[1] & 0xFF) << 16 | // (bytes[0] & 0xFF) << 24; // } return result; } public String byte2hex(byte[] bytes) { StringBuilder sb = new StringBuilder(); String tmp = null; for (byte b : bytes) { //将每个字节与0xFF进行与运算,然后转化为10进制,然后借助于Integer再转化为16进制 tmp = Integer.toHexString(0xFF & b); if (tmp.length() == 1) { tmp = "0" + tmp; } sb.append(tmp + " "); } return sb.toString(); } @RequiresApi(api = Build.VERSION_CODES.O) @Override protected synchronized LinkResponse decoder(Object msg, String ipaddress) throws Exception { if (msg == null || !(msg instanceof byte[])) { return null; } byte[] bytes = (byte[]) msg; try { byteBuffer.put(bytes); } catch (Exception e) { LogUtils.e("接收到数据异常:\r\n" + e.getMessage()); byteBuffer.flip(); byteBuffer.clear(); } try { //如果多条命令打包在一条数据中,都需要处理完 while (true) { removeInVoidBytes();//移除可能存在的无效数据 //头部数据 String header = getHeader(); if (header == null) { break; } String[] topMsgs = header.split("\r\n"); String topic = getTopic(topMsgs); int lenght = getLenght(topMsgs); if (topic == null || lenght <= 0) { //获取不到主题或者头部数据还没有接收完成 break; } int bodyIndex = getBodyIndex(); //是否已经获取完整所有的数据 byte[] body = getBody(bodyIndex, lenght); if (body == null) { //当前数据还没有接收完成 break; } remove(bodyIndex + lenght); if (topic.contains("heartbeat_reply")) { // if (packet.getSocket() != null) { // packet.getSocket().setSoTimeout(10 * 1000); // } continue; } LinkResponse response = new LinkResponse(); response.setSource_ipAddress(ipaddress); response.setTopic(topic); if (encrypt(body)) { //需要解密 if (!TextUtils.isEmpty(HDLLinkConfig.getInstance().getLocalSecret())) { byte[] bodyBytes = AesUtil.aesDecrypt(body, HDLLinkConfig.getInstance().getLocalSecret()); if (bodyBytes != null) { response.setData(new String(bodyBytes, StandardCharsets.UTF_8)); } else { LogUtils.e("解密失败\r\n" + topic); response.setData(new String(body, "utf-8")); continue; } } } else { response.setData(new String(body, "utf-8")); } LogUtils.i("本地接收到数据:\r\n" + response.getTopic() + "\r\n" + response.getData() + "\r\n" + response.getData().length()); //解析完成,topic发送一次 EventDispatcher.getInstance().post(response.getTopic(), response); } } catch (Exception ee) { LogUtils.e("处理接收的数据异常:\r\n" + ee.getMessage()); } return null; } //是否加密 private boolean encrypt(byte[] bytes) { if (bytes[0] == '{' && bytes[bytes.length - 1] == '}' || (bytes[0] == '[' && bytes[bytes.length - 1] == ']')) { return false; } return true; } }