package com.hdl.sdk.connect.socket; import android.text.TextUtils; import com.hdl.sdk.common.event.EventDispatcher; import com.hdl.sdk.common.event.EventListener; import com.hdl.sdk.common.utils.LogUtils; import com.hdl.sdk.common.utils.ThreadToolUtils; import com.hdl.sdk.connect.bean.LinkRequest; import com.hdl.sdk.socket.SocketBoot; import com.hdl.sdk.socket.udp.UdpSocketBoot; import java.net.InetSocketAddress; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** * Created by Tong on 2021/11/11. */ public class HdlSocketHelper { private static final Long DEF_SEND_TIMEOUT = 1000L; private static final int DEF_MAX_RETRY = 4; private static final int DEF_SEND_ONE = 1; private final Long sendAwaitTime; private final int maxRetry; private SocketBoot boot; private UdpSocketBoot udpSocketBoot; /** * 发送的目标IP */ private String ipAddress; /** * 发送的目标地址 */ private int port; private final LinkRequest linkRequest; private final EventListener eventListener; private final AtomicInteger sendNumber; private final AtomicBoolean isSend = new AtomicBoolean(false); private HdlSocketListener listener; private ScheduledExecutorService sendThread; private String observeTopic; public interface HdlSocketListener { void onSucceed(Object msg); void onFailure(); } private HdlSocketHelper(Long sendAwaitTime, int maxRetry, UdpSocketBoot udpSocketBoot, String ipAddress, int port, LinkRequest linkRequest, String observeTopic, HdlSocketListener listener) { this.sendAwaitTime = sendAwaitTime; this.maxRetry = maxRetry; this.udpSocketBoot = udpSocketBoot; this.ipAddress = ipAddress; this.port = port; this.linkRequest = linkRequest; this.listener = listener; this.observeTopic = observeTopic; this.sendNumber = new AtomicInteger(0); eventListener = new EventListener() { @Override public void onMessage(Object msg) { isSend.set(true); if (listener != null) { listener.onSucceed(msg); } if (sendThread != null) { sendThread.shutdownNow(); } //移除监听 removeListener(); } }; //注册监听 registerListener(); } /** * 注册监听 */ void registerListener() { if (!TextUtils.isEmpty(observeTopic)) { EventDispatcher.getInstance().register(observeTopic, eventListener); // LogUtils.i("HdlSocketHelper", "register event"); } } /** * 移除监听 */ void removeListener() { if (!TextUtils.isEmpty(observeTopic)) { EventDispatcher.getInstance().remove(observeTopic, eventListener); // LogUtils.i("HdlSocketHelper", "remove event"); } } /** * Udp的发送方法 * * @param udpSocketBoot Udp当前对接 * @param ipAddress 发送的目标IP地址 * @param port 目的端口 * @param linkRequest 发送的数据 * @param observeTopic 发送的主题 * @param retry 重发数次 * @param listener 回调 */ public static void sendUdp(UdpSocketBoot udpSocketBoot, String ipAddress, int port, LinkRequest linkRequest, String observeTopic, int retry, HdlSocketListener listener) { if (TextUtils.isEmpty(observeTopic)) { observeTopic = linkRequest.getTopic() + "_reply"; } HdlSocketHelper socketHelper = new HdlSocketHelper(DEF_SEND_TIMEOUT, retry, udpSocketBoot, ipAddress, port, linkRequest, observeTopic, listener); socketHelper.send(); } /** * Udp的发送方法 * * @param udpSocketBoot Udp当前对接 * @param ipAddress 发送的目标IP地址 * @param port 目的端口 * @param linkRequest 发送的数据 * @param listener 回调 */ public static void sendUdp(UdpSocketBoot udpSocketBoot, String ipAddress, int port, LinkRequest linkRequest, HdlSocketListener listener) { sendUdp(udpSocketBoot, ipAddress, port, linkRequest, "", DEF_MAX_RETRY, listener); } /** * Udp的发送方法 * * @param udpSocketBoot Udp当前对接 * @param ipAddress 发送的目标IP地址 * @param port 目的端口 * @param linkRequest 发送的数据 */ public static void sendUdpOne(UdpSocketBoot udpSocketBoot, String ipAddress, int port, LinkRequest linkRequest) { HdlSocketHelper socketHelper = new HdlSocketHelper(DEF_SEND_TIMEOUT, DEF_SEND_ONE, udpSocketBoot, ipAddress, port, linkRequest, null, null); socketHelper.send(); } private void send() { getSendThread().scheduleWithFixedDelay(new Runnable() { @Override public void run() { //发送次数小于重发次数 if ((sendNumber.get() < maxRetry)) { try { //还没有收到回复,再发送 if (!isSend.get()) { sendNumber.set(sendNumber.get() + 1); //如是tcp if (boot != null) { boot.sendMsg(linkRequest.getSendBytes()); } //如果是udp if (null != udpSocketBoot) { udpSocketBoot.sendMsg(ipAddress, port, linkRequest.getSendBytes()); } } } catch (Exception e) { LogUtils.e("发送数据失败:" + e.getMessage()); } } else { //超出重发次数并没有收到回复 if (!isSend.get()) { notifyFailure(); } } } }, 0, sendAwaitTime, TimeUnit.MILLISECONDS); //initialdelay - 首次执行的延迟时间 0 //delay - 一次执行终止和下一次执行开始之间的延迟 } /** * 获取发送线程 * * @return 返回获取到的线程 */ private ScheduledExecutorService getSendThread() { if (sendThread == null) { sendThread = ThreadToolUtils.getInstance().newScheduledThreadPool(1); } return sendThread; } /** * 发送失败 */ private void notifyFailure() { //移除监听 removeListener(); try { if (sendThread != null) { sendThread.shutdownNow(); sendThread = null; } if (listener != null) { listener.onFailure(); listener = null; } } catch (Exception e) { } } }