From 43e38e768360ac8ced4f31fb4a423f2badda5587 Mon Sep 17 00:00:00 2001 From: JLChen <551775569@qq.com> Date: 星期一, 13 十二月 2021 15:49:05 +0800 Subject: [PATCH] 2021-12-13 1.优化udp发送 --- HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/socket/HdlSocketHelper.java | 149 ++++++++++++++++++++++++++++++++++++++----------- 1 files changed, 116 insertions(+), 33 deletions(-) diff --git a/HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/socket/HdlSocketHelper.java b/HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/socket/HdlSocketHelper.java index 99a703a..7776163 100644 --- a/HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/socket/HdlSocketHelper.java +++ b/HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/socket/HdlSocketHelper.java @@ -4,10 +4,13 @@ import com.hdl.sdk.common.event.EventDispatcher; import com.hdl.sdk.common.event.EventListener; +import com.hdl.sdk.common.utils.LogUtils; import com.hdl.sdk.common.utils.ThreadToolUtils; import com.hdl.sdk.connect.bean.LinkRequest; import com.hdl.sdk.socket.SocketBoot; +import com.hdl.sdk.socket.udp.UdpSocketBoot; +import java.net.InetSocketAddress; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -25,17 +28,28 @@ private final Long sendAwaitTime; private final int maxRetry; - private final SocketBoot boot; + private SocketBoot boot; + private UdpSocketBoot udpSocketBoot; + /** + * 鍙戦�佺殑鐩爣IP + */ + private String ipAddress; + /** + * 鍙戦�佺殑鐩爣鍦板潃 + */ + private int port; private final LinkRequest linkRequest; private final EventListener eventListener; private final AtomicInteger sendNumber; - private final AtomicBoolean isSend = new AtomicBoolean(); + private final AtomicBoolean isSend = new AtomicBoolean(false); - private final HdlSocketListener listener; + private HdlSocketListener listener; private ScheduledExecutorService sendThread; + + private String observeTopic; public interface HdlSocketListener { void onSucceed(Object msg); @@ -43,12 +57,16 @@ void onFailure(); } - private HdlSocketHelper(Long sendAwaitTime, int maxRetry, SocketBoot boot, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener) { + private HdlSocketHelper(Long sendAwaitTime, int maxRetry, UdpSocketBoot udpSocketBoot, + String ipAddress, int port, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener) { this.sendAwaitTime = sendAwaitTime; this.maxRetry = maxRetry; - this.boot = boot; + this.udpSocketBoot = udpSocketBoot; + this.ipAddress = ipAddress; + this.port = port; this.linkRequest = linkRequest; this.listener = listener; + this.observeTopic = observeTopic; this.sendNumber = new AtomicInteger(0); eventListener = new EventListener() { @Override @@ -60,61 +78,121 @@ if (sendThread != null) { sendThread.shutdownNow(); } - EventDispatcher.getInstance().remove(eventListener); + //绉婚櫎鐩戝惉 + removeListener(); } }; - EventDispatcher.getInstance().register(observeTopic, eventListener); + //娉ㄥ唽鐩戝惉 + registerListener(); } - public static void send(SocketBoot boot, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener, Long sendAwaitTime, int maxRetry) { + + /** + * 娉ㄥ唽鐩戝惉 + */ + void registerListener() { + if (!TextUtils.isEmpty(observeTopic)) { + EventDispatcher.getInstance().register(observeTopic, eventListener); +// LogUtils.i("HdlSocketHelper", "register event"); + } + } + + /** + * 绉婚櫎鐩戝惉 + */ + void removeListener() { + if (!TextUtils.isEmpty(observeTopic)) { + EventDispatcher.getInstance().remove(observeTopic, eventListener); +// LogUtils.i("HdlSocketHelper", "remove event"); + } + } + + /** + * Udp鐨勫彂閫佹柟娉� + * + * @param udpSocketBoot Udp褰撳墠瀵规帴 + * @param ipAddress 鍙戦�佺殑鐩爣IP鍦板潃 + * @param port 鐩殑绔彛 + * @param linkRequest 鍙戦�佺殑鏁版嵁 + * @param observeTopic 鍙戦�佺殑涓婚 + * @param retry 閲嶅彂鏁版 + * @param listener 鍥炶皟 + */ + public static void sendUdp(UdpSocketBoot udpSocketBoot, String ipAddress, int port, LinkRequest linkRequest, String observeTopic, int retry, HdlSocketListener listener) { if (TextUtils.isEmpty(observeTopic)) { observeTopic = linkRequest.getTopic() + "_reply"; } - HdlSocketHelper socketHelper = new HdlSocketHelper(sendAwaitTime, maxRetry, boot, linkRequest, observeTopic, listener); - socketHelper.resend(); + HdlSocketHelper socketHelper = new HdlSocketHelper(DEF_SEND_TIMEOUT, retry, udpSocketBoot, ipAddress, port, linkRequest, observeTopic, listener); + socketHelper.send(); } - public static void send(SocketBoot boot, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener) { - send(boot, linkRequest, observeTopic, listener, DEF_SEND_TIMEOUT, DEF_MAX_RETRY); + /** + * Udp鐨勫彂閫佹柟娉� + * + * @param udpSocketBoot Udp褰撳墠瀵规帴 + * @param ipAddress 鍙戦�佺殑鐩爣IP鍦板潃 + * @param port 鐩殑绔彛 + * @param linkRequest 鍙戦�佺殑鏁版嵁 + * @param listener 鍥炶皟 + */ + public static void sendUdp(UdpSocketBoot udpSocketBoot, String ipAddress, int port, LinkRequest linkRequest, HdlSocketListener listener) { + sendUdp(udpSocketBoot, ipAddress, port, linkRequest, "", DEF_MAX_RETRY, listener); } - public static void send(SocketBoot boot, LinkRequest linkRequest, HdlSocketListener listener) { - send(boot, linkRequest, "", listener, DEF_SEND_TIMEOUT, DEF_MAX_RETRY); + /** + * Udp鐨勫彂閫佹柟娉� + * + * @param udpSocketBoot Udp褰撳墠瀵规帴 + * @param ipAddress 鍙戦�佺殑鐩爣IP鍦板潃 + * @param port 鐩殑绔彛 + * @param linkRequest 鍙戦�佺殑鏁版嵁 + */ + public static void sendUdpOne(UdpSocketBoot udpSocketBoot, String ipAddress, int port, LinkRequest linkRequest) { + HdlSocketHelper socketHelper = new HdlSocketHelper(DEF_SEND_TIMEOUT, DEF_SEND_ONE, udpSocketBoot, ipAddress, port, linkRequest, null, null); + socketHelper.send(); } - public static void sendOne(SocketBoot boot, LinkRequest linkRequest, HdlSocketListener listener) { - send(boot, linkRequest, "", listener, DEF_SEND_TIMEOUT, DEF_SEND_ONE); - } - - private void resend() { + private void send() { getSendThread().scheduleWithFixedDelay(new Runnable() { @Override public void run() { - if ((sendNumber.get() < maxRetry + 2) || !isSend.get()) { + //鍙戦�佹鏁板皬浜庨噸鍙戞鏁� + if ((sendNumber.get() < maxRetry)) { try { - if (sendNumber.get() < maxRetry + 1) { + //杩樻病鏈夋敹鍒板洖澶嶏紝鍐嶅彂閫� + if (!isSend.get()) { + sendNumber.set(sendNumber.get() + 1); + //濡傛槸tcp if (boot != null) { boot.sendMsg(linkRequest.getSendBytes()); } - sendNumber.set(sendNumber.get() + 1); - } else { - notifyFailure(); + //濡傛灉鏄痷dp + if (null != udpSocketBoot) { + udpSocketBoot.sendMsg(ipAddress, port, linkRequest.getSendBytes()); + } } } catch (Exception e) { e.printStackTrace(); - sendNumber.set(sendNumber.get() + 1); - } finally { - if (sendNumber.get() > maxRetry + 1 && !isSend.get()) { - notifyFailure(); - } + } + } else { + //瓒呭嚭閲嶅彂娆℃暟骞舵病鏈夋敹鍒板洖澶� + if (!isSend.get()) { + notifyFailure(); } } } - }, sendAwaitTime, sendAwaitTime, TimeUnit.MILLISECONDS); + }, 0, sendAwaitTime, TimeUnit.MILLISECONDS); + //initialdelay - 棣栨鎵ц鐨勫欢杩熸椂闂� 0 + //delay - 涓�娆℃墽琛岀粓姝㈠拰涓嬩竴娆℃墽琛屽紑濮嬩箣闂寸殑寤惰繜 } - public ScheduledExecutorService getSendThread() { + /** + * 鑾峰彇鍙戦�佺嚎绋� + * + * @return 杩斿洖鑾峰彇鍒扮殑绾跨▼ + */ + private ScheduledExecutorService getSendThread() { if (sendThread == null) { sendThread = ThreadToolUtils.getInstance().newScheduledThreadPool(1); } @@ -122,14 +200,19 @@ } + /** + * 鍙戦�佸け璐� + */ private void notifyFailure() { + //绉婚櫎鐩戝惉 + removeListener(); if (sendThread != null) { sendThread.shutdownNow(); + sendThread = null; } if (listener != null) { listener.onFailure(); + listener = null; } - - EventDispatcher.getInstance().remove(eventListener); } } -- Gitblit v1.8.0