package com.hdl.sdk.link.core.connect;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
import com.hdl.sdk.link.bean.LinkRoomBean;
import com.hdl.sdk.link.common.config.TopicConstant;
import com.hdl.sdk.link.common.event.EventDispatcher;
import com.hdl.sdk.link.common.event.EventListener;
import com.hdl.sdk.link.common.utils.IdUtils;
import com.hdl.sdk.link.common.utils.LogUtils;
import com.hdl.sdk.link.common.utils.TextUtils;
import com.hdl.sdk.link.common.utils.ThreadToolUtils;
import com.hdl.sdk.link.core.bean.LinkRequest;
import com.hdl.sdk.link.core.bean.LinkResponse;
import com.hdl.sdk.link.core.bean.gateway.GatewayBean;
import com.hdl.sdk.link.core.bean.response.BaseLocalResponse;
import com.hdl.sdk.link.core.utils.EncryptUtil;
import com.hdl.sdk.link.core.utils.LinkResponseUtils;
import com.hdl.sdk.link.gateway.HDLLinkLocalGateway;

import java.lang.reflect.Type;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Tong on 2021/11/11.
 *
 * <p>Sends one {@link LinkRequest} over TCP or UDP, repeating it at a fixed delay until either a
 * message is dispatched on the observed topic or the configured number of attempts is used up,
 * and reports the outcome through an optional {@link HdlSocketListener}.
 *
 * <p>The topic listener is registered in the constructor; nothing is transmitted until
 * {@link #send()} is called. The terminal callback is claimed under synchronization, so the
 * response path and the timeout path cannot both report for the same request.
 */
public class HDLConnectHelper {

    /** Default delay, in milliseconds, between two consecutive send attempts. */
    private static final Long DEF_SEND_TIMEOUT = 8000L;
    /** Default total number of send attempts (the first transmission counts as one). */
    private static final int DEF_MAX_RETRY = 1;
    /** Attempt count used by the "send once" constructor. */
    private static final int DEF_SEND_ONE = 1;
    /** Default destination port for TCP. */
    private static final int TCP_PORT = 8586;
    /** Default destination port for UDP. */
    private static final int UDP_PORT = 8585;

    /** Delay in milliseconds between the end of one attempt and the start of the next. */
    private final long sendAwaitTime;
    /** Total number of attempts allowed; sending stops once {@code sendNumber} reaches it. */
    private final int maxRetry;
    /** {@code true} to send over TCP, {@code false} to send over UDP. */
    private final boolean isTcp;
    /** Destination IP address. */
    private final String ipAddress;
    /** Destination port (only used on the UDP path). */
    private final int port;
    /** The request being sent. */
    private final LinkRequest linkRequest;
    /** Listener registered on {@code observeTopic}; forwards replies to {@link #handleResponse}. */
    private final EventListener eventListener;
    /** Number of attempts started so far. */
    private final AtomicInteger sendNumber = new AtomicInteger(0);
    /** Set to {@code true} as soon as a message arrives on the observed topic. */
    private final AtomicBoolean isSend = new AtomicBoolean(false);
    /** Pending result callback; cleared (under synchronization) once a result is reported. */
    private volatile HdlSocketListener listener;
    /** Executor that runs the periodic send task; created lazily by {@link #getSendThread()}. */
    private volatile ScheduledExecutorService sendThread;
    /** Topic on which the reply is expected; may be empty when no reply is awaited. */
    private final String observeTopic;

    /** Receives the outcome of a request. */
    public interface HdlSocketListener {
        /**
         * Called when an accepted reply was received.
         *
         * @param msg the message delivered on the observed topic
         */
        void onSucceed(Object msg);

        /** Called when the reply was rejected, could not be processed, or never arrived. */
        void onFailure();
    }

    /**
     * Creates a helper that sends UDP or TCP data.
     *
     * @param sendAwaitTime delay in milliseconds between attempts; {@code null} or a non-positive
     *                      value falls back to the default of 8000 ms
     * @param maxRetry      total number of send attempts
     * @param ipAddress     destination IP
     * @param port          destination port
     * @param linkRequest   request to send
     * @param observeTopic  topic on which the reply is expected
     * @param listener      result callback; the topic is only observed when this is non-null
     * @param isTcp         {@code true} for TCP, {@code false} for UDP
     */
    public HDLConnectHelper(Long sendAwaitTime, int maxRetry, String ipAddress, int port,
                            LinkRequest linkRequest, String observeTopic,
                            HdlSocketListener listener, boolean isTcp) {
        // scheduleWithFixedDelay rejects a non-positive delay and a null Long cannot be unboxed,
        // so fall back to the default instead of failing later inside send().
        this.sendAwaitTime = (sendAwaitTime == null || sendAwaitTime <= 0)
                ? DEF_SEND_TIMEOUT
                : sendAwaitTime;
        this.maxRetry = maxRetry;
        this.ipAddress = ipAddress;
        this.port = port;
        this.linkRequest = linkRequest;
        this.observeTopic = observeTopic;
        this.listener = listener;
        this.isTcp = isTcp;
        this.eventListener = new EventListener() {
            @Override
            public void onMessage(Object msg) {
                handleResponse(msg);
            }
        };
        // Start observing the reply topic right away.
        registerListener();
    }

    /**
     * Sends with the given number of attempts and reports the result.
     *
     * @param maxRetry     total number of send attempts
     * @param ipAddress    destination IP
     * @param port         destination port
     * @param linkRequest  request to send
     * @param observeTopic topic on which the reply is expected
     * @param listener     result callback
     * @param isTcp        {@code true} for TCP, {@code false} for UDP
     */
    public HDLConnectHelper(int maxRetry, String ipAddress, int port, LinkRequest linkRequest,
                            String observeTopic, HdlSocketListener listener, boolean isTcp) {
        this(DEF_SEND_TIMEOUT, maxRetry, ipAddress, port, linkRequest, observeTopic, listener,
                isTcp);
    }

    /**
     * Sends with the given number of attempts to the default port and reports the result.
     *
     * @param maxRetry     total number of send attempts
     * @param ipAddress    destination IP
     * @param linkRequest  request to send
     * @param observeTopic topic on which the reply is expected
     * @param listener     result callback
     * @param isTcp        {@code true} for TCP (port 8586), {@code false} for UDP (port 8585)
     */
    public HDLConnectHelper(int maxRetry, String ipAddress, LinkRequest linkRequest,
                            String observeTopic, HdlSocketListener listener, boolean isTcp) {
        this(maxRetry, ipAddress, isTcp ? TCP_PORT : UDP_PORT, linkRequest, observeTopic,
                listener, isTcp);
    }

    /**
     * Sends with the given number of attempts to the default port without any callback.
     *
     * @param maxRetry    total number of send attempts
     * @param ipAddress   destination IP
     * @param linkRequest request to send
     * @param isTcp       {@code true} for TCP, {@code false} for UDP
     */
    public HDLConnectHelper(int maxRetry, String ipAddress, LinkRequest linkRequest,
                            boolean isTcp) {
        this(maxRetry, ipAddress, linkRequest, null, null, isTcp);
    }

    /**
     * Sends with the default number of attempts and reports the result.
     *
     * @param ipAddress    destination IP
     * @param port         destination port
     * @param linkRequest  request to send
     * @param observeTopic topic on which the reply is expected
     * @param listener     result callback
     * @param isTcp        {@code true} for TCP, {@code false} for UDP
     */
    public HDLConnectHelper(String ipAddress, int port, LinkRequest linkRequest,
                            String observeTopic, HdlSocketListener listener, boolean isTcp) {
        this(DEF_MAX_RETRY, ipAddress, port, linkRequest, observeTopic, listener, isTcp);
    }

    /**
     * Sends with the default number of attempts to the default port and reports the result.
     *
     * @param ipAddress    destination IP
     * @param linkRequest  request to send
     * @param observeTopic topic on which the reply is expected
     * @param listener     result callback
     * @param isTcp        {@code true} for TCP (port 8586), {@code false} for UDP (port 8585)
     */
    public HDLConnectHelper(String ipAddress, LinkRequest linkRequest, String observeTopic,
                            HdlSocketListener listener, boolean isTcp) {
        this(DEF_SEND_TIMEOUT, DEF_MAX_RETRY, ipAddress, isTcp ? TCP_PORT : UDP_PORT,
                linkRequest, observeTopic, listener, isTcp);
    }

    /**
     * Sends once to the default port without any callback.
     *
     * @param ipAddress   destination IP
     * @param linkRequest request to send
     * @param isTcp       {@code true} for TCP, {@code false} for UDP
     */
    public HDLConnectHelper(String ipAddress, LinkRequest linkRequest, boolean isTcp) {
        this(DEF_SEND_TIMEOUT, DEF_SEND_ONE, ipAddress, isTcp ? TCP_PORT : UDP_PORT,
                linkRequest, null, null, isTcp);
    }

    /** Registers the topic listener, but only when both a topic and a callback were supplied. */
    private void registerListener() {
        if (!TextUtils.isEmpty(observeTopic) && null != listener) {
            EventDispatcher.getInstance().register(observeTopic, eventListener);
        }
    }

    /** Removes the topic listener. */
    private void removeListener() {
        if (!TextUtils.isEmpty(observeTopic)) {
            EventDispatcher.getInstance().remove(observeTopic, eventListener);
        }
    }

    /**
     * Starts sending. The request is transmitted immediately and then again after every
     * {@code sendAwaitTime} milliseconds until a reply has arrived or {@code maxRetry} attempts
     * were made; in the latter case the next tick reports failure and stops the executor.
     */
    public void send() {
        getSendThread().scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                if (isSend.get()) {
                    // A reply was already handled (it may even have arrived before send() was
                    // called); make sure this periodic task does not keep ticking forever.
                    stopSendThread();
                    return;
                }
                if (sendNumber.get() >= maxRetry) {
                    // All attempts used and still no reply.
                    notifyFailure();
                    return;
                }
                try {
                    sendNumber.incrementAndGet();
                    // Heartbeat traffic is too frequent to log. A null topic must not abort the
                    // attempt before the packet is actually sent.
                    String topic = linkRequest.getTopic();
                    if (topic == null || !topic.endsWith("heartbeat")) {
                        LogUtils.i("发送数据:\r\n" + new String(linkRequest.getSendBytes()));
                    }
                    if (isTcp) {
                        HDLTcpConnect.getTcpSocketBoot(ipAddress)
                                .sendMsg(EncryptUtil.getEncryBytes(linkRequest));
                    } else {
                        HDLUdpConnect.getInstance().getUdpBoot()
                                .sendMsg(ipAddress, port, EncryptUtil.getEncryBytes(linkRequest));
                    }
                } catch (Exception e) {
                    // A failed attempt must not cancel the periodic task; the next tick retries
                    // or reports failure.
                    e.printStackTrace();
                }
            }
        }, 0, sendAwaitTime, TimeUnit.MILLISECONDS);
        // initialDelay 0: first attempt runs immediately.
        // delay: time between the end of one run and the start of the next.
    }

    /**
     * Handles a message delivered on the observed topic: stops listening and retrying, then
     * reports success or failure to the pending callback. If the callback has already been
     * consumed (for example by the timeout path) the message is ignored.
     *
     * @param msg the dispatched message
     */
    private void handleResponse(Object msg) {
        removeListener();
        isSend.set(true);
        stopSendThread();

        final HdlSocketListener callback = takeListener();
        if (callback == null) {
            return;
        }
        try {
            if (isSuccessResponse(msg)) {
                callback.onSucceed(msg);
                return;
            }
        } catch (Exception e) {
            // Covers an unparsable payload as well as an exception thrown by onSucceed itself;
            // both are reported as a failure below.
            LogUtils.i("HDLConnectHelper: response handling failed, reporting failure: " + e);
        }
        // The reply was not accepted or could not be processed.
        releaseResources();
        callback.onFailure();
    }

    /**
     * Decides whether a dispatched message counts as success. The message must be a
     * {@link LinkResponse} whose data parses as a JSON object; a missing {@code code} member is
     * treated as success, otherwise only the values {@code "200"} and {@code "0"} are accepted.
     *
     * @param msg the dispatched message
     * @return {@code true} when the message is an accepted reply
     */
    private static boolean isSuccessResponse(Object msg) {
        if (!(msg instanceof LinkResponse)) {
            return false;
        }
        LinkResponse linkResponse = (LinkResponse) msg;
        JsonObject jsonObject = new JsonParser().parse(linkResponse.getData()).getAsJsonObject();
        if (jsonObject.get("code") == null) {
            return true;
        }
        String code = jsonObject.get("code").getAsString();
        return "200".equals(code) || "0".equals(code);
    }

    /**
     * Returns the executor used for the periodic send task, creating it on first use.
     *
     * @return the send executor
     */
    private synchronized ScheduledExecutorService getSendThread() {
        if (sendThread == null) {
            sendThread = ThreadToolUtils.getInstance().newScheduledThreadPool(1);
        }
        return sendThread;
    }

    /** Shuts the send executor down, if there is one, without discarding the reference. */
    private void stopSendThread() {
        // Work on a snapshot: another thread may clear the field concurrently.
        final ScheduledExecutorService executor = sendThread;
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    /**
     * Clears the send executor field and hands the previous value to the caller.
     *
     * @return the executor that was set, or {@code null}
     */
    private synchronized ScheduledExecutorService detachSendThread() {
        final ScheduledExecutorService detached = sendThread;
        sendThread = null;
        return detached;
    }

    /**
     * Claims the pending callback so that exactly one caller gets to report a result.
     *
     * @return the callback, or {@code null} when it was already claimed or never supplied
     */
    private synchronized HdlSocketListener takeListener() {
        final HdlSocketListener taken = listener;
        listener = null;
        return taken;
    }

    /** Removes the topic listener and shuts down and discards the send executor. */
    private void releaseResources() {
        removeListener();
        final ScheduledExecutorService executor = detachSendThread();
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    /** Releases all resources and reports failure to the callback if it is still pending. */
    private void notifyFailure() {
        releaseResources();
        final HdlSocketListener callback = takeListener();
        if (callback != null) {
            callback.onFailure();
        }
    }
}