package com.hdl.sdk.link.core.connect;
|
|
import android.text.TextUtils;
|
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONObject;
|
import com.hdl.link.error.ErrorUtils;
|
import com.hdl.link.error.HDLLinkCode;
|
import com.hdl.sdk.link.common.event.EventDispatcher;
|
import com.hdl.sdk.link.common.event.EventListener;
|
import com.hdl.sdk.link.common.utils.LogUtils;
|
import com.hdl.sdk.link.common.utils.ThreadToolUtils;
|
import com.hdl.sdk.link.core.bean.BusProResponse;
|
import com.hdl.sdk.link.core.bean.LinkRequest;
|
import com.hdl.sdk.link.core.bean.LinkResponse;
|
import com.hdl.sdk.link.core.bean.ModbusResponse;
|
import com.hdl.sdk.link.core.bean.RealLinkResponse;
|
import com.hdl.sdk.link.core.bean.ZigbeeResponse;
|
import com.hdl.sdk.link.core.bean.gateway.GatewayBean;
|
import com.hdl.sdk.link.core.config.HDLLinkConfig;
|
import com.hdl.sdk.link.core.utils.ByteUtils;
|
import com.hdl.sdk.link.core.utils.EncryptUtil;
|
import com.hdl.sdk.link.core.utils.JsonUtils;
|
import com.hdl.sdk.link.core.utils.mqtt.MqttRecvClient;
|
import com.hdl.sdk.link.gateway.HDLLinkLocalGateway;
|
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
/**
|
* Created by Tong on 2021/11/11.
|
*/
|
public class HDLConnectHelper {
|
|
private static final String TAG="HDLConnectHelper";
|
private static final Long DEF_SEND_TIMEOUT = 8000L;
|
private static final int DEF_MAX_RETRY = 1;//最大重发数
|
private static final int DEF_SEND_ONE = 1;
|
private static final int TCP_PORT = 8586;
|
private static final int UDP_PORT = 8585;
|
|
private final Long sendAwaitTime;
|
private final int maxRetry;
|
|
/**
|
* 是否tcp发送类型
|
*/
|
private boolean isTcp;
|
/**
|
* 设备mac
|
*/
|
private String mac;
|
/**
|
* 发送的目标IP
|
*/
|
private String ipAddress;
|
/**
|
* 发送的目标地址
|
*/
|
private int port;
|
private final LinkRequest linkRequest;
|
private final EventListener eventListener;
|
|
private final AtomicInteger sendNumber = new AtomicInteger(0);
|
|
private final AtomicBoolean isSend = new AtomicBoolean(false);
|
|
private HdlSocketListener listener;
|
|
private static ScheduledExecutorService scheduledExecutorService;
|
|
private ScheduledFuture<?> scheduledFuture;
|
|
private String replyTopic;
|
|
private boolean useSubThread=false;
|
|
public interface HdlSocketListener {
|
void onSucceed(Object msg);
|
|
void onFailure(HDLLinkCode hdlLinkCode);
|
}
|
|
/**
|
* 发送UDP或者TCP数据
|
*
|
* @param sendAwaitTime 每次发送等待时间
|
* @param maxRetry 重试次数
|
* @param ipAddress 发送目标IP
|
* @param port 发送目标端口
|
* @param linkRequest 发送对象
|
* @param listener 回调
|
* @param isTcp 是否TCP
|
*/
|
public HDLConnectHelper(Long sendAwaitTime, int maxRetry, String ipAddress, int port,
|
LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp) {
|
this.sendAwaitTime = sendAwaitTime;
|
this.maxRetry = maxRetry;
|
this.ipAddress = ipAddress;
|
this.port = port;
|
this.linkRequest = linkRequest;
|
this.replyTopic = linkRequest.getReplyTopic();
|
this.listener = listener;
|
this.isTcp = isTcp;
|
|
|
eventListener = new EventListener() {
|
@Override
|
public void onMessage(Object object) {
|
isSend.set(true);
|
try {
|
if (object instanceof LinkResponse) {//这个是兼容以前使用主题做为回调的方式,后续可以不需要使用,link还是用信息id合理些
|
LinkResponse linkResponse = (LinkResponse) object;
|
JSONObject jsonObject = JSON.parseObject(linkResponse.getData());
|
String id = null;
|
Integer code=null;
|
if(jsonObject!=null) {
|
id = jsonObject.getString("id");
|
code = jsonObject.getInteger("code");
|
}
|
/**
|
* 可能返回code属性可能没有 没有的话直接成功 有的话只有200才会成功
|
*/
|
if (code == null || code.intValue() == 200 || code.intValue() == 0) {
|
notifySucceed(object);
|
} else {
|
notifyFailure(ErrorUtils.getByCode(code));
|
}
|
} else if (object instanceof ZigbeeResponse) {
|
notifySucceed(object);
|
} else if (object instanceof ModbusResponse) {
|
notifySucceed(object);
|
} else if (object instanceof BusProResponse) {
|
notifySucceed(object);
|
} else if(object instanceof RealLinkResponse){
|
//真实的link数据,不包含透传的原生数据
|
notifySucceed(object);
|
}
|
else {
|
notifyFailure(new HDLLinkCode(HDLLinkCode.HDL_OBJECT_NOT_SUPPORT.getCode(), "Object Name:" + object));
|
}
|
} catch (Exception e) {
|
notifyFailure(new HDLLinkCode(HDLLinkCode.HDL_APPLICATION_CODE.getCode(), e.getMessage()));
|
}
|
}
|
};
|
//注册监听
|
registerListener();
|
}
|
|
/**
 * Same as the seven-argument constructor, additionally recording the target device MAC.
 *
 * @param sendAwaitTime wait between two send attempts, in milliseconds
 * @param maxRetry      maximum number of send attempts
 * @param ipAddress     target IP address
 * @param port          target port
 * @param linkRequest   request to send
 * @param listener      result callback
 * @param isTcp         {@code true} to send over TCP, {@code false} for UDP
 * @param mac           MAC of the target device
 */
public HDLConnectHelper(Long sendAwaitTime,
                        int maxRetry,
                        String ipAddress,
                        int port,
                        LinkRequest linkRequest,
                        HdlSocketListener listener,
                        boolean isTcp,
                        String mac) {
    this(sendAwaitTime, maxRetry, ipAddress, port, linkRequest, listener, isTcp);
    // Assigned after delegation; the delegate constructor does not read it.
    this.mac = mac;
}
|
|
/**
 * Sends with the given number of attempts and the default wait time, reporting the result
 * through the callback.
 *
 * @param maxRetry    maximum number of send attempts
 * @param ipAddress   target IP address
 * @param port        target port
 * @param linkRequest request to send
 * @param listener    result callback
 * @param isTcp       {@code true} to send over TCP, {@code false} for UDP
 */
public HDLConnectHelper(int maxRetry,
                        String ipAddress,
                        int port,
                        LinkRequest linkRequest,
                        HdlSocketListener listener,
                        boolean isTcp) {
    this(DEF_SEND_TIMEOUT, maxRetry, ipAddress, port, linkRequest, listener, isTcp);
}
|
|
/**
 * Sends with the given number of attempts to the default port of the chosen transport,
 * reporting the result through the callback.
 *
 * @param maxRetry    maximum number of send attempts
 * @param ipAddress   target IP address
 * @param linkRequest request to send
 * @param listener    result callback
 * @param isTcp       {@code true} to send over TCP (default TCP port), {@code false} for UDP
 */
public HDLConnectHelper(int maxRetry,
                        String ipAddress,
                        LinkRequest linkRequest,
                        HdlSocketListener listener,
                        boolean isTcp) {
    this(maxRetry, ipAddress, isTcp ? TCP_PORT : UDP_PORT, linkRequest, listener, isTcp);
}
|
|
/**
 * Sends with the given number of attempts to the default port, without a result callback.
 *
 * @param maxRetry    maximum number of send attempts
 * @param ipAddress   target IP address
 * @param linkRequest request to send
 * @param isTcp       {@code true} to send over TCP, {@code false} for UDP
 */
public HDLConnectHelper(int maxRetry,
                        String ipAddress,
                        LinkRequest linkRequest,
                        boolean isTcp) {
    this(maxRetry, ipAddress, linkRequest, (HdlSocketListener) null, isTcp);
}
|
|
/**
 * Sends using the default number of attempts and the default wait time.
 *
 * @param ipAddress   target IP address
 * @param port        target port
 * @param linkRequest request to send
 * @param listener    result callback
 * @param isTcp       {@code true} to send over TCP, {@code false} for UDP
 */
public HDLConnectHelper(String ipAddress,
                        int port,
                        LinkRequest linkRequest,
                        HdlSocketListener listener,
                        boolean isTcp) {
    this(DEF_MAX_RETRY, ipAddress, port, linkRequest, listener, isTcp);
}
|
|
/**
 * Sends to the default port of the chosen transport using the default attempts and wait time.
 *
 * @param ipAddress   target IP address
 * @param linkRequest request to send
 * @param listener    result callback
 * @param isTcp       {@code true} to send over TCP, {@code false} for UDP
 */
public HDLConnectHelper(String ipAddress,
                        LinkRequest linkRequest,
                        HdlSocketListener listener,
                        boolean isTcp) {
    this(DEF_SEND_TIMEOUT, DEF_MAX_RETRY, ipAddress, isTcp ? TCP_PORT : UDP_PORT, linkRequest, listener, isTcp);
}
|
|
/**
 * Sends to the default port of the chosen transport using the default attempts and wait time,
 * additionally recording the target device MAC.
 *
 * @param ipAddress   target IP address
 * @param linkRequest request to send
 * @param listener    result callback
 * @param isTcp       {@code true} to send over TCP, {@code false} for UDP
 * @param mac         MAC of the target device
 */
public HDLConnectHelper(String ipAddress,
                        LinkRequest linkRequest,
                        HdlSocketListener listener,
                        boolean isTcp,
                        String mac) {
    this(DEF_SEND_TIMEOUT, DEF_MAX_RETRY, ipAddress, isTcp ? TCP_PORT : UDP_PORT, linkRequest, listener, isTcp, mac);
}
|
|
|
/**
 * Sends the request once to the default port, without a result callback.
 *
 * @param ipAddress   target IP address
 * @param linkRequest request to send
 * @param isTcp       {@code true} to send over TCP, {@code false} for UDP
 */
public HDLConnectHelper(String ipAddress, LinkRequest linkRequest, boolean isTcp) {
    this(DEF_SEND_TIMEOUT,
            DEF_SEND_ONE,
            ipAddress,
            isTcp ? TCP_PORT : UDP_PORT,
            linkRequest,
            (HdlSocketListener) null,
            isTcp);
}
|
|
/**
 * Sends the request once to the default port with a custom wait time, without a result callback.
 *
 * @param timeout     wait time passed on as the send-await time, in milliseconds
 * @param ipAddress   target IP address
 * @param linkRequest request to send
 * @param isTcp       {@code true} to send over TCP, {@code false} for UDP
 */
public HDLConnectHelper(Long timeout, String ipAddress, LinkRequest linkRequest, boolean isTcp) {
    this(timeout,
            DEF_SEND_ONE,
            ipAddress,
            isTcp ? TCP_PORT : UDP_PORT,
            linkRequest,
            (HdlSocketListener) null,
            isTcp);
}
|
|
|
/**
|
* 注册监听
|
*/
|
private void registerListener() {
|
if (!TextUtils.isEmpty(replyTopic) && null != listener) {
|
if(useSubThread){//使用子线程回调
|
EventDispatcher.getInstance().asyncRegister(replyTopic, eventListener);
|
}
|
else {
|
//默认用主线程
|
EventDispatcher.getInstance().register(replyTopic, eventListener);
|
}
|
}
|
}
|
|
/**
|
* 移除监听
|
*/
|
private void removeListener() {
|
if (!TextUtils.isEmpty(replyTopic)) {
|
EventDispatcher.getInstance().remove(replyTopic, eventListener);
|
}
|
}
|
|
public static boolean isLocal() {
|
String ip = HDLLinkConfig.getInstance().getIpAddress();
|
if (ip == null) {
|
//如是本地是可以搜索到ip的
|
return false;
|
}
|
|
//本地是可以远程成功的
|
return HDLTcpConnect.getTcpSocketBoot(ip).isConnected();
|
}
|
|
public void send() {
|
|
scheduledFuture= getSendThread().scheduleWithFixedDelay(new Runnable() {
|
@Override
|
public void run() {
|
try {
|
//发送次数小于重发次数
|
if ((sendNumber.get() < maxRetry)) {
|
try {
|
|
//还没有收到回复,再发送
|
if (!isSend.get()) {
|
sendNumber.set(sendNumber.get() + 1);
|
|
//如是tcp或者mqtt
|
if (isTcp) {
|
//mqtt
|
if (TextUtils.isEmpty(ipAddress) || !HDLTcpConnect.getTcpSocketBoot(ipAddress).isConnected()) {
|
if (!linkRequest.getTopic().endsWith("heartbeat")) {//心跳主题数据过多,过滤下
|
//LogUtils.i("心跳包发送数据:\r\n" + new String(linkRequest.getCloudSendBytes()));
|
} else {
|
return;//云端情况下,心跳可以不用
|
}
|
String requestTopic = linkRequest.getCloudTopic();
|
GatewayBean gatewayBean = HDLLinkLocalGateway.getInstance().getLocalGateway(mac);
|
//网关的数据
|
String aes = HDLLinkConfig.getInstance().getAesKey();//默认用主网关的密钥
|
if (gatewayBean != null) {
|
aes = gatewayBean.getAesKey();
|
}
|
//可能对象中没有设置密钥,不应该存在这种情况。
|
if (TextUtils.isEmpty(aes)) {
|
LogUtils.e(TAG, "找不到远程加密的密钥,这个问题要排期解决");
|
return;
|
}
|
byte[] encryBytes = EncryptUtil.encryBytes(linkRequest.getCloudSendBytes(), aes);
|
if (MqttRecvClient.getInstance() != null) {
|
MqttRecvClient.getInstance().send(requestTopic, encryBytes);
|
if (HDLConnectHelper.isInverterTopic(linkRequest.getCloudTopic())) {
|
LogUtils.i(TAG, "远程发送数据,主题:" + linkRequest.getCloudTopic() + "\r\nPayload:" + ByteUtils.encodeHexString(linkRequest.getData()));
|
} else {
|
LogUtils.i(TAG, "远程发送数据,主题:" + linkRequest.getCloudTopic() + "\r\nPayload:" + new String(linkRequest.getCloudSendBytes()));
|
}
|
}
|
}
|
//本地TCP
|
else {
|
if (!linkRequest.getTopic().endsWith("heartbeat")) {//心跳主题数据过多,过滤下
|
if (JsonUtils.isJson(linkRequest.getData())) {
|
LogUtils.i(TAG, "本地发送数据\r\n" + new String(linkRequest.getSendBytes()));
|
} else {
|
LogUtils.i(TAG, String.format("本地发送主题:%s\r\nPayload:%s", linkRequest.getTopic(), ByteUtils.encodeHexString(linkRequest.getData())));
|
}
|
}
|
HDLTcpConnect.getTcpSocketBoot(ipAddress).sendMsg(EncryptUtil.getEncryBytes(linkRequest));
|
}
|
} else {
|
//如果是udp
|
LogUtils.i(TAG, "本地发送UDP数据\r\n" + new String(linkRequest.getSendBytes()));
|
HDLUdpConnect.getInstance().getUdpBoot().sendMsg(ipAddress, port, EncryptUtil.getEncryBytes(linkRequest));
|
}
|
}
|
} catch (Exception e) {
|
LogUtils.e(TAG, "数据发送异常:" + e.getMessage());
|
}
|
} else {
|
//超出重发次数并没有收到回复
|
if (!isSend.get()) {
|
if (linkRequest.getTopic().endsWith("heartbeat")) {//心跳主题先不通知
|
notifyFailure(null);
|
} else {
|
if (!TextUtils.isEmpty(replyTopic) && null != listener) {//需要打印出失败的日志
|
LogUtils.e(TAG, "发送失败数据主题:" + linkRequest.getTopic());
|
}
|
if (isTcp) {
|
//mqtt
|
if (TextUtils.isEmpty(ipAddress) || !HDLTcpConnect.getTcpSocketBoot(ipAddress).isConnected()) {
|
notifyFailure(HDLLinkCode.HDL_GATEWAY_REMOTE_NOT_RESPONSE);
|
}
|
//本地TCP,并是连接状态
|
else {
|
notifyFailure(HDLLinkCode.HDL_TIMEOUT_ERROR);
|
}
|
} else {
|
notifyFailure(HDLLinkCode.HDL_TIMEOUT_ERROR);
|
}
|
}
|
}
|
}
|
}catch (Exception e){
|
e.printStackTrace();
|
}
|
}
|
}, 0, sendAwaitTime, TimeUnit.MILLISECONDS);
|
//initialdelay - 首次执行的延迟时间 0
|
//delay - 一次执行终止和下一次执行开始之间的延迟
|
}
|
|
/**
|
* 获取发送线程
|
*
|
* @return 返回获取到的线程
|
*/
|
private synchronized ScheduledExecutorService getSendThread() {
|
if (scheduledExecutorService == null) {
|
scheduledExecutorService = ThreadToolUtils.getInstance().newScheduledThreadPool(3);
|
}
|
return scheduledExecutorService;
|
}
|
|
/**
|
* 发送失败
|
*/
|
private void notifyFailure(HDLLinkCode hdlLinkCode) {
|
//移除监听
|
removeListener();
|
if(scheduledFuture!=null){
|
scheduledFuture.cancel(false);
|
}
|
// if (sendThread != null) {
|
// sendThread.shutdownNow();
|
// sendThread = null;
|
// }
|
if (listener != null && hdlLinkCode != null) {
|
listener.onFailure(hdlLinkCode);
|
listener = null;
|
}
|
}
|
|
|
|
// /**
|
// * 支持毫米类型
|
// *
|
// * @return 类型列表
|
// */
|
// public static List<String> getGatewayTypeList() {
|
// List<String> typeList = new ArrayList<>();
|
//// typeList.add("sensor.mmv_sleep");//睡眠毫米波spk
|
//// typeList.add("sensor.mmv_pose");//姿态毫米波spk
|
// typeList.add("energy.hdl_inverter");//逆变器spk
|
// typeList.add("sensor.mmv_sleep");//睡眠毫米波spk
|
// typeList.add("sensor.mmv_pose");//姿态毫米波spk
|
// typeList.add("sensor.hdl_mmw_pose");//Wi-Fi毫米波ZT版本
|
// return typeList;
|
// }
|
//
|
public static List<String> getMillimeterTypeList() {
|
List<String> typeList = new ArrayList<>();
|
// typeList.add("AGATEWAY");//网关
|
typeList.add("sensor.mmv_sleep");//睡眠毫米波spk
|
typeList.add("sensor.mmv_pose");//姿态毫米波spk
|
typeList.add("sensor.hdl_mmw_pose");//Wi-Fi毫米波ZT版本
|
typeList.add("sensor.hdl_mmw_sleep");//Wi-Fi毫米波睡眠版
|
return typeList;
|
}
|
|
public static List<String> getGatewayModelList() {
|
List<String> typeList = new ArrayList<>();
|
typeList.add("MIR01R-LK.10");//睡眠毫米波spk
|
typeList.add("MSMWP-LK.30");//姿态毫米波spk
|
typeList.add("MSMWH-LK.30");//姿态毫米波spk
|
return typeList;
|
}
|
|
/**
|
* 获取除了网关的其它网络设备,上面写的那两个方法不一致getGatewayTypeList,getMillimeterTypeList,统一了下,以免后期出问题
|
* @return
|
*/
|
public static List<String> getNotGatewayTypeList(){
|
List<String> typeList = new ArrayList<>();
|
typeList.add("energy.hdl_inverter");//逆变器spk
|
typeList.add("sensor.mmv_sleep");//睡眠毫米波spk
|
typeList.add("sensor.mmv_pose");//姿态毫米波spk
|
typeList.add("sensor.hdl_mmw_pose");//Wi-Fi毫米波ZT版本
|
typeList.add("sensor.hdl_mmw_sleep");//Wi-Fi毫米波睡眠版
|
return typeList;
|
}
|
|
public static List<String> getNewMillimeterTypeList() {
|
List<String> typeList = new ArrayList<>();
|
typeList.add("sensor.hdl_mmw_pose");//Wi-Fi毫米波ZT版本
|
typeList.add("sensor.hdl_mmw_sleep");//Wi-Fi毫米波睡眠版
|
return typeList;
|
}
|
|
private void notifySucceed(Object msg) {
|
//移除监听
|
removeListener();
|
if(scheduledFuture!=null){
|
scheduledFuture.cancel(false);
|
}
|
// if (sendThread != null) {
|
// sendThread.shutdownNow();
|
// sendThread = null;
|
// }
|
if (listener != null) {
|
listener.onSucceed(msg);
|
listener = null;
|
}
|
}
|
|
|
|
public static boolean isInverterTopic(String topic) {
|
if (TextUtils.isEmpty(topic)) {
|
return false;
|
}
|
return topic.endsWith("custom/native/inverter/down_reply")
|
|| topic.endsWith("custom/native/inverter/down")
|
|| topic.endsWith("custom/native/inverter/up");
|
}
|
|
/**
|
* byte数组转换成int数组(为了打印出无符号的btye数组数据)
|
*
|
* @param bytes 数组
|
* @return
|
*/
|
public static Integer[] byteArrayConvertIntArray(byte[] bytes) {
|
if (bytes == null || bytes.length == 0) {
|
return new Integer[]{};
|
}
|
Integer[] arr = new Integer[bytes.length];
|
for (int i = 0; i < bytes.length; i++) {
|
arr[i] = bytes[i] & 0xff;
|
}
|
return arr;
|
}
|
|
public boolean isUseSubThread() {
|
return useSubThread;
|
}
|
|
public void setUseSubThread(boolean useSubThread) {
|
this.useSubThread = useSubThread;
|
}
|
}
|