From d8bf4f4d66715f002d024cae92862c1d83daa425 Mon Sep 17 00:00:00 2001 From: hxb <hxb@hdlchina.com.cn> Date: 星期日, 12 十二月 2021 22:06:35 +0800 Subject: [PATCH] 更改了udp的机制 --- HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/socket/HdlSocketHelper.java | 113 +++++++++++++++++++++++++++++++++++++++++++++++--------- 1 files changed, 95 insertions(+), 18 deletions(-) diff --git a/HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/socket/HdlSocketHelper.java b/HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/socket/HdlSocketHelper.java index 99a703a..ea53e38 100644 --- a/HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/socket/HdlSocketHelper.java +++ b/HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/socket/HdlSocketHelper.java @@ -7,7 +7,9 @@ import com.hdl.sdk.common.utils.ThreadToolUtils; import com.hdl.sdk.connect.bean.LinkRequest; import com.hdl.sdk.socket.SocketBoot; +import com.hdl.sdk.socket.udp.UdpSocketBoot; +import java.net.InetSocketAddress; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -25,15 +27,17 @@ private final Long sendAwaitTime; private final int maxRetry; - private final SocketBoot boot; + private SocketBoot boot; + private UdpSocketBoot udpSocketBoot; + private InetSocketAddress inetSocketAddress; private final LinkRequest linkRequest; private final EventListener eventListener; private final AtomicInteger sendNumber; - private final AtomicBoolean isSend = new AtomicBoolean(); + private final AtomicBoolean isSend = new AtomicBoolean(false); - private final HdlSocketListener listener; + private HdlSocketListener listener; private ScheduledExecutorService sendThread; @@ -66,12 +70,37 @@ EventDispatcher.getInstance().register(observeTopic, eventListener); } + private HdlSocketHelper(Long sendAwaitTime, int maxRetry, UdpSocketBoot udpSocketBoot, + InetSocketAddress inetSocketAddress, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener) { + this.sendAwaitTime = sendAwaitTime; + this.maxRetry = maxRetry; + this.udpSocketBoot = udpSocketBoot; + this.inetSocketAddress = inetSocketAddress; + this.linkRequest = linkRequest; + this.listener = listener; + this.sendNumber = new AtomicInteger(0); + eventListener = new EventListener() { + @Override + public void onMessage(Object msg) { + isSend.set(true); + if (listener != null) { + listener.onSucceed(msg); + } + if (sendThread != null) { + sendThread.shutdownNow(); + } + EventDispatcher.getInstance().remove(eventListener); + } + }; + EventDispatcher.getInstance().register(observeTopic, eventListener); + } + public static void send(SocketBoot boot, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener, Long sendAwaitTime, int maxRetry) { if (TextUtils.isEmpty(observeTopic)) { observeTopic = linkRequest.getTopic() + "_reply"; } HdlSocketHelper socketHelper = new HdlSocketHelper(sendAwaitTime, maxRetry, boot, linkRequest, observeTopic, listener); - socketHelper.resend(); + socketHelper.send(); } public static void send(SocketBoot boot, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener) { @@ -87,34 +116,78 @@ send(boot, linkRequest, "", listener, DEF_SEND_TIMEOUT, DEF_SEND_ONE); } - private void resend() { + /** + * Udp鐨勫彂閫佹柟娉� + * + * @param udpSocketBoot Udp褰撳墠瀵规帴 + * @param inetSocketAddress 鍙戦�佺殑鐩爣鍦板潃 + * @param linkRequest 鍙戦�佺殑鏁版嵁 + * @param observeTopic 鍙戦�佺殑涓婚 + * @param listener 鍥炶皟 + */ + public static void send(UdpSocketBoot udpSocketBoot, InetSocketAddress inetSocketAddress, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener) { + if (TextUtils.isEmpty(observeTopic)) { + observeTopic = linkRequest.getTopic() + "_reply"; + } + HdlSocketHelper socketHelper = new HdlSocketHelper(DEF_SEND_TIMEOUT, DEF_SEND_ONE, udpSocketBoot, inetSocketAddress, linkRequest, observeTopic, listener); + socketHelper.send(); + } + + /** + * Udp鍙戦�佺殑鏂规硶 + * + * @param udpSocketBoot 褰撳墠Udp瀵硅薄 + * @param inetSocketAddress 鍙戦�佺殑鐩爣鍦板潃 + * @param linkRequest 鍙戦�佺殑瀵硅薄 + * @param listener 鍥炶皟 + */ + public static void send(UdpSocketBoot udpSocketBoot, InetSocketAddress inetSocketAddress, LinkRequest linkRequest, HdlSocketListener listener) { + String observeTopic = null; + if (TextUtils.isEmpty(observeTopic)) { + observeTopic = linkRequest.getTopic() + "_reply"; + } + HdlSocketHelper socketHelper = new HdlSocketHelper(DEF_SEND_TIMEOUT, DEF_SEND_ONE, udpSocketBoot, inetSocketAddress, linkRequest, observeTopic, listener); + socketHelper.send(); + } + + private void send() { getSendThread().scheduleWithFixedDelay(new Runnable() { @Override public void run() { - if ((sendNumber.get() < maxRetry + 2) || !isSend.get()) { + //鍙戦�佹鏁板皬浜庨噸鍙戞鏁� + if ((sendNumber.get() < maxRetry)) { try { - if (sendNumber.get() < maxRetry + 1) { + //杩樻病鏈夋敹鍒板洖澶嶏紝鍐嶅彂閫� + if (!isSend.get()) { + sendNumber.set(sendNumber.get() + 1); + //濡傛槸tcp if (boot != null) { boot.sendMsg(linkRequest.getSendBytes()); } - sendNumber.set(sendNumber.get() + 1); - } else { - notifyFailure(); + //濡傛灉鏄痷dp + if (null != udpSocketBoot) { + udpSocketBoot.sendMsg(inetSocketAddress, linkRequest.getSendBytes()); + } } } catch (Exception e) { e.printStackTrace(); - sendNumber.set(sendNumber.get() + 1); - } finally { - if (sendNumber.get() > maxRetry + 1 && !isSend.get()) { - notifyFailure(); - } + } + } else { + //瓒呭嚭閲嶅彂娆℃暟骞舵病鏈夋敹鍒板洖澶� + if (!isSend.get()) { + notifyFailure(); } } } }, sendAwaitTime, sendAwaitTime, TimeUnit.MILLISECONDS); } - public ScheduledExecutorService getSendThread() { + /** + * 鑾峰彇鍙戦�佺嚎绋� + * + * @return 杩斿洖鑾峰彇鍒扮殑绾跨▼ + */ + private ScheduledExecutorService getSendThread() { if (sendThread == null) { sendThread = ThreadToolUtils.getInstance().newScheduledThreadPool(1); } @@ -122,14 +195,18 @@ } + /** + * 鍙戦�佸け璐� + */ private void notifyFailure() { + EventDispatcher.getInstance().remove(eventListener); if (sendThread != null) { sendThread.shutdownNow(); + sendThread = null; } if (listener != null) { listener.onFailure(); + listener = null; } - - EventDispatcher.getInstance().remove(eventListener); } } -- Gitblit v1.8.0