JLChen
2021-11-15 44155b50cbb4f6ad78474f40331ed8838a3b0d49
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package com.hdl.sdk.connect.protocol;
 
 
import android.util.Log;
 
import com.hdl.sdk.common.config.AuthenticateConfig;
import com.hdl.sdk.common.event.EventDispatcher;
import com.hdl.sdk.common.utils.ByteUtils;
import com.hdl.sdk.connect.bean.LinkResponse;
import com.hdl.sdk.connect.utils.AesUtil;
import com.hdl.sdk.connect.utils.ProtocolParse;
import com.hdl.sdk.socket.codec.ByteToMessageDecoder;
 
import java.util.ArrayList;
import java.util.List;
 
/**
 * Created by Tong on 2021/9/22.
 * link协议粘包拆包
 */
public class LinkMessageDecoder extends ByteToMessageDecoder<LinkResponse> {
 
    private final List<Byte> bytes;
 
    private final byte[] head = "Topic:".getBytes();
    private final byte[] body = "\r\n\r\n".getBytes();
 
    public LinkMessageDecoder() {
        this.bytes = new ArrayList<>();
    }
 
    @Override
    protected LinkResponse decoder(Object msg) throws Exception {
        LinkResponse response = new LinkResponse();
        if (msg instanceof byte[]) {
            //解析流
            byte[] data = (byte[]) msg;
            bytes.addAll(ByteUtils.toByteList(data));
 
            byte[] byteArray = ByteUtils.toByteArray(bytes);
            int headIndex = ByteUtils.getByteIndexOf(byteArray, head);
            if (headIndex > 0) {
                //移动到head 开始位置
                bytes.subList(0, headIndex).clear();
                byteArray = ByteUtils.toByteArray(bytes);
            }
 
            int bodyIndex = ByteUtils.getByteIndexOf(byteArray, body);
            if (bodyIndex < 0) {
                //头部未获取完成
                return null;
            }
            int bodyStartIndex = bodyIndex + body.length;
 
            //解析头部
            ProtocolParse parse = new ProtocolParse(byteArray);
            response.setTopic(parse.getTopic());
 
            int bodyLength = parse.getLength();
            if (bodyLength > 0) {
                if (byteArray.length >= bodyLength + bodyStartIndex) {
                    byte[] body = ByteUtils.getRangeBytes(bytes, bodyStartIndex, bodyStartIndex + bodyLength);
 
                    if(AuthenticateConfig.getInstance().ifNeedEncrypt(response.getTopic())){
                        //需要解密
                        byte[] bodyBytes = AesUtil.aesDecrypt(body,AuthenticateConfig.getInstance().getLocalSecret());
                        response.setData(new String(bodyBytes, "utf-8"));
                        Log.i("TAG", "decoder: ");
                    }else{
                        response.setData(new String(body, "utf-8"));
                    }
 
 
 
                    if (byteArray.length >= bodyLength + bodyStartIndex) {
                        //保存余留
                        byte[] remaining = ByteUtils.getRangeBytes(bytes, bodyStartIndex + bodyLength, byteArray.length);
                        bytes.clear();
                        for (byte b : remaining) {
                            bytes.add(b);
                        }
                    }
                    //解析完成,topic发送一次
                    EventDispatcher.getInstance().post(response.getTopic(), response);
                    return response;
                }
            } else if (bodyLength == 0) {
                //body为空
                return response;
            }
 
        }
        return null;
    }
}