JLChen
2021-12-13 e7b8a808c2274e9c4329092bb752c7ea5cb035fc
HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/socket/HdlSocketHelper.java
@@ -7,7 +7,9 @@
import com.hdl.sdk.common.utils.ThreadToolUtils;
import com.hdl.sdk.connect.bean.LinkRequest;
import com.hdl.sdk.socket.SocketBoot;
import com.hdl.sdk.socket.udp.UdpSocketBoot;
import java.net.InetSocketAddress;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,15 +27,24 @@
    // Delay used when scheduling the send task; passed to scheduleWithFixedDelay in milliseconds.
    private final Long sendAwaitTime;
    // Limit that the send counter is compared against before each send attempt.
    private final int maxRetry;
    // NOTE(review): boot, isSend and listener are each declared twice in this span
    // (an older and a newer form); this looks like merge/diff residue - confirm which one is current.
    // Connection used for the TCP send path; may be null when UDP is used.
    private final SocketBoot boot;
    private SocketBoot boot;
    // Connection used for the UDP send path; may be null when TCP is used.
    private UdpSocketBoot udpSocketBoot;
    /**
     * Target IP address to send to (UDP path).
     */
    private String ipAddress;
    /**
     * Target port to send to (UDP path).
     */
    private int port;
    // Request whose bytes (getSendBytes()) are sent on every attempt.
    private final LinkRequest linkRequest;
    // Listener registered with EventDispatcher for the observed reply topic.
    private final EventListener eventListener;
    // Counter that is incremented around each send attempt.
    private final AtomicInteger sendNumber;
    // Set to true once a message has arrived on the observed topic.
    private final AtomicBoolean isSend = new AtomicBoolean();
    private final AtomicBoolean isSend = new AtomicBoolean(false);
    // Caller-supplied callback for success and failure; may be null.
    private final HdlSocketListener listener;
    private HdlSocketListener listener;
    // Scheduled executor that runs the send task; created on demand by getSendThread().
    private ScheduledExecutorService sendThread;
