package com.hdl.sdk.link.core.connect;
|
|
|
import com.google.gson.JsonObject;
|
import com.google.gson.JsonParser;
|
import com.google.gson.reflect.TypeToken;
|
import com.hdl.sdk.link.bean.LinkRoomBean;
|
import com.hdl.sdk.link.common.config.TopicConstant;
|
import com.hdl.sdk.link.common.event.EventDispatcher;
|
import com.hdl.sdk.link.common.event.EventListener;
|
import com.hdl.sdk.link.common.utils.IdUtils;
|
import com.hdl.sdk.link.common.utils.LogUtils;
|
import com.hdl.sdk.link.common.utils.TextUtils;
|
import com.hdl.sdk.link.common.utils.ThreadToolUtils;
|
import com.hdl.sdk.link.core.bean.LinkRequest;
|
import com.hdl.sdk.link.core.bean.LinkResponse;
|
import com.hdl.sdk.link.core.bean.gateway.GatewayBean;
|
import com.hdl.sdk.link.core.bean.response.BaseLocalResponse;
|
import com.hdl.sdk.link.core.utils.EncryptUtil;
|
import com.hdl.sdk.link.core.utils.LinkResponseUtils;
|
import com.hdl.sdk.link.gateway.HDLLinkLocalGateway;
|
|
|
import java.lang.reflect.Type;
|
import java.util.List;
|
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
/**
|
* Created by Tong on 2021/11/11.
|
*/
|
public class HDLConnectHelper {
|
|
private static final Long DEF_SEND_TIMEOUT = 8000L;
|
private static final int DEF_MAX_RETRY = 1;//最大重发数
|
private static final int DEF_SEND_ONE = 1;
|
private static final int TCP_PORT = 8586;
|
private static final int UDP_PORT = 8585;
|
|
private final Long sendAwaitTime;
|
private final int maxRetry;
|
|
/**
|
* 是否tcp发送类型
|
*/
|
private boolean isTcp;
|
/**
|
* 发送的目标IP
|
*/
|
private String ipAddress;
|
/**
|
* 发送的目标地址
|
*/
|
private int port;
|
private final LinkRequest linkRequest;
|
private final EventListener eventListener;
|
|
private final AtomicInteger sendNumber = new AtomicInteger(0);
|
|
private final AtomicBoolean isSend = new AtomicBoolean(false);
|
|
private HdlSocketListener listener;
|
|
private ScheduledExecutorService sendThread;
|
|
private String observeTopic;
|
|
public interface HdlSocketListener {
|
void onSucceed(Object msg);
|
|
void onFailure();
|
}
|
|
/**
|
* 发送UDP或者TCP数据
|
*
|
* @param sendAwaitTime 每次发送等待时间
|
* @param maxRetry 重试次数
|
* @param ipAddress 发送目标IP
|
* @param port 发送目标端口
|
* @param linkRequest 发送对象
|
* @param observeTopic 接收主题
|
* @param listener 回调
|
* @param isTcp 是否TCP
|
*/
|
public HDLConnectHelper(Long sendAwaitTime, int maxRetry, String ipAddress, int port,
|
LinkRequest linkRequest, String observeTopic, HdlSocketListener listener, boolean isTcp) {
|
this.sendAwaitTime = sendAwaitTime;
|
this.maxRetry = maxRetry;
|
this.ipAddress = ipAddress;
|
this.port = port;
|
this.linkRequest = linkRequest;
|
this.observeTopic = observeTopic;
|
this.listener = listener;
|
this.isTcp = isTcp;
|
|
eventListener = new EventListener() {
|
@Override
|
public void onMessage(Object msg) {
|
try {
|
//移除监听
|
removeListener();
|
isSend.set(true);
|
if (sendThread != null) {
|
sendThread.shutdownNow();
|
}
|
|
if (listener != null && msg instanceof LinkResponse) {
|
LinkResponse linkResponse = (LinkResponse) msg;
|
JsonObject jsonObject = new JsonParser().parse(linkResponse.getData()).getAsJsonObject();
|
|
String code = null;
|
if (jsonObject.get("code") != null) {
|
code = jsonObject.get("code").getAsString();
|
}
|
/**
|
* 可能返回code属性可能没有 没有的话直接成功 有的话只有200才会成功
|
*/
|
if (code == null || code.equals("200") || code.equals("0")) {
|
listener.onSucceed(msg);
|
return;
|
}
|
}
|
} catch (Exception e) {
|
|
}
|
//上面没有正常执行,回调失败
|
notifyFailure();
|
}
|
};
|
//注册监听
|
registerListener();
|
}
|
|
/**
|
* 按照指定次数发,回调
|
*
|
* @param maxRetry 重试次数
|
* @param ipAddress 发送目标IP
|
* @param port 发送目标端口
|
* @param linkRequest 发送对象
|
* @param observeTopic 接收主题
|
* @param listener 回调
|
* @param isTcp 是否TCP
|
*/
|
public HDLConnectHelper(int maxRetry, String ipAddress, int port,
|
LinkRequest linkRequest, String observeTopic, HdlSocketListener listener, boolean isTcp) {
|
this(DEF_SEND_TIMEOUT, maxRetry, ipAddress, port, linkRequest, observeTopic, listener, isTcp);
|
}
|
|
/**
|
* 按照指定次数发,回调
|
*
|
* @param maxRetry 重试次数
|
* @param ipAddress 发送目标IP
|
* @param linkRequest 发送对象
|
* @param observeTopic 接收主题
|
* @param listener 回调
|
* @param isTcp 是否TCP
|
*/
|
public HDLConnectHelper(int maxRetry, String ipAddress,
|
LinkRequest linkRequest, String observeTopic, HdlSocketListener listener, boolean isTcp) {
|
this(maxRetry, ipAddress, isTcp ? TCP_PORT : UDP_PORT, linkRequest, observeTopic, listener, isTcp);
|
}
|
|
/**
|
* 按照指定次数发,不回调
|
*
|
* @param maxRetry 重试次数
|
* @param ipAddress 发送目标IP
|
* @param linkRequest 发送对象
|
* @param isTcp 是否TCP
|
*/
|
public HDLConnectHelper(int maxRetry, String ipAddress,
|
LinkRequest linkRequest, boolean isTcp) {
|
this(maxRetry, ipAddress, linkRequest, null, null, isTcp);
|
}
|
|
/**
|
* 按照默认重发机制发送
|
*
|
* @param ipAddress 发送目标IP
|
* @param port 发送目标端口
|
* @param linkRequest 发送对象
|
* @param observeTopic 接收主题
|
* @param listener 回调
|
* @param isTcp 是否TCP
|
*/
|
public HDLConnectHelper(String ipAddress, int port,
|
LinkRequest linkRequest, String observeTopic, HdlSocketListener listener, boolean isTcp) {
|
this(DEF_MAX_RETRY, ipAddress, port, linkRequest, observeTopic, listener, isTcp);
|
}
|
|
/**
|
* 默认端口发送
|
*
|
* @param ipAddress 发送目标IP
|
* @param linkRequest 发送对象
|
* @param observeTopic 接收主题
|
* @param listener 回调
|
* @param isTcp 是否TCP
|
*/
|
public HDLConnectHelper(String ipAddress,
|
LinkRequest linkRequest, String observeTopic, HdlSocketListener listener, boolean isTcp) {
|
this(DEF_SEND_TIMEOUT, DEF_MAX_RETRY, ipAddress, isTcp ? TCP_PORT : UDP_PORT, linkRequest, observeTopic, listener, isTcp);
|
}
|
|
/**
|
* 发送一次
|
*
|
* @param ipAddress 发送目标IP
|
* @param linkRequest 发送对象
|
* @param isTcp 是否TCP
|
*/
|
public HDLConnectHelper(String ipAddress, LinkRequest linkRequest, boolean isTcp) {
|
this(DEF_SEND_TIMEOUT, DEF_SEND_ONE, ipAddress, isTcp ? TCP_PORT : UDP_PORT, linkRequest, null, null, isTcp);
|
}
|
|
/**
|
* 注册监听
|
*/
|
private void registerListener() {
|
if (!TextUtils.isEmpty(observeTopic) && null != listener) {
|
EventDispatcher.getInstance().register(observeTopic, eventListener);
|
}
|
}
|
|
/**
|
* 移除监听
|
*/
|
private void removeListener() {
|
if (!TextUtils.isEmpty(observeTopic)) {
|
EventDispatcher.getInstance().remove(observeTopic, eventListener);
|
}
|
}
|
|
public void send() {
|
getSendThread().scheduleWithFixedDelay(new Runnable() {
|
@Override
|
public void run() {
|
//发送次数小于重发次数
|
if ((sendNumber.get() < maxRetry)) {
|
try {
|
//还没有收到回复,再发送
|
if (!isSend.get()) {
|
sendNumber.set(sendNumber.get() + 1);
|
if(!linkRequest.getTopic().endsWith("heartbeat")) {//心跳主题数据过多,过滤下
|
LogUtils.i("发送数据:\r\n" + new String(linkRequest.getSendBytes()));
|
}
|
//如是tcp
|
if (isTcp) {
|
HDLTcpConnect.getTcpSocketBoot(ipAddress).sendMsg(EncryptUtil.getEncryBytes(linkRequest));
|
} else {
|
//如果是udp
|
HDLUdpConnect.getInstance().getUdpBoot().sendMsg(ipAddress, port, EncryptUtil.getEncryBytes(linkRequest));
|
}
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
} else {
|
//超出重发次数并没有收到回复
|
if (!isSend.get()) {
|
notifyFailure();
|
}
|
}
|
}
|
}, 0, sendAwaitTime, TimeUnit.MILLISECONDS);
|
//initialdelay - 首次执行的延迟时间 0
|
//delay - 一次执行终止和下一次执行开始之间的延迟
|
}
|
|
|
/**
|
* 获取发送线程
|
*
|
* @return 返回获取到的线程
|
*/
|
private ScheduledExecutorService getSendThread() {
|
if (sendThread == null) {
|
sendThread = ThreadToolUtils.getInstance().newScheduledThreadPool(1);
|
}
|
return sendThread;
|
}
|
|
|
/**
|
* 发送失败
|
*/
|
private void notifyFailure() {
|
//移除监听
|
removeListener();
|
if (sendThread != null) {
|
sendThread.shutdownNow();
|
sendThread = null;
|
}
|
if (listener != null) {
|
listener.onFailure();
|
listener = null;
|
}
|
}
|
}
|