JLChen
2021-12-13 e7b8a808c2274e9c4329092bb752c7ea5cb035fc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
package com.hdl.sdk.connect.socket;
 
import android.text.TextUtils;
 
import com.hdl.sdk.common.event.EventDispatcher;
import com.hdl.sdk.common.event.EventListener;
import com.hdl.sdk.common.utils.ThreadToolUtils;
import com.hdl.sdk.connect.bean.LinkRequest;
import com.hdl.sdk.socket.SocketBoot;
import com.hdl.sdk.socket.udp.UdpSocketBoot;
 
import java.net.InetSocketAddress;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
 
/**
 * Created by Tong on 2021/11/11.
 */
public class HdlSocketHelper {
 
    private static final Long DEF_SEND_TIMEOUT = 1000L;
    private static final int DEF_MAX_RETRY = 4;
    private static final int DEF_SEND_ONE = 1;
 
    private final Long sendAwaitTime;
    private final int maxRetry;
 
    private SocketBoot boot;
    private UdpSocketBoot udpSocketBoot;
    /**
     * 发送的目标IP
     */
    private String ipAddress;
    /**
     * 发送的目标地址
     */
    private int port;
    private final LinkRequest linkRequest;
    private final EventListener eventListener;
 
    private final AtomicInteger sendNumber;
 
    private final AtomicBoolean isSend = new AtomicBoolean(false);
 
    private HdlSocketListener listener;
 
    private ScheduledExecutorService sendThread;
 
    public interface HdlSocketListener {
        void onSucceed(Object msg);
 
        void onFailure();
    }
 
    private HdlSocketHelper(Long sendAwaitTime, int maxRetry, SocketBoot boot, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener) {
        this.sendAwaitTime = sendAwaitTime;
        this.maxRetry = maxRetry;
        this.boot = boot;
        this.linkRequest = linkRequest;
        this.listener = listener;
        this.sendNumber = new AtomicInteger(0);
        eventListener = new EventListener() {
            @Override
            public void onMessage(Object msg) {
                isSend.set(true);
                if (listener != null) {
                    listener.onSucceed(msg);
                }
                if (sendThread != null) {
                    sendThread.shutdownNow();
                }
                EventDispatcher.getInstance().remove(eventListener);
            }
        };
        EventDispatcher.getInstance().register(observeTopic, eventListener);
    }
 
    private HdlSocketHelper(Long sendAwaitTime, int maxRetry, UdpSocketBoot udpSocketBoot,
                            String ipAddress,int port , LinkRequest linkRequest, String observeTopic, HdlSocketListener listener) {
        this.sendAwaitTime = sendAwaitTime;
        this.maxRetry = maxRetry;
        this.udpSocketBoot = udpSocketBoot;
        this.ipAddress = ipAddress;
        this.port = port;
        this.linkRequest = linkRequest;
        this.listener = listener;
        this.sendNumber = new AtomicInteger(0);
        eventListener = new EventListener() {
            @Override
            public void onMessage(Object msg) {
                isSend.set(true);
                if (listener != null) {
                    listener.onSucceed(msg);
                }
                if (sendThread != null) {
                    sendThread.shutdownNow();
                }
                EventDispatcher.getInstance().remove(eventListener);
            }
        };
        EventDispatcher.getInstance().register(observeTopic, eventListener);
    }
 
    public static void send(SocketBoot boot, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener, Long sendAwaitTime, int maxRetry) {
        if (TextUtils.isEmpty(observeTopic)) {
            observeTopic = linkRequest.getTopic() + "_reply";
        }
        HdlSocketHelper socketHelper = new HdlSocketHelper(sendAwaitTime, maxRetry, boot, linkRequest, observeTopic, listener);
        socketHelper.send();
    }
 
