From d8bf4f4d66715f002d024cae92862c1d83daa425 Mon Sep 17 00:00:00 2001
From: hxb <hxb@hdlchina.com.cn>
Date: 星期日, 12 十二月 2021 22:06:35 +0800
Subject: [PATCH] 更改了udp的机制
---
HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/socket/HdlSocketHelper.java | 111 +++++++++++++++++++++++++++++++++++++++++++++++--------
1 files changed, 95 insertions(+), 16 deletions(-)
diff --git a/HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/socket/HdlSocketHelper.java b/HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/socket/HdlSocketHelper.java
index 53ef271..ea53e38 100644
--- a/HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/socket/HdlSocketHelper.java
+++ b/HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/socket/HdlSocketHelper.java
@@ -7,7 +7,9 @@
import com.hdl.sdk.common.utils.ThreadToolUtils;
import com.hdl.sdk.connect.bean.LinkRequest;
import com.hdl.sdk.socket.SocketBoot;
+import com.hdl.sdk.socket.udp.UdpSocketBoot;
+import java.net.InetSocketAddress;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,15 +27,17 @@
private final Long sendAwaitTime;
private final int maxRetry;
- private final SocketBoot boot;
+ private SocketBoot boot;
+ private UdpSocketBoot udpSocketBoot;
+ private InetSocketAddress inetSocketAddress;
private final LinkRequest linkRequest;
private final EventListener eventListener;
private final AtomicInteger sendNumber;
- private final AtomicBoolean isSend = new AtomicBoolean();
+ private final AtomicBoolean isSend = new AtomicBoolean(false);
- private final HdlSocketListener listener;
+ private HdlSocketListener listener;
private ScheduledExecutorService sendThread;
@@ -66,12 +70,37 @@
EventDispatcher.getInstance().register(observeTopic, eventListener);
}
+ private HdlSocketHelper(Long sendAwaitTime, int maxRetry, UdpSocketBoot udpSocketBoot,
+ InetSocketAddress inetSocketAddress, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener) {
+ this.sendAwaitTime = sendAwaitTime;
+ this.maxRetry = maxRetry;
+ this.udpSocketBoot = udpSocketBoot;
+ this.inetSocketAddress = inetSocketAddress;
+ this.linkRequest = linkRequest;
+ this.listener = listener;
+ this.sendNumber = new AtomicInteger(0);
+ eventListener = new EventListener() {
+ @Override
+ public void onMessage(Object msg) {
+ isSend.set(true);
+ if (listener != null) {
+ listener.onSucceed(msg);
+ }
+ if (sendThread != null) {
+ sendThread.shutdownNow();
+ }
+ EventDispatcher.getInstance().remove(eventListener);
+ }
+ };
+ EventDispatcher.getInstance().register(observeTopic, eventListener);
+ }
+
public static void send(SocketBoot boot, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener, Long sendAwaitTime, int maxRetry) {
if (TextUtils.isEmpty(observeTopic)) {
observeTopic = linkRequest.getTopic() + "_reply";
}
HdlSocketHelper socketHelper = new HdlSocketHelper(sendAwaitTime, maxRetry, boot, linkRequest, observeTopic, listener);
- socketHelper.resend();
+ socketHelper.send();
}
public static void send(SocketBoot boot, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener) {
@@ -87,34 +116,78 @@
send(boot, linkRequest, "", listener, DEF_SEND_TIMEOUT, DEF_SEND_ONE);
}
- private void resend() {
+ /**
+ * Udp鐨勫彂閫佹柟娉�
+ *
+ * @param udpSocketBoot Udp褰撳墠瀵规帴
+ * @param inetSocketAddress 鍙戦�佺殑鐩爣鍦板潃
+ * @param linkRequest 鍙戦�佺殑鏁版嵁
+ * @param observeTopic 鍙戦�佺殑涓婚
+ * @param listener 鍥炶皟
+ */
+ public static void send(UdpSocketBoot udpSocketBoot, InetSocketAddress inetSocketAddress, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener) {
+ if (TextUtils.isEmpty(observeTopic)) {
+ observeTopic = linkRequest.getTopic() + "_reply";
+ }
+ HdlSocketHelper socketHelper = new HdlSocketHelper(DEF_SEND_TIMEOUT, DEF_SEND_ONE, udpSocketBoot, inetSocketAddress, linkRequest, observeTopic, listener);
+ socketHelper.send();
+ }
+
+ /**
+ * Udp鍙戦�佺殑鏂规硶
+ *
+ * @param udpSocketBoot 褰撳墠Udp瀵硅薄
+ * @param inetSocketAddress 鍙戦�佺殑鐩爣鍦板潃
+ * @param linkRequest 鍙戦�佺殑瀵硅薄
+ * @param listener 鍥炶皟
+ */
+ public static void send(UdpSocketBoot udpSocketBoot, InetSocketAddress inetSocketAddress, LinkRequest linkRequest, HdlSocketListener listener) {
+ String observeTopic = null;
+ if (TextUtils.isEmpty(observeTopic)) {
+ observeTopic = linkRequest.getTopic() + "_reply";
+ }
+ HdlSocketHelper socketHelper = new HdlSocketHelper(DEF_SEND_TIMEOUT, DEF_SEND_ONE, udpSocketBoot, inetSocketAddress, linkRequest, observeTopic, listener);
+ socketHelper.send();
+ }
+
+ private void send() {
getSendThread().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
- if ((sendNumber.get() < maxRetry + 2) || !isSend.get()) {
+ //鍙戦�佹鏁板皬浜庨噸鍙戞鏁�
+ if ((sendNumber.get() < maxRetry)) {
try {
- if (sendNumber.get() < maxRetry + 1) {
+ //杩樻病鏈夋敹鍒板洖澶嶏紝鍐嶅彂閫�
+ if (!isSend.get()) {
+ sendNumber.set(sendNumber.get() + 1);
+ //濡傛槸tcp
if (boot != null) {
boot.sendMsg(linkRequest.getSendBytes());
}
- sendNumber.set(sendNumber.get() + 1);
- } else {
- notifyFailure();
+ //濡傛灉鏄痷dp
+ if (null != udpSocketBoot) {
+ udpSocketBoot.sendMsg(inetSocketAddress, linkRequest.getSendBytes());
+ }
}
} catch (Exception e) {
e.printStackTrace();
- sendNumber.set(sendNumber.get() + 1);
- } finally {
- if (sendNumber.get() > maxRetry + 1 && !isSend.get()) {
- notifyFailure();
- }
+ }
+ } else {
+ //瓒呭嚭閲嶅彂娆℃暟骞舵病鏈夋敹鍒板洖澶�
+ if (!isSend.get()) {
+ notifyFailure();
}
}
}
}, sendAwaitTime, sendAwaitTime, TimeUnit.MILLISECONDS);
}
- public ScheduledExecutorService getSendThread() {
+ /**
+ * 鑾峰彇鍙戦�佺嚎绋�
+ *
+ * @return 杩斿洖鑾峰彇鍒扮殑绾跨▼
+ */
+ private ScheduledExecutorService getSendThread() {
if (sendThread == null) {
sendThread = ThreadToolUtils.getInstance().newScheduledThreadPool(1);
}
@@ -122,12 +195,18 @@
}
+ /**
+ * 鍙戦�佸け璐�
+ */
private void notifyFailure() {
+ EventDispatcher.getInstance().remove(eventListener);
if (sendThread != null) {
sendThread.shutdownNow();
+ sendThread = null;
}
if (listener != null) {
listener.onFailure();
+ listener = null;
}
}
}
--
Gitblit v1.8.0