@@ -66,12 +77,38 @@
        EventDispatcher.getInstance().register(observeTopic, eventListener);
    }
    private HdlSocketHelper(Long sendAwaitTime, int maxRetry, UdpSocketBoot udpSocketBoot,
                            String ipAddress,int port , LinkRequest linkRequest, String observeTopic, HdlSocketListener listener) {
        this.sendAwaitTime = sendAwaitTime;
        this.maxRetry = maxRetry;
        this.udpSocketBoot = udpSocketBoot;
        this.ipAddress = ipAddress;
        this.port = port;
        this.linkRequest = linkRequest;
        this.listener = listener;
        this.sendNumber = new AtomicInteger(0);
        eventListener = new EventListener() {
            @Override
            public void onMessage(Object msg) {
                isSend.set(true);
                if (listener != null) {
                    listener.onSucceed(msg);
                }
                if (sendThread != null) {
                    sendThread.shutdownNow();
                }
                EventDispatcher.getInstance().remove(eventListener);
            }
        };
        EventDispatcher.getInstance().register(observeTopic, eventListener);
    }
    /**
     * Creates a helper for the given connection and starts sending {@code linkRequest}.
     *
     * @param boot          connection handed to the helper
     * @param linkRequest   request to send; its topic is used to build the default reply topic
     * @param observeTopic  topic to observe for the reply; when empty,
     *                      {@code linkRequest.getTopic() + "_reply"} is used instead
     * @param listener      result callback handed to the helper
     * @param sendAwaitTime handed to the helper as its send wait time
     * @param maxRetry      handed to the helper as its retry limit
     */
    public static void send(SocketBoot boot, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener, Long sendAwaitTime, int maxRetry) {
        // Fall back to the conventional reply topic when the caller did not name one.
        if (TextUtils.isEmpty(observeTopic)) {
            observeTopic = linkRequest.getTopic() + "_reply";
        }
        HdlSocketHelper socketHelper = new HdlSocketHelper(sendAwaitTime, maxRetry, boot, linkRequest, observeTopic, listener);
        // NOTE(review): resend() and send() are both invoked here; this looks like
        // residue from a rename - confirm that only one of them is intended.
        socketHelper.resend();
        socketHelper.send();
    }
    public static void send(SocketBoot boot, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener) {
@@ -87,34 +124,90 @@
        send(boot, linkRequest, "", listener, DEF_SEND_TIMEOUT, DEF_SEND_ONE);
    }
    private void resend() {
    /**
     * Udp的发送方法
     *
     * @param udpSocketBoot     Udp当前对接
     * @param ipAddress 发送的目标IP地址
     * @param port 目的端口
     * @param linkRequest       发送的数据
     * @param observeTopic      发送的主题
     * @param retry 重发数次
     * @param listener          回调
     */
    public static void sendUdp(UdpSocketBoot udpSocketBoot, String  ipAddress ,int port, LinkRequest linkRequest, String observeTopic, int retry,HdlSocketListener listener) {
        if (TextUtils.isEmpty(observeTopic)) {
            observeTopic = linkRequest.getTopic() + "_reply";
        }
        HdlSocketHelper socketHelper = new HdlSocketHelper(DEF_SEND_TIMEOUT, retry, udpSocketBoot, ipAddress,port, linkRequest, observeTopic, listener);
        socketHelper.send();
    }
    /**
     * Udp的发送方法
     *
     * @param udpSocketBoot     Udp当前对接
     * @param ipAddress 发送的目标IP地址
     * @param port 目的端口
     * @param linkRequest       发送的数据
     * @param listener          回调
     */
    public static void sendUdp(UdpSocketBoot udpSocketBoot, String  ipAddress ,int port, LinkRequest linkRequest, HdlSocketListener listener) {
        sendUdp(udpSocketBoot,ipAddress,port,linkRequest,null,DEF_MAX_RETRY,listener);
    }
    /**
     * Udp的发送方法
     *
     * @param udpSocketBoot     Udp当前对接
     * @param ipAddress 发送的目标IP地址
     * @param port 目的端口
     * @param linkRequest       发送的数据
     */
    public static void sendUdpOne(UdpSocketBoot udpSocketBoot, String  ipAddress ,int port, LinkRequest linkRequest) {
        sendUdp(udpSocketBoot, ipAddress, port, linkRequest, null, DEF_SEND_ONE, null);
    }
    /**
     * Schedules the send task on the send thread with a fixed delay of
     * {@code sendAwaitTime} milliseconds between runs.
     *
     * NOTE(review): this span appears to interleave two revisions of the method (two
     * retry conditions, two counter increments, two scheduling argument lines) and its
     * braces do not balance as shown - confirm which lines are current before editing.
     */
    private void send() {
        getSendThread().scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                if ((sendNumber.get() < maxRetry + 2) || !isSend.get()) {
                // The send count is still below the retry limit
                if ((sendNumber.get() < maxRetry)) {
                    try {
                        if (sendNumber.get() < maxRetry + 1) {
                        // No reply has been received yet, so send again
                        if (!isSend.get()) {
                            sendNumber.set(sendNumber.get() + 1);
                            // TCP case
                            if (boot != null) {
                                boot.sendMsg(linkRequest.getSendBytes());
                            }
                            sendNumber.set(sendNumber.get() + 1);
                        } else {
                            notifyFailure();
                            // UDP case
                            if (null != udpSocketBoot) {
                                udpSocketBoot.sendMsg(ipAddress,port, linkRequest.getSendBytes());
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        sendNumber.set(sendNumber.get() + 1);
                    } finally {
                        if (sendNumber.get() > maxRetry + 1 && !isSend.get()) {
                            notifyFailure();
                        }
                    }
                } else {
                    // The retry limit was exceeded and no reply was received
                    if (!isSend.get()) {
                        notifyFailure();
                    }
                }
            }
        }, sendAwaitTime, sendAwaitTime, TimeUnit.MILLISECONDS);
        }, 0, sendAwaitTime, TimeUnit.MILLISECONDS);
        // initialDelay - delay before the first execution: 0
        // delay - delay between the end of one execution and the start of the next
    }
    public ScheduledExecutorService getSendThread() {
    /**
     * Returns the send thread, creating it on first use.
     *
     * @return the scheduled executor used for sending
     */
    private ScheduledExecutorService getSendThread() {
        if (sendThread == null) {
            sendThread = ThreadToolUtils.getInstance().newScheduledThreadPool(1);
        }
@@ -122,14 +215,18 @@
    }
    /**
     * 发送失败
     */
    private void notifyFailure() {
        EventDispatcher.getInstance().remove(eventListener);
        if (sendThread != null) {
            sendThread.shutdownNow();
            sendThread = null;
        }
        if (listener != null) {
            listener.onFailure();
            listener = null;
        }
        EventDispatcher.getInstance().remove(eventListener);
    }
}