    public static void send(SocketBoot boot, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener) {
        send(boot, linkRequest, observeTopic, listener, DEF_SEND_TIMEOUT, DEF_MAX_RETRY);
    }
 
 
    public static void send(SocketBoot boot, LinkRequest linkRequest, HdlSocketListener listener) {
        send(boot, linkRequest, "", listener, DEF_SEND_TIMEOUT, DEF_MAX_RETRY);
    }
 
    public static void sendOne(SocketBoot boot, LinkRequest linkRequest, HdlSocketListener listener) {
        send(boot, linkRequest, "", listener, DEF_SEND_TIMEOUT, DEF_SEND_ONE);
    }
 
    /**
     * Udp的发送方法
     *
     * @param udpSocketBoot     Udp当前对接
     * @param ipAddress 发送的目标IP地址
     * @param port 目的端口
     * @param linkRequest       发送的数据
     * @param observeTopic      发送的主题
     * @param retry 重发数次
     * @param listener          回调
     */
    public static void sendUdp(UdpSocketBoot udpSocketBoot, String  ipAddress ,int port, LinkRequest linkRequest, String observeTopic, int retry,HdlSocketListener listener) {
        if (TextUtils.isEmpty(observeTopic)) {
            observeTopic = linkRequest.getTopic() + "_reply";
        }
        HdlSocketHelper socketHelper = new HdlSocketHelper(DEF_SEND_TIMEOUT, retry, udpSocketBoot, ipAddress,port, linkRequest, observeTopic, listener);
        socketHelper.send();
    }
 
    /**
     * Udp的发送方法
     *
     * @param udpSocketBoot     Udp当前对接
     * @param ipAddress 发送的目标IP地址
     * @param port 目的端口
     * @param linkRequest       发送的数据
     * @param listener          回调
     */
    public static void sendUdp(UdpSocketBoot udpSocketBoot, String  ipAddress ,int port, LinkRequest linkRequest, HdlSocketListener listener) {
        sendUdp(udpSocketBoot,ipAddress,port,linkRequest,null,DEF_MAX_RETRY,listener);
    }
 
    /**
     * Udp的发送方法
     *
     * @param udpSocketBoot     Udp当前对接
     * @param ipAddress 发送的目标IP地址
     * @param port 目的端口
     * @param linkRequest       发送的数据
     */
    public static void sendUdpOne(UdpSocketBoot udpSocketBoot, String  ipAddress ,int port, LinkRequest linkRequest) {
        sendUdp(udpSocketBoot, ipAddress, port, linkRequest, null, DEF_SEND_ONE, null);
    }
 
    private void send() {
        getSendThread().scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                //发送次数小于重发次数
                if ((sendNumber.get() < maxRetry)) {
                    try {
                        //还没有收到回复,再发送
                        if (!isSend.get()) {
                            sendNumber.set(sendNumber.get() + 1);
                            //如是tcp
                            if (boot != null) {
                                boot.sendMsg(linkRequest.getSendBytes());
                            }
                            //如果是udp
                            if (null != udpSocketBoot) {
                                udpSocketBoot.sendMsg(ipAddress,port, linkRequest.getSendBytes());
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } else {
                    //超出重发次数并没有收到回复
                    if (!isSend.get()) {
                        notifyFailure();
                    }
                }
            }
        }, 0, sendAwaitTime, TimeUnit.MILLISECONDS);
        //initialdelay - 首次执行的延迟时间 0
        //delay - 一次执行终止和下一次执行开始之间的延迟
    }
 
    /**
     * 获取发送线程
     *
     * @return 返回获取到的线程
     */
    private ScheduledExecutorService getSendThread() {
        if (sendThread == null) {
            sendThread = ThreadToolUtils.getInstance().newScheduledThreadPool(1);
        }
        return sendThread;
    }
 
 
    /**
     * 发送失败
     */
    private void notifyFailure() {
        EventDispatcher.getInstance().remove(eventListener);
        if (sendThread != null) {
            sendThread.shutdownNow();
            sendThread = null;
        }
        if (listener != null) {
            listener.onFailure();
            listener = null;
        }
    }
}