wjc
2025-05-23 971a24a9e58a21bc306897fd3ad63012a399f7db
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
package com.hdl.sdk.link.core.protocol;
 
 
import com.hdl.sdk.link.common.utils.LogUtils;
import com.hdl.sdk.link.common.event.EventDispatcher;
import com.hdl.sdk.link.common.utils.ByteUtils;
import com.hdl.sdk.link.core.bean.LinkPacket;
import com.hdl.sdk.link.core.bean.LinkResponse;
import com.hdl.sdk.link.core.utils.ByteBufferUtils;
import com.hdl.sdk.link.core.utils.QueueUtils;
import com.hdl.sdk.link.gateway.HDLLinkLocalGateway;
import com.hdl.sdk.link.socket.bean.Packet;
import com.hdl.sdk.link.socket.codec.ByteToMessageDecoder;
 
import java.nio.ByteBuffer;
 
/**
 * Created by Tong on 2021/9/22.
 * link协议粘包拆包
 */
public class LinkMessageDecoder extends ByteToMessageDecoder<LinkResponse> {
 
    private static final String TAG=LinkMessageDecoder.class.getName();
    //instance
    private volatile static LinkMessageDecoder instance;
 
    //getInstance
    public static synchronized LinkMessageDecoder getInstance() {
        if (instance == null) {
            synchronized (LinkMessageDecoder.class) {
                if (instance == null) {
                    instance = new LinkMessageDecoder();
                }
            }
        }
        return instance;
    }
 
    /**
     * 接收数据缓冲区
     */
    private final ByteBuffer byteBuffer;
 
    private final byte[] head = "Topic:".getBytes();
 
    public LinkMessageDecoder() {
        byteBuffer = ByteBuffer.allocate(1024 * 200);//100K
    }
 
    /// <summary>
    /// 获取内容长度
    /// </summary>
    /// <param name="topMsgs"></param>
    /// <returns></returns>
    int getLenght(String[] topMsgs) {
        try {
            for (int i = 0; i < topMsgs.length; i++) {
                String topMsg = topMsgs[i].trim();
                if (topMsg.startsWith("Length:")) {
                    return Integer.parseInt(topMsg.replace("Length:", "").trim());
                }
            }
        } catch (Exception e) {
            LogUtils.e("异常数据:" + topMsgs[0] + "\r\n" + topMsgs[1]);
            return -1;
        }
        //找不到长度
        return -1;
    }
 
    /// <summary>
    /// 获取主题
    /// </summary>
    /// <param name="topMsgs"></param>
    /// <returns></returns>
    private String getTopic(String[] topMsgs) {
        for (int i = 0; i < topMsgs.length; i++) {
            String topMsg = topMsgs[i].trim();
            if (topMsg.startsWith("Topic:")) {
                return topMsg.replace("Topic:", "");
            }
        }
        //找不到主题
        return null;
    }
 
    /**
     * 获取数据的开始位置
     *
     * @return 数据位的开始索引
     */
    int getBodyIndex() {
        byte r = (byte) '\r';
        byte n = (byte) '\n';
        for (int i = 0; i < byteBuffer.position(); i++) {
            //找出数据内容前面的两个换行
            if (3 <= i && byteBuffer.get(i - 3) == r && byteBuffer.get(i - 2) == n && byteBuffer.get(i - 1) == r && byteBuffer.get(i) == n) {
                //剩余的数据
                return i + 1;
            }
        }
        return -1;
    }
 
    /**
     * 获取头部数据
     *
     * @return
     */
    String getHeader() {
        int bodyIndex = getBodyIndex();
        if (bodyIndex < 0) {
            //没有找到头部数据
            return null;
        } else {
            byte bodyBytes[] = ByteBufferUtils.copyBytes(byteBuffer, bodyIndex);
            return new String(bodyBytes);
        }
    }
 
    /**
     * 获取数据内容
     *
     * @param lenght
     * @return
     */
    byte[] getBody(int index, int lenght) {
        //是否已经获取完整所有的数据
        byte[] bodyBytes = new byte[lenght];
        if (index < 0 || byteBuffer.position() < index + lenght) {
            //当前数据还没有接收完成
            return null;
        }
 
        for (int i = 0; i < bodyBytes.length; i++) {
            bodyBytes[i] = byteBuffer.get(index + i);
        }
        return bodyBytes;
    }
 
 
 
    /**
     * 这边处理了缓存数据粘包的情况,每次请求都需要吧当前完整的文件除去   以便于下次的返回
     * tempList用于存储多余的数据
     * contentList用于本次数据的存储(发送给订阅的数据)
     */
    byte[] geBody() {
        int len = 3 + 2 + 2 + 4 + ((byteBuffer.get(7) & 0xFF) * 256 * 256 * 256) + ((byteBuffer.get(8) & 0xFF) * 256 * 256) + ((byteBuffer.get(9) & 0xFF) * 256) + (byteBuffer.get(10) & 0xFF);
        if (byteBuffer.position() < len) {
            return null;
        }
        byte[] bodyBytes = new byte[len];
        for (int i = 0; i < len; i++) {
            bodyBytes[i] = byteBuffer.get(i);
        }
 
        int endIndex = byteBuffer.position();
        byteBuffer.clear();
        for (int i = len; i < endIndex; i++) {
            byteBuffer.put(byteBuffer.get(i));
        }
        return bodyBytes;
    }
 
