From d8bf4f4d66715f002d024cae92862c1d83daa425 Mon Sep 17 00:00:00 2001
From: hxb <hxb@hdlchina.com.cn>
Date: 星期日, 12 十二月 2021 22:06:35 +0800
Subject: [PATCH] 更改了udp的机制

---
 HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/socket/HdlSocketHelper.java |  113 +++++++++++++++++++++++++++++++++++++++++++++++---------
 1 files changed, 95 insertions(+), 18 deletions(-)

diff --git a/HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/socket/HdlSocketHelper.java b/HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/socket/HdlSocketHelper.java
index 99a703a..ea53e38 100644
--- a/HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/socket/HdlSocketHelper.java
+++ b/HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/socket/HdlSocketHelper.java
@@ -7,7 +7,9 @@
 import com.hdl.sdk.common.utils.ThreadToolUtils;
 import com.hdl.sdk.connect.bean.LinkRequest;
 import com.hdl.sdk.socket.SocketBoot;
+import com.hdl.sdk.socket.udp.UdpSocketBoot;
 
+import java.net.InetSocketAddress;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,15 +27,17 @@
     private final Long sendAwaitTime;
     private final int maxRetry;
 
-    private final SocketBoot boot;
+    private SocketBoot boot;
+    private UdpSocketBoot udpSocketBoot;
+    private InetSocketAddress inetSocketAddress;
     private final LinkRequest linkRequest;
     private final EventListener eventListener;
 
     private final AtomicInteger sendNumber;
 
-    private final AtomicBoolean isSend = new AtomicBoolean();
+    private final AtomicBoolean isSend = new AtomicBoolean(false);
 
-    private final HdlSocketListener listener;
+    private HdlSocketListener listener;
 
     private ScheduledExecutorService sendThread;
 
@@ -66,12 +70,37 @@
         EventDispatcher.getInstance().register(observeTopic, eventListener);
     }
 
+    private HdlSocketHelper(Long sendAwaitTime, int maxRetry, UdpSocketBoot udpSocketBoot,
+                            InetSocketAddress inetSocketAddress, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener) {
+        this.sendAwaitTime = sendAwaitTime;
+        this.maxRetry = maxRetry;
+        this.udpSocketBoot = udpSocketBoot;
+        this.inetSocketAddress = inetSocketAddress;
+        this.linkRequest = linkRequest;
+        this.listener = listener;
+        this.sendNumber = new AtomicInteger(0);
+        eventListener = new EventListener() {
+            @Override
+            public void onMessage(Object msg) {
+                isSend.set(true);
+                if (listener != null) {
+                    listener.onSucceed(msg);
+                }
+                if (sendThread != null) {
+                    sendThread.shutdownNow();
+                }
+                EventDispatcher.getInstance().remove(eventListener);
+            }
+        };
+        EventDispatcher.getInstance().register(observeTopic, eventListener);
+    }
+
     public static void send(SocketBoot boot, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener, Long sendAwaitTime, int maxRetry) {
         if (TextUtils.isEmpty(observeTopic)) {
             observeTopic = linkRequest.getTopic() + "_reply";
         }
         HdlSocketHelper socketHelper = new HdlSocketHelper(sendAwaitTime, maxRetry, boot, linkRequest, observeTopic, listener);
-        socketHelper.resend();
+        socketHelper.send();
     }
 
     public static void send(SocketBoot boot, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener) {
@@ -87,34 +116,78 @@
         send(boot, linkRequest, "", listener, DEF_SEND_TIMEOUT, DEF_SEND_ONE);
     }
 
