| | |
| | | package com.hdl.sdk.connect.protocol;
|
| | |
|
| | |
|
| | | import com.hdl.sdk.common.config.AuthenticateConfig;
|
| | | import android.os.Build;
|
| | | import android.text.TextUtils;
|
| | |
|
| | | import androidx.annotation.RequiresApi;
|
| | |
|
| | | import com.hdl.sdk.common.event.EventDispatcher;
|
| | | import com.hdl.sdk.common.utils.ByteUtils;
|
| | | import com.hdl.sdk.common.utils.LogUtils;
|
| | | import com.hdl.sdk.connect.bean.LinkResponse;
|
| | | import com.hdl.sdk.connect.utils.AESUtils;
|
| | | import com.hdl.sdk.connect.config.HDLLinkConfig;
|
| | | import com.hdl.sdk.connect.utils.AesUtil;
|
| | | import com.hdl.sdk.connect.utils.ProtocolParse;
|
| | | import com.hdl.sdk.connect.utils.ByteBufferUtils;
|
| | | import com.hdl.sdk.socket.codec.ByteToMessageDecoder;
|
| | |
|
| | | import java.util.ArrayList;
|
| | | import java.util.List;
|
| | | import java.nio.ByteBuffer;
|
| | | import java.nio.charset.StandardCharsets;
|
| | |
|
| | | /**
|
| | | * Created by Tong on 2021/9/22.
|
| | |
| | | */
|
| | | public class LinkMessageDecoder extends ByteToMessageDecoder<LinkResponse> {
|
| | |
|
| | | private final List<Byte> bytes;
|
| | | private static final String TAG = LinkMessageDecoder.class.getName();
|
| | | //instance
|
| | | private volatile static LinkMessageDecoder instance;
|
| | |
|
| | | private final byte[] head = "Topic:".getBytes();
|
| | | private final byte[] body = "\r\n\r\n".getBytes();
|
| | |
|
| | | public LinkMessageDecoder() {
|
| | | this.bytes = new ArrayList<>();
|
| | | //getInstance
|
| | | public static synchronized LinkMessageDecoder getInstance() {
|
| | | if (instance == null) {
|
| | | synchronized (LinkMessageDecoder.class) {
|
| | | if (instance == null) {
|
| | | instance = new LinkMessageDecoder();
|
| | | }
|
| | | }
|
| | | }
|
| | | return instance;
|
| | | }
|
| | |
|
| | | /**
|
| | | * Receive data buffer.
|
| | | */
|
| | | private final ByteBuffer byteBuffer;
|
| | |
|
| | | private final byte[] head = "Topic:".getBytes();
|
| | |
|
// Creates a decoder whose receive buffer is allocated with a fixed capacity of 1024 * 200 bytes.
| | | public LinkMessageDecoder() {
|
| | | byteBuffer = ByteBuffer.allocate(1024 * 200);// 1024 * 200 bytes = 200 KiB (not 100K)
|
| | | }
|
| | |
|
| | | /// <summary>
|
| | | /// 获取内容长度
|
| | | /// </summary>
|
| | | /// <param name="topMsgs"></param>
|
| | | /// <returns></returns>
|
| | | int getLenght(String[] topMsgs) {
|
| | | try {
|
| | | for (int i = 0; i < topMsgs.length; i++) {
|
| | | String topMsg = topMsgs[i].trim();
|
| | | if (topMsg.startsWith("Length:")) {
|
| | | return Integer.parseInt(topMsg.replace("Length:", "").trim());
|
| | | }
|
| | | }
|
| | | } catch (Exception e) {
|
| | | LogUtils.e("异常数据:" + topMsgs[0] + "\r\n" + topMsgs[1]);
|
| | | return -1;
|
| | | }
|
| | | //找不到长度
|
| | | return -1;
|
| | | }
|
| | |
|
| | | /// <summary>
|
| | | /// 获取主题
|
| | | /// </summary>
|
| | | /// <param name="topMsgs"></param>
|
| | | /// <returns></returns>
|
| | | private String getTopic(String[] topMsgs) {
|
| | | for (int i = 0; i < topMsgs.length; i++) {
|
| | | String topMsg = topMsgs[i].trim();
|
| | | if (topMsg.startsWith("Topic:")) {
|
| | | return topMsg.replace("Topic:", "");
|
| | | }
|
| | | }
|
| | | //找不到主题
|
| | | return null;
|
| | | }
|
| | |
|
| | | /**
|
| | | * 获取数据的开始位置
|
| | | *
|
| | | * @return 数据位的开始索引
|
| | | */
|
| | | int getBodyIndex() {
|
| | | byte r = (byte) '\r';
|
| | | byte n = (byte) '\n';
|
| | | for (int i = 0; i < byteBuffer.position(); i++) {
|
| | | //找出数据内容前面的两个换行
|
| | | if (3 <= i && byteBuffer.get(i - 3) == r && byteBuffer.get(i - 2) == n && byteBuffer.get(i - 1) == r && byteBuffer.get(i) == n) {
|
| | | //剩余的数据
|
| | | return i + 1;
|
| | | }
|
| | | }
|
| | | return -1;
|
| | | }
|
| | |
|
| | | /**
|
| | | * 获取头部数据
|
| | | *
|
| | | * @return
|
| | | */
|
| | | String getHeader() {
|
| | | int bodyIndex = getBodyIndex();
|
| | | if (bodyIndex < 0) {
|
| | | //没有找到头部数据
|
| | | return null;
|
| | | } else {
|
| | | byte bodyBytes[] = ByteBufferUtils.copyBytes(byteBuffer, bodyIndex);
|
| | | return new String(bodyBytes);
|
| | | }
|
| | | }
|
| | |
|
| | | /**
|
| | | * 获取数据内容
|
| | | *
|
| | | * @param lenght
|
| | | * @return
|
| | | */
|
| | | byte[] getBody(int index, int lenght) {
|
| | | //是否已经获取完整所有的数据
|
| | | byte[] bodyBytes = new byte[lenght];
|
| | | if (index < 0 || byteBuffer.position() < index + lenght) {
|
| | | //当前数据还没有接收完成
|
| | | return null;
|
| | | }
|
| | |
|
| | | for (int i = 0; i < bodyBytes.length; i++) {
|
| | | bodyBytes[i] = byteBuffer.get(index + i);
|
| | | }
|
| | | return bodyBytes;
|
| | | }
|
| | |
|
| | |
|
| | | /**
|
| | | * 这边处理了缓存数据粘包的情况,每次请求都需要吧当前完整的文件除去 以便于下次的返回
|
| | | * tempList用于存储多余的数据
|
| | | * contentList用于本次数据的存储(发送给订阅的数据)
|
| | | */
|
| | | byte[] geBody() {
|
| | | int len = 3 + 4 + 4 + ((byteBuffer.get(7) & 0xFF) * 256 * 256 * 256) + ((byteBuffer.get(8) & 0xFF) * 256 * 256) + ((byteBuffer.get(9) * 256) & 0xFF) + (byteBuffer.get(10) & 0xFf);
|
| | | byte[] bodyBytes = new byte[len];
|
| | | for (int i = 0; i < len; i++) {
|
| | | bodyBytes[i] = byteBuffer.get(i);
|
| | | }
|
| | |
|
| | | int endIndex = byteBuffer.position();
|
| | | byteBuffer.clear();
|
| | | for (int i = len; i < endIndex; i++) {
|
| | | byteBuffer.put(byteBuffer.get(i));
|
| | | }
|
| | | return bodyBytes;
|
| | | }
|
| | |
|
| | | /**
|
| | | * 移除可能存在的无效数据
|
| | | */
|
| | | void removeInVoidBytes() {
|
| | | int index = 0;
|
| | | boolean isMatch = false;
|
| | | for (; index < byteBuffer.position() - head.length; index++) {
|
| | | isMatch = true;
|
| | | for (int j = 0, k = 0; j < head.length; j++, k++) {
|
| | | if (head[j] != byteBuffer.get(index + k)) {
|
| | | isMatch = false;
|
| | | break;
|
| | | }
|
| | | }
|
| | | if (isMatch) {
|
| | | break;
|
| | | }
|
| | | }
|
| | |
|
| | | if (0 < index && isMatch) {
|
| | | int endIndex = byteBuffer.position();
|
| | | byteBuffer.clear();
|
| | | for (int i = index; i < endIndex; i++) {
|
| | | byteBuffer.put(byteBuffer.get(i));
|
| | | }
|
| | | }
|
| | | }
|
| | |
|
| | | /**
|
| | | * 移除到指定位置前面的数据
|
| | | *
|
| | | * @param position 指定位置
|
| | | */
|
| | | void remove(int position) {
|
| | | int endIndex = byteBuffer.position();
|
| | | byteBuffer.clear();
|
| | | for (int i = position; i < endIndex; i++) {
|
| | | byteBuffer.put(byteBuffer.get(i));
|
| | | }
|
| | | }
|
| | |
|
| | |
|
| | | int bytes2int(byte[] bytes) {
|
| | | int result = 0;
|
| | | if (bytes.length == 2) {
|
| | | int c = (bytes[0] & 0xff) << 8;
|
| | | int d = (bytes[1] & 0xff);
|
| | | result = c | d;
|
| | | } else if (bytes.length == 4) {
|
| | | return bytes[3] & 0xFF | //
|
| | | (bytes[2] & 0xFF) << 8 | //
|
| | | (bytes[1] & 0xFF) << 16 | //
|
| | | (bytes[0] & 0xFF) << 24; //
|
| | | }
|
| | | return result;
|
| | | }
|
| | |
|
| | | public String byte2hex(byte[] bytes) {
|
| | | StringBuilder sb = new StringBuilder();
|
| | | String tmp = null;
|
| | | for (byte b : bytes) {
|
| | | //将每个字节与0xFF进行与运算,然后转化为10进制,然后借助于Integer再转化为16进制
|
| | | tmp = Integer.toHexString(0xFF & b);
|
| | | if (tmp.length() == 1) {
|
| | | tmp = "0" + tmp;
|
| | | }
|
| | | sb.append(tmp + " ");
|
| | | }
|
| | | return sb.toString();
|
| | | }
|
| | |
|
| | |
|
// NOTE(review): this region appears to interleave two revisions of decoder(): a
// decoder(Object) variant that works on the `bytes` list with ProtocolParse, and a
// synchronized decoder(Object, String) variant that works on `byteBuffer` with
// getHeader()/getTopic()/getLenght()/getBody(). Braces do not balance as written, so the
// statements are left untouched here; only the comments were translated to English.
| | | @RequiresApi(api = Build.VERSION_CODES.O)
|
| | | @Override
|
| | | protected LinkResponse decoder(Object msg) throws Exception {
|
| | | LinkResponse response = new LinkResponse();
|
| | | if (msg instanceof byte[]) {
|
| | | //Parse the stream
|
| | | byte[] data = (byte[]) msg;
|
| | | bytes.addAll(ByteUtils.toByteList(data));
|
| | | protected synchronized LinkResponse decoder(Object msg, String ipaddress) throws Exception {
|
| | | if (msg == null || !(msg instanceof byte[])) {
|
| | | return null;
|
| | | }
|
| | |
|
| | | byte[] byteArray = ByteUtils.toByteArray(bytes);
|
| | | int headIndex = ByteUtils.getByteIndexOf(byteArray, head);
|
| | | if (headIndex > 0) {
|
| | | //Move to the start of head
|
| | | bytes.subList(0, headIndex).clear();
|
| | | byteArray = ByteUtils.toByteArray(bytes);
|
| | | }
|
| | | byte[] bytes = (byte[]) msg;
|
| | | try {
|
| | | byteBuffer.put(bytes);
|
| | | } catch (Exception e) {
|
| | | LogUtils.e("接收到数据异常:\r\n" + e.getMessage());
|
| | | byteBuffer.flip();
|
| | | byteBuffer.clear();
|
| | | }
|
| | |
|
| | | int bodyIndex = ByteUtils.getByteIndexOf(byteArray, body);
|
| | | if (bodyIndex < 0) {
|
| | | //Header not fully received yet
|
| | | return null;
|
| | | }
|
| | | int bodyStartIndex = bodyIndex + body.length;
|
| | | try {
|
| | | //If several commands are packed into one chunk of data, all of them must be processed
|
| | | while (true) {
|
| | | removeInVoidBytes();//Remove any invalid data that may be present
|
| | |
|
| | | //Parse the header
|
| | | ProtocolParse parse = new ProtocolParse(byteArray);
|
| | | response.setTopic(parse.getTopic());
|
| | | //Header data
|
| | | String header = getHeader();
|
| | |
|
| | | int bodyLength = parse.getLength();
|
| | | if (bodyLength > 0) {
|
| | | if (byteArray.length >= bodyLength + bodyStartIndex) {
|
| | | byte[] body = ByteUtils.getRangeBytes(bytes, bodyStartIndex, bodyStartIndex + bodyLength);
|
| | | if (header == null) {
|
| | | break;
|
| | | }
|
| | | String[] topMsgs = header.split("\r\n");
|
| | |
|
| | | if(AuthenticateConfig.getInstance().ifNeedEncrypt(response.getTopic())){
|
| | | //Needs decryption
|
| | | byte[] bodyBytes = AesUtil.aesDecrypt(body,AuthenticateConfig.getInstance().getLocalSecret());
|
| | | // byte[] bodyBytes = AESUtils.decryptAES(body,AuthenticateConfig.getInstance().getLocalSecret());
|
| | | response.setData(new String(bodyBytes, "utf-8"));
|
| | | // Log.i("TAG", "解密 主题:"+response.getTopic()+ " body: "+response.getData());
|
| | | }else{
|
| | | response.setData(new String(body, "utf-8"));
|
| | | }
|
| | | String topic = getTopic(topMsgs);
|
| | | int lenght = getLenght(topMsgs);
|
| | | if (topic == null || lenght <= 0) {
|
| | | //Topic not found, or the header data has not been fully received yet
|
| | | break;
|
| | | }
|
| | |
|
| | | if (byteArray.length >= bodyLength + bodyStartIndex) {
|
| | | //Keep the remaining bytes
|
| | | byte[] remaining = ByteUtils.getRangeBytes(bytes, bodyStartIndex + bodyLength, byteArray.length);
|
| | | bytes.clear();
|
| | | for (byte b : remaining) {
|
| | | bytes.add(b);
|
| | | int bodyIndex = getBodyIndex();
|
| | | //Whether all of the data has been received
|
| | | byte[] body = getBody(bodyIndex, lenght);
|
| | |
|
| | | if (body == null) {
|
| | | //The current data has not been fully received yet
|
| | | break;
|
| | | }
|
| | |
|
| | | remove(bodyIndex + lenght);
|
| | |
|
| | | if (topic.contains("heartbeat_reply")) {
|
| | | // if (packet.getSocket() != null) {
|
| | | // packet.getSocket().setSoTimeout(10 * 1000);
|
| | | // }
|
| | | continue;
|
| | | }
|
| | |
|
| | | LinkResponse response = new LinkResponse();
|
| | | response.setSource_ipAddress(ipaddress);
|
| | | response.setTopic(topic);
|
| | |
|
| | | if (encrypt(body)) {
|
| | | //Needs decryption
|
| | | if (!TextUtils.isEmpty(HDLLinkConfig.getInstance().getLocalSecret())) {
|
| | | byte[] bodyBytes = AesUtil.aesDecrypt(body, HDLLinkConfig.getInstance().getLocalSecret());
|
| | | if (bodyBytes != null) {
|
| | | response.setData(new String(bodyBytes, StandardCharsets.UTF_8));
|
| | | } else {
|
| | | LogUtils.e("解密失败\r\n" + topic);
|
| | | response.setData(new String(body, "utf-8"));
|
| | | continue;
|
| | | }
|
| | | }
|
| | | //Parsing finished; post once for the topic
|
| | | EventDispatcher.getInstance().post(response.getTopic(), response);
|
| | | return response;
|
| | | } else {
|
| | | response.setData(new String(body, "utf-8"));
|
| | | }
|
| | | } else if (bodyLength == 0) {
|
| | | //Body is empty
|
| | | return response;
|
| | | }
|
| | |
|
| | | LogUtils.i("本地接收到数据:\r\n" + response.getTopic() + "\r\n" + response.getData() + "\r\n" + response.getData().length());
|
| | |
|
| | | //Parsing finished; post once for the topic
|
| | | EventDispatcher.getInstance().post(response.getTopic(), response);
|
| | | }
|
| | | } catch (Exception ee) {
|
| | | LogUtils.e("处理接收的数据异常:\r\n" + ee.getMessage());
|
| | | }
|
| | | return null;
|
| | | }
|
| | |
|
| | | //是否加密
|
| | | private boolean encrypt(byte[] bytes) {
|
| | | if (bytes[0] == '{' && bytes[bytes.length - 1] == '}' || (bytes[0] == '[' && bytes[bytes.length - 1] == ']')) {
|
| | | return false;
|
| | | }
|
| | | return true;
|
| | | }
|
| | | }
|
| | |
|
| | |
|