    /**
     * 移除可能存在的无效数据
     */
    void removeInVoidBytes() {
        int index = 0;
        boolean isMatch = false;
        for (; index < byteBuffer.position() - head.length; index++) {
            isMatch = true;
            for (int j = 0, k = 0; j < head.length; j++, k++) {
                if (head[j] != byteBuffer.get(index + k)) {
                    isMatch = false;
                    break;
                }
            }
            if (isMatch) {
                break;
            }
        }
 
        if (0 < index && isMatch) {
            int endIndex = byteBuffer.position();
            byteBuffer.clear();
            for (int i = index; i < endIndex; i++) {
                byteBuffer.put(byteBuffer.get(i));
            }
        }
    }
 
    /**
     * 移除到指定位置前面的数据
     *
     * @param position 指定位置
     */
    void remove(int position) {
        int endIndex = byteBuffer.position();
        byteBuffer.clear();
        for (int i = position; i < endIndex; i++) {
            byteBuffer.put(byteBuffer.get(i));
        }
    }
 
    void fileManger(int commandAck, byte[] recevieBytes) {
        String topic = "65531_" + commandAck;
        LinkResponse response = new LinkResponse();
        response.setTopic(topic);
        response.setByteData(recevieBytes);
        EventDispatcher.getInstance().post(response.getTopic(), response);
    }
 
    int bytes2int(byte[] bytes) {
        int result = 0;
        if (bytes.length == 2) {
            int c = (bytes[0] & 0xff) << 8;
            int d = (bytes[1] & 0xff);
            result = c | d;
        } else if (bytes.length == 4) {
            return bytes[3] & 0xFF | //
                    (bytes[2] & 0xFF) << 8 | //
                    (bytes[1] & 0xFF) << 16 | //
                    (bytes[0] & 0xFF) << 24; //
        }
        return result;
    }
 
    public String byte2hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        String tmp = null;
        for (byte b : bytes) {
            //将每个字节与0xFF进行与运算,然后转化为10进制,然后借助于Integer再转化为16进制
            tmp = Integer.toHexString(0xFF & b);
            if (tmp.length() == 1) {
                tmp = "0" + tmp;
            }
            sb.append(tmp + " ");
        }
        return sb.toString();
    }
 
    @Override
    protected synchronized LinkResponse decoder(Packet packet) {
        try {
            if (null == packet) {
                return null;
            }
            byteBuffer.put(packet.getBytes());
        } catch (Exception e) {
            LogUtils.e("接收到数据异常:\r\n" + e.getMessage());
            byteBuffer.flip();
            byteBuffer.clear();
        }
        try {
            //如果多条命令打包在一条数据中,都需要处理完
            while (true) {
                if (byteBuffer.position() > 2) {//判断是否是文件处理通知 wxr
                    byte[] topBytes = new byte[3];
                    topBytes[0] = byteBuffer.get(0);
                    topBytes[1] = byteBuffer.get(1);
                    topBytes[2] = byteBuffer.get(2);
                    if (new String(topBytes).equals("hex")) {
                        //TODO 这块代码统一移出其它地方处理
                        byte[] commandBytes = ByteBufferUtils.copyBytes(byteBuffer, 5, 2);
                        int command = bytes2int(commandBytes);
                        byte[] submitBytes = geBody();
                        if(submitBytes==null)
                        {
                            //还没有接收完成
                            continue;
                        }
                        if (command == 258 || command == 260 || command == 261) {
                            //读取驱动列表响应 ||驱动安装申请响应
                            if (submitBytes.length > 11) {
                                byte[] rangeBytes = ByteUtils.copyBytes(submitBytes, 11, submitBytes.length - 11);
                                fileManger(command, rangeBytes);
                            } else {
                                //方便问题排查
                                fileManger(command, submitBytes);
                            }
                        } else {
                            //给秀桡使用  后面的业务最好都在这边处理 不然会造成业务分散
                            fileManger(command, submitBytes);
                        }
                        continue;
                    }
                }
                removeInVoidBytes();//移除可能存在的无效数据
 
                //头部数据
                String header = getHeader();
 
                if (header == null) {
                    break;
                }
                String[] topMsgs = header.split("\r\n");
 
                String topic = getTopic(topMsgs);
                int lenght = getLenght(topMsgs);
                if (topic == null || lenght <= 0) {
                    //获取不到主题或者头部数据还没有接收完成
                    break;
                }
 
                int bodyIndex = getBodyIndex();
                //是否已经获取完整所有的数据
                byte[] body = getBody(bodyIndex, lenght);
 
                if (body == null) {
                    //当前数据还没有接收完成
                    break;
                }
 
                remove(bodyIndex + lenght);
 
                if (topic.contains("heartbeat_reply")) {
                    if (packet.getSocket() != null) {
                        packet.getSocket().setSoTimeout(10 * 1000);
                    }
                    continue;
                }
 
                QueueUtils.getInstance().add(new LinkPacket(topic, body));
            }
        } catch (Exception ee) {
            LogUtils.e("处理接收的数据异常:\r\n" + ee.getMessage());
        }
        return null;
    }
}