package com.hdl.sdk.link.core.connect; import android.text.TextUtils; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.hdl.link.error.ErrorUtils; import com.hdl.link.error.HDLLinkCode; import com.hdl.sdk.link.common.event.EventDispatcher; import com.hdl.sdk.link.common.event.EventListener; import com.hdl.sdk.link.common.utils.LogUtils; import com.hdl.sdk.link.common.utils.ThreadToolUtils; import com.hdl.sdk.link.core.bean.BusProResponse; import com.hdl.sdk.link.core.bean.LinkRequest; import com.hdl.sdk.link.core.bean.LinkResponse; import com.hdl.sdk.link.core.bean.ModbusResponse; import com.hdl.sdk.link.core.bean.RealLinkResponse; import com.hdl.sdk.link.core.bean.ZigbeeResponse; import com.hdl.sdk.link.core.bean.gateway.GatewayBean; import com.hdl.sdk.link.core.config.HDLLinkConfig; import com.hdl.sdk.link.core.utils.ByteUtils; import com.hdl.sdk.link.core.utils.EncryptUtil; import com.hdl.sdk.link.core.utils.JsonUtils; import com.hdl.sdk.link.core.utils.mqtt.MqttRecvClient; import com.hdl.sdk.link.gateway.HDLLinkLocalGateway; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** * Created by Tong on 2021/11/11. */ public class HDLConnectHelper { private static final String TAG="HDLConnectHelper"; private static final Long DEF_SEND_TIMEOUT = 8000L; private static final int DEF_MAX_RETRY = 1;//最大重发数 private static final int DEF_SEND_ONE = 1; private static final int TCP_PORT = 8586; private static final int UDP_PORT = 8585; private final Long sendAwaitTime; private final int maxRetry; /** * 是否tcp发送类型 */ private boolean isTcp; /** * 设备mac */ private String mac; /** * 发送的目标IP */ private String ipAddress; /** * 发送的目标地址 */ private int port; private final LinkRequest linkRequest; private final EventListener eventListener; private final AtomicInteger sendNumber = new AtomicInteger(0); private final AtomicBoolean isSend = new AtomicBoolean(false); private HdlSocketListener listener; private static ScheduledExecutorService scheduledExecutorService; private ScheduledFuture scheduledFuture; private String replyTopic; private boolean useSubThread=false; public interface HdlSocketListener { void onSucceed(Object msg); void onFailure(HDLLinkCode hdlLinkCode); } /** * 发送UDP或者TCP数据 * * @param sendAwaitTime 每次发送等待时间 * @param maxRetry 重试次数 * @param ipAddress 发送目标IP * @param port 发送目标端口 * @param linkRequest 发送对象 * @param listener 回调 * @param isTcp 是否TCP */ public HDLConnectHelper(Long sendAwaitTime, int maxRetry, String ipAddress, int port, LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp) { this.sendAwaitTime = sendAwaitTime; this.maxRetry = maxRetry; this.ipAddress = ipAddress; this.port = port; this.linkRequest = linkRequest; this.replyTopic = linkRequest.getReplyTopic(); this.listener = listener; this.isTcp = isTcp; eventListener = new EventListener() { @Override public void onMessage(Object object) { isSend.set(true); try { if (object instanceof LinkResponse) {//这个是兼容以前使用主题做为回调的方式,后续可以不需要使用,link还是用信息id合理些 LinkResponse linkResponse = (LinkResponse) object; JSONObject jsonObject = JSON.parseObject(linkResponse.getData()); String id = null; Integer code=null; if(jsonObject!=null) { id = jsonObject.getString("id"); code = jsonObject.getInteger("code"); } /** * 可能返回code属性可能没有 没有的话直接成功 有的话只有200才会成功 */ if (code == null || code.intValue() == 200 || code.intValue() == 0) { notifySucceed(object); } else { notifyFailure(ErrorUtils.getByCode(code)); } } else if (object instanceof ZigbeeResponse) { notifySucceed(object); } else if (object instanceof ModbusResponse) { notifySucceed(object); } else if (object instanceof BusProResponse) { notifySucceed(object); } else if(object instanceof RealLinkResponse){ //真实的link数据,不包含透传的原生数据 notifySucceed(object); } else { notifyFailure(new HDLLinkCode(HDLLinkCode.HDL_OBJECT_NOT_SUPPORT.getCode(), "Object Name:" + object)); } } catch (Exception e) { notifyFailure(new HDLLinkCode(HDLLinkCode.HDL_APPLICATION_CODE.getCode(), e.getMessage())); } } }; //注册监听 registerListener(); } /** * 发送UDP或者TCP数据(参数有mac) * * @param sendAwaitTime 每次发送等待时间 * @param maxRetry 重试次数 * @param ipAddress 发送目标IP * @param port 发送目标端口 * @param linkRequest 发送对象 * @param listener 回调 * @param isTcp 是否TCP * @param mac 设备mac */ public HDLConnectHelper(Long sendAwaitTime, int maxRetry, String ipAddress, int port, LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp, String mac) { this(sendAwaitTime, maxRetry, ipAddress, port, linkRequest, listener, isTcp); this.mac = mac; } /** * 按照指定次数发,回调 * * @param maxRetry 重试次数 * @param ipAddress 发送目标IP * @param port 发送目标端口 * @param linkRequest 发送对象 * @param listener 回调 * @param isTcp 是否TCP */ public HDLConnectHelper(int maxRetry, String ipAddress, int port, LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp) { this(DEF_SEND_TIMEOUT, maxRetry, ipAddress, port, linkRequest, listener, isTcp); } /** * 按照指定次数发,回调 * * @param maxRetry 重试次数 * @param ipAddress 发送目标IP * @param linkRequest 发送对象 * @param listener 回调 * @param isTcp 是否TCP */ public HDLConnectHelper(int maxRetry, String ipAddress, LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp) { this(maxRetry, ipAddress, isTcp ? TCP_PORT : UDP_PORT, linkRequest, listener, isTcp); } /** * 按照指定次数发,不回调 * * @param maxRetry 重试次数 * @param ipAddress 发送目标IP * @param linkRequest 发送对象 * @param isTcp 是否TCP */ public HDLConnectHelper(int maxRetry, String ipAddress, LinkRequest linkRequest, boolean isTcp) { this(maxRetry, ipAddress, linkRequest, null, isTcp); } /** * 按照默认重发机制发送 * * @param ipAddress 发送目标IP * @param port 发送目标端口 * @param linkRequest 发送对象 * @param listener 回调 * @param isTcp 是否TCP */ public HDLConnectHelper(String ipAddress, int port, LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp) { this(DEF_MAX_RETRY, ipAddress, port, linkRequest, listener, isTcp); } /** * 默认端口发送 * * @param ipAddress 发送目标IP * @param linkRequest 发送对象 * @param listener 回调 * @param isTcp 是否TCP */ public HDLConnectHelper(String ipAddress, LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp) { this(DEF_SEND_TIMEOUT, DEF_MAX_RETRY, ipAddress, isTcp ? TCP_PORT : UDP_PORT, linkRequest, listener, isTcp); } /** * 默认端口发送(参数有mac) * * @param ipAddress 发送目标IP * @param linkRequest 发送对象 * @param listener 回调 * @param isTcp 是否TCP * @param mac 设备mac */ public HDLConnectHelper(String ipAddress, LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp, String mac) { this(DEF_SEND_TIMEOUT, DEF_MAX_RETRY, ipAddress, isTcp ? TCP_PORT : UDP_PORT, linkRequest, listener, isTcp, mac); } /** * 发送一次 * * @param ipAddress 发送目标IP * @param linkRequest 发送对象 * @param isTcp 是否TCP */ public HDLConnectHelper(String ipAddress, LinkRequest linkRequest, boolean isTcp) { this(DEF_SEND_TIMEOUT, DEF_SEND_ONE, ipAddress, isTcp ? TCP_PORT : UDP_PORT, linkRequest, null, isTcp); } /** * 发送一次 * * @param ipAddress 发送目标IP * @param linkRequest 发送对象 * @param isTcp 是否TCP */ public HDLConnectHelper(Long timeout, String ipAddress, LinkRequest linkRequest, boolean isTcp) { this(timeout, DEF_SEND_ONE, ipAddress, isTcp ? TCP_PORT : UDP_PORT, linkRequest, null, isTcp); } /** * 注册监听 */ private void registerListener() { if (!TextUtils.isEmpty(replyTopic) && null != listener) { if(useSubThread){//使用子线程回调 EventDispatcher.getInstance().asyncRegister(replyTopic, eventListener); } else { //默认用主线程 EventDispatcher.getInstance().register(replyTopic, eventListener); } } } /** * 移除监听 */ private void removeListener() { if (!TextUtils.isEmpty(replyTopic)) { EventDispatcher.getInstance().remove(replyTopic, eventListener); } } public static boolean isLocal() { String ip = HDLLinkConfig.getInstance().getIpAddress(); if (ip == null) { //如是本地是可以搜索到ip的 return false; } //本地是可以远程成功的 return HDLTcpConnect.getTcpSocketBoot(ip).isConnected(); } public void send() { scheduledFuture= getSendThread().scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { //发送次数小于重发次数 if ((sendNumber.get() < maxRetry)) { try { //还没有收到回复,再发送 if (!isSend.get()) { sendNumber.set(sendNumber.get() + 1); //如是tcp或者mqtt if (isTcp) { //mqtt if (TextUtils.isEmpty(ipAddress) || !HDLTcpConnect.getTcpSocketBoot(ipAddress).isConnected()) { if (!linkRequest.getTopic().endsWith("heartbeat")) {//心跳主题数据过多,过滤下 //LogUtils.i("心跳包发送数据:\r\n" + new String(linkRequest.getCloudSendBytes())); } else { return;//云端情况下,心跳可以不用 } String requestTopic = linkRequest.getCloudTopic(); GatewayBean gatewayBean = HDLLinkLocalGateway.getInstance().getLocalGateway(mac); //网关的数据 String aes = HDLLinkConfig.getInstance().getAesKey();//默认用主网关的密钥 if (gatewayBean != null) { aes = gatewayBean.getAesKey(); } //可能对象中没有设置密钥,不应该存在这种情况。 if (TextUtils.isEmpty(aes)) { LogUtils.e(TAG, "找不到远程加密的密钥,这个问题要排期解决"); return; } byte[] encryBytes = EncryptUtil.encryBytes(linkRequest.getCloudSendBytes(), aes); if (MqttRecvClient.getInstance() != null) { MqttRecvClient.getInstance().send(requestTopic, encryBytes); if (HDLConnectHelper.isInverterTopic(linkRequest.getCloudTopic())) { LogUtils.i(TAG, "远程发送数据,主题:" + linkRequest.getCloudTopic() + "\r\nPayload:" + ByteUtils.encodeHexString(linkRequest.getData())); } else { LogUtils.i(TAG, "远程发送数据,主题:" + linkRequest.getCloudTopic() + "\r\nPayload:" + new String(linkRequest.getCloudSendBytes())); } } } //本地TCP else { if (!linkRequest.getTopic().endsWith("heartbeat")) {//心跳主题数据过多,过滤下 if (JsonUtils.isJson(linkRequest.getData())) { LogUtils.i(TAG, "本地发送数据\r\n" + new String(linkRequest.getSendBytes())); } else { LogUtils.i(TAG, String.format("本地发送主题:%s\r\nPayload:%s", linkRequest.getTopic(), ByteUtils.encodeHexString(linkRequest.getData()))); } } HDLTcpConnect.getTcpSocketBoot(ipAddress).sendMsg(EncryptUtil.getEncryBytes(linkRequest)); } } else { //如果是udp LogUtils.i(TAG, "本地发送UDP数据\r\n" + new String(linkRequest.getSendBytes())); HDLUdpConnect.getInstance().getUdpBoot().sendMsg(ipAddress, port, EncryptUtil.getEncryBytes(linkRequest)); } } } catch (Exception e) { LogUtils.e(TAG, "数据发送异常:" + e.getMessage()); } } else { //超出重发次数并没有收到回复 if (!isSend.get()) { if (linkRequest.getTopic().endsWith("heartbeat")) {//心跳主题先不通知 notifyFailure(null); } else { if (!TextUtils.isEmpty(replyTopic) && null != listener) {//需要打印出失败的日志 LogUtils.e(TAG, "发送失败数据主题:" + linkRequest.getTopic()); } if (isTcp) { //mqtt if (TextUtils.isEmpty(ipAddress) || !HDLTcpConnect.getTcpSocketBoot(ipAddress).isConnected()) { notifyFailure(HDLLinkCode.HDL_GATEWAY_REMOTE_NOT_RESPONSE); } //本地TCP,并是连接状态 else { notifyFailure(HDLLinkCode.HDL_TIMEOUT_ERROR); } } else { notifyFailure(HDLLinkCode.HDL_TIMEOUT_ERROR); } } } } }catch (Exception e){ e.printStackTrace(); } } }, 0, sendAwaitTime, TimeUnit.MILLISECONDS); //initialdelay - 首次执行的延迟时间 0 //delay - 一次执行终止和下一次执行开始之间的延迟 } /** * 获取发送线程 * * @return 返回获取到的线程 */ private synchronized ScheduledExecutorService getSendThread() { if (scheduledExecutorService == null) { scheduledExecutorService = ThreadToolUtils.getInstance().newScheduledThreadPool(3); } return scheduledExecutorService; } /** * 发送失败 */ private void notifyFailure(HDLLinkCode hdlLinkCode) { //移除监听 removeListener(); if(scheduledFuture!=null){ scheduledFuture.cancel(false); } // if (sendThread != null) { // sendThread.shutdownNow(); // sendThread = null; // } if (listener != null && hdlLinkCode != null) { listener.onFailure(hdlLinkCode); listener = null; } } // /** // * 支持毫米类型 // * // * @return 类型列表 // */ // public static List getGatewayTypeList() { // List typeList = new ArrayList<>(); //// typeList.add("sensor.mmv_sleep");//睡眠毫米波spk //// typeList.add("sensor.mmv_pose");//姿态毫米波spk // typeList.add("energy.hdl_inverter");//逆变器spk // typeList.add("sensor.mmv_sleep");//睡眠毫米波spk // typeList.add("sensor.mmv_pose");//姿态毫米波spk // typeList.add("sensor.hdl_mmw_pose");//Wi-Fi毫米波ZT版本 // return typeList; // } // public static List getMillimeterTypeList() { List typeList = new ArrayList<>(); // typeList.add("AGATEWAY");//网关 typeList.add("sensor.mmv_sleep");//睡眠毫米波spk typeList.add("sensor.mmv_pose");//姿态毫米波spk typeList.add("sensor.hdl_mmw_pose");//Wi-Fi毫米波ZT版本 typeList.add("sensor.hdl_mmw_sleep");//Wi-Fi毫米波睡眠版 return typeList; } public static List getGatewayModelList() { List typeList = new ArrayList<>(); typeList.add("MIR01R-LK.10");//睡眠毫米波spk typeList.add("MSMWP-LK.30");//姿态毫米波spk typeList.add("MSMWH-LK.30");//姿态毫米波spk return typeList; } /** * 获取除了网关的其它网络设备,上面写的那两个方法不一致getGatewayTypeList,getMillimeterTypeList,统一了下,以免后期出问题 * @return */ public static List getNotGatewayTypeList(){ List typeList = new ArrayList<>(); typeList.add("energy.hdl_inverter");//逆变器spk typeList.add("sensor.mmv_sleep");//睡眠毫米波spk typeList.add("sensor.mmv_pose");//姿态毫米波spk typeList.add("sensor.hdl_mmw_pose");//Wi-Fi毫米波ZT版本 typeList.add("sensor.hdl_mmw_sleep");//Wi-Fi毫米波睡眠版 return typeList; } public static List getNewMillimeterTypeList() { List typeList = new ArrayList<>(); typeList.add("sensor.hdl_mmw_pose");//Wi-Fi毫米波ZT版本 typeList.add("sensor.hdl_mmw_sleep");//Wi-Fi毫米波睡眠版 return typeList; } private void notifySucceed(Object msg) { //移除监听 removeListener(); if(scheduledFuture!=null){ scheduledFuture.cancel(false); } // if (sendThread != null) { // sendThread.shutdownNow(); // sendThread = null; // } if (listener != null) { listener.onSucceed(msg); listener = null; } } public static boolean isInverterTopic(String topic) { if (TextUtils.isEmpty(topic)) { return false; } return topic.endsWith("custom/native/inverter/down_reply") || topic.endsWith("custom/native/inverter/down") || topic.endsWith("custom/native/inverter/up"); } /** * byte数组转换成int数组(为了打印出无符号的btye数组数据) * * @param bytes 数组 * @return */ public static Integer[] byteArrayConvertIntArray(byte[] bytes) { if (bytes == null || bytes.length == 0) { return new Integer[]{}; } Integer[] arr = new Integer[bytes.length]; for (int i = 0; i < bytes.length; i++) { arr[i] = bytes[i] & 0xff; } return arr; } public boolean isUseSubThread() { return useSubThread; } public void setUseSubThread(boolean useSubThread) { this.useSubThread = useSubThread; } }