-    private void resend() {
+    /**
+     * Udp鐨勫彂閫佹柟娉�
+     *
+     * @param udpSocketBoot     Udp褰撳墠瀵规帴
+     * @param inetSocketAddress 鍙戦�佺殑鐩爣鍦板潃
+     * @param linkRequest       鍙戦�佺殑鏁版嵁
+     * @param observeTopic      鍙戦�佺殑涓婚
+     * @param listener          鍥炶皟
+     */
+    public static void send(UdpSocketBoot udpSocketBoot, InetSocketAddress inetSocketAddress, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener) {
+        if (TextUtils.isEmpty(observeTopic)) {
+            observeTopic = linkRequest.getTopic() + "_reply";
+        }
+        HdlSocketHelper socketHelper = new HdlSocketHelper(DEF_SEND_TIMEOUT, DEF_SEND_ONE, udpSocketBoot, inetSocketAddress, linkRequest, observeTopic, listener);
+        socketHelper.send();
+    }
+
+    /**
+     * Udp鍙戦�佺殑鏂规硶
+     *
+     * @param udpSocketBoot     褰撳墠Udp瀵硅薄
+     * @param inetSocketAddress 鍙戦�佺殑鐩爣鍦板潃
+     * @param linkRequest       鍙戦�佺殑瀵硅薄
+     * @param listener          鍥炶皟
+     */
+    public static void send(UdpSocketBoot udpSocketBoot, InetSocketAddress inetSocketAddress, LinkRequest linkRequest, HdlSocketListener listener) {
+        String observeTopic = null;
+        if (TextUtils.isEmpty(observeTopic)) {
+            observeTopic = linkRequest.getTopic() + "_reply";
+        }
+        HdlSocketHelper socketHelper = new HdlSocketHelper(DEF_SEND_TIMEOUT, DEF_SEND_ONE, udpSocketBoot, inetSocketAddress, linkRequest, observeTopic, listener);
+        socketHelper.send();
+    }
+
+    private void send() {
         getSendThread().scheduleWithFixedDelay(new Runnable() {
             @Override
             public void run() {
-                if ((sendNumber.get() < maxRetry + 2) || !isSend.get()) {
+                //鍙戦�佹鏁板皬浜庨噸鍙戞鏁�
+                if ((sendNumber.get() < maxRetry)) {
                     try {
-                        if (sendNumber.get() < maxRetry + 1) {
+                        //杩樻病鏈夋敹鍒板洖澶嶏紝鍐嶅彂閫�
+                        if (!isSend.get()) {
+                            sendNumber.set(sendNumber.get() + 1);
+                            //濡傛槸tcp
                             if (boot != null) {
                                 boot.sendMsg(linkRequest.getSendBytes());
                             }
-                            sendNumber.set(sendNumber.get() + 1);
-                        } else {
-                            notifyFailure();
+                            //濡傛灉鏄痷dp
+                            if (null != udpSocketBoot) {
+                                udpSocketBoot.sendMsg(inetSocketAddress, linkRequest.getSendBytes());
+                            }
                         }
                     } catch (Exception e) {
                         e.printStackTrace();
-                        sendNumber.set(sendNumber.get() + 1);
-                    } finally {
-                        if (sendNumber.get() > maxRetry + 1 && !isSend.get()) {
-                            notifyFailure();
-                        }
+                    }
+                } else {
+                    //瓒呭嚭閲嶅彂娆℃暟骞舵病鏈夋敹鍒板洖澶�
+                    if (!isSend.get()) {
+                        notifyFailure();
                     }
                 }
             }
         }, sendAwaitTime, sendAwaitTime, TimeUnit.MILLISECONDS);
     }
 
-    public ScheduledExecutorService getSendThread() {
+    /**
+     * 鑾峰彇鍙戦�佺嚎绋�
+     *
+     * @return 杩斿洖鑾峰彇鍒扮殑绾跨▼
+     */
+    private ScheduledExecutorService getSendThread() {
         if (sendThread == null) {
             sendThread = ThreadToolUtils.getInstance().newScheduledThreadPool(1);
         }
@@ -122,14 +195,18 @@
     }
 
 
+    /**
+     * 鍙戦�佸け璐�
+     */
     private void notifyFailure() {
+        EventDispatcher.getInstance().remove(eventListener);
         if (sendThread != null) {
             sendThread.shutdownNow();
+            sendThread = null;
         }
         if (listener != null) {
             listener.onFailure();
+            listener = null;
         }
-
-        EventDispatcher.getInstance().remove(eventListener);
     }
 }

--
Gitblit v1.8.0