JLChen
2021-12-13 43e38e768360ac8ced4f31fb4a423f2badda5587
HDLSDK/hdl-connect/src/main/java/com/hdl/sdk/connect/socket/HdlSocketHelper.java
@@ -4,10 +4,13 @@
import com.hdl.sdk.common.event.EventDispatcher;
import com.hdl.sdk.common.event.EventListener;
import com.hdl.sdk.common.utils.LogUtils;
import com.hdl.sdk.common.utils.ThreadToolUtils;
import com.hdl.sdk.connect.bean.LinkRequest;
import com.hdl.sdk.socket.SocketBoot;
import com.hdl.sdk.socket.udp.UdpSocketBoot;
import java.net.InetSocketAddress;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,17 +28,28 @@
    private final Long sendAwaitTime;
    private final int maxRetry;
    private final SocketBoot boot;
    private SocketBoot boot;
    private UdpSocketBoot udpSocketBoot;
    /**
     * 发送的目标IP
     */
    private String ipAddress;
    /**
     * 发送的目标地址
     */
    private int port;
    private final LinkRequest linkRequest;
    private final EventListener eventListener;
    private final AtomicInteger sendNumber;
    private final AtomicBoolean isSend = new AtomicBoolean();
    private final AtomicBoolean isSend = new AtomicBoolean(false);
    private final HdlSocketListener listener;
    private HdlSocketListener listener;
    private ScheduledExecutorService sendThread;
    private String observeTopic;
    public interface HdlSocketListener {
        void onSucceed(Object msg);
@@ -43,12 +57,16 @@
        void onFailure();
    }
    private HdlSocketHelper(Long sendAwaitTime, int maxRetry, SocketBoot boot, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener) {
    private HdlSocketHelper(Long sendAwaitTime, int maxRetry, UdpSocketBoot udpSocketBoot,
                            String ipAddress, int port, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener) {
        this.sendAwaitTime = sendAwaitTime;
        this.maxRetry = maxRetry;
        this.boot = boot;
        this.udpSocketBoot = udpSocketBoot;
        this.ipAddress = ipAddress;
        this.port = port;
        this.linkRequest = linkRequest;
        this.listener = listener;
        this.observeTopic = observeTopic;
        this.sendNumber = new AtomicInteger(0);
        eventListener = new EventListener() {
            @Override
@@ -60,61 +78,121 @@
                if (sendThread != null) {
                    sendThread.shutdownNow();
                }
                EventDispatcher.getInstance().remove(eventListener);
                //移除监听
                removeListener();
            }
        };
        EventDispatcher.getInstance().register(observeTopic, eventListener);
        //注册监听
        registerListener();
    }
    public static void send(SocketBoot boot, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener, Long sendAwaitTime, int maxRetry) {
    /**
     * 注册监听
     */
    void registerListener() {
        if (!TextUtils.isEmpty(observeTopic)) {
            EventDispatcher.getInstance().register(observeTopic, eventListener);
//            LogUtils.i("HdlSocketHelper", "register event");
        }
    }
    /**
     * 移除监听
     */
    void removeListener() {
        if (!TextUtils.isEmpty(observeTopic)) {
            EventDispatcher.getInstance().remove(observeTopic, eventListener);
//            LogUtils.i("HdlSocketHelper", "remove event");
        }
    }
    /**
     * Udp的发送方法
     *
     * @param udpSocketBoot Udp当前对接
     * @param ipAddress     发送的目标IP地址
     * @param port          目的端口
     * @param linkRequest   发送的数据
     * @param observeTopic  发送的主题
     * @param retry         重发数次
     * @param listener      回调
     */
    public static void sendUdp(UdpSocketBoot udpSocketBoot, String ipAddress, int port, LinkRequest linkRequest, String observeTopic, int retry, HdlSocketListener listener) {
        if (TextUtils.isEmpty(observeTopic)) {
            observeTopic = linkRequest.getTopic() + "_reply";
        }
        HdlSocketHelper socketHelper = new HdlSocketHelper(sendAwaitTime, maxRetry, boot, linkRequest, observeTopic, listener);
        socketHelper.resend();
        HdlSocketHelper socketHelper = new HdlSocketHelper(DEF_SEND_TIMEOUT, retry, udpSocketBoot, ipAddress, port, linkRequest, observeTopic, listener);
        socketHelper.send();
    }
    public static void send(SocketBoot boot, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener) {
        send(boot, linkRequest, observeTopic, listener, DEF_SEND_TIMEOUT, DEF_MAX_RETRY);
    /**
     * Udp的发送方法
     *
     * @param udpSocketBoot Udp当前对接
     * @param ipAddress     发送的目标IP地址
     * @param port          目的端口
     * @param linkRequest   发送的数据
     * @param listener      回调
     */
    public static void sendUdp(UdpSocketBoot udpSocketBoot, String ipAddress, int port, LinkRequest linkRequest, HdlSocketListener listener) {
        sendUdp(udpSocketBoot, ipAddress, port, linkRequest, "", DEF_MAX_RETRY, listener);
    }
    public static void send(SocketBoot boot, LinkRequest linkRequest, HdlSocketListener listener) {
        send(boot, linkRequest, "", listener, DEF_SEND_TIMEOUT, DEF_MAX_RETRY);
    /**
     * Udp的发送方法
     *
     * @param udpSocketBoot Udp当前对接
     * @param ipAddress     发送的目标IP地址
     * @param port          目的端口
     * @param linkRequest   发送的数据
     */
    public static void sendUdpOne(UdpSocketBoot udpSocketBoot, String ipAddress, int port, LinkRequest linkRequest) {
        HdlSocketHelper socketHelper = new HdlSocketHelper(DEF_SEND_TIMEOUT, DEF_SEND_ONE, udpSocketBoot, ipAddress, port, linkRequest, null, null);
        socketHelper.send();
    }
    public static void sendOne(SocketBoot boot, LinkRequest linkRequest, HdlSocketListener listener) {
        send(boot, linkRequest, "", listener, DEF_SEND_TIMEOUT, DEF_SEND_ONE);
    }
    private void resend() {
    private void send() {
        getSendThread().scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                if ((sendNumber.get() < maxRetry + 2) || !isSend.get()) {
                //发送次数小于重发次数
                if ((sendNumber.get() < maxRetry)) {
                    try {
                        if (sendNumber.get() < maxRetry + 1) {
                        //还没有收到回复,再发送
                        if (!isSend.get()) {
                            sendNumber.set(sendNumber.get() + 1);
                            //如是tcp
                            if (boot != null) {
                                boot.sendMsg(linkRequest.getSendBytes());
                            }
                            sendNumber.set(sendNumber.get() + 1);
                        } else {
                            notifyFailure();
                            //如果是udp
                            if (null != udpSocketBoot) {
                                udpSocketBoot.sendMsg(ipAddress, port, linkRequest.getSendBytes());
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        sendNumber.set(sendNumber.get() + 1);
                    } finally {
                        if (sendNumber.get() > maxRetry + 1 && !isSend.get()) {
                            notifyFailure();
                        }
                    }
                } else {
                    //超出重发次数并没有收到回复
                    if (!isSend.get()) {
                        notifyFailure();
                    }
                }
            }
        }, sendAwaitTime, sendAwaitTime, TimeUnit.MILLISECONDS);
        }, 0, sendAwaitTime, TimeUnit.MILLISECONDS);
        //initialdelay - 首次执行的延迟时间 0
        //delay - 一次执行终止和下一次执行开始之间的延迟
    }
    public ScheduledExecutorService getSendThread() {
    /**
     * 获取发送线程
     *
     * @return 返回获取到的线程
     */
    private ScheduledExecutorService getSendThread() {
        if (sendThread == null) {
            sendThread = ThreadToolUtils.getInstance().newScheduledThreadPool(1);
        }
@@ -122,12 +200,19 @@
    }
    /**
     * 发送失败
     */
    private void notifyFailure() {
        //移除监听
        removeListener();
        if (sendThread != null) {
            sendThread.shutdownNow();
            sendThread = null;
        }
        if (listener != null) {
            listener.onFailure();
            listener = null;
        }
    }
}