wjc
2025-02-07 a1c880c476327032fe174af8c80d6f78a590dae9
sdk/src/main/java/com/hdl/sdk/link/core/connect/HDLConnectHelper.java
New file
@@ -0,0 +1,564 @@
package com.hdl.sdk.link.core.connect;
import android.text.TextUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.hdl.link.error.ErrorUtils;
import com.hdl.link.error.HDLLinkCode;
import com.hdl.sdk.link.common.event.EventDispatcher;
import com.hdl.sdk.link.common.event.EventListener;
import com.hdl.sdk.link.common.utils.LogUtils;
import com.hdl.sdk.link.common.utils.ThreadToolUtils;
import com.hdl.sdk.link.core.bean.BusProResponse;
import com.hdl.sdk.link.core.bean.LinkRequest;
import com.hdl.sdk.link.core.bean.LinkResponse;
import com.hdl.sdk.link.core.bean.ModbusResponse;
import com.hdl.sdk.link.core.bean.RealLinkResponse;
import com.hdl.sdk.link.core.bean.ZigbeeResponse;
import com.hdl.sdk.link.core.bean.gateway.GatewayBean;
import com.hdl.sdk.link.core.config.HDLLinkConfig;
import com.hdl.sdk.link.core.utils.ByteUtils;
import com.hdl.sdk.link.core.utils.EncryptUtil;
import com.hdl.sdk.link.core.utils.JsonUtils;
import com.hdl.sdk.link.core.utils.mqtt.MqttRecvClient;
import com.hdl.sdk.link.gateway.HDLLinkLocalGateway;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Created by Tong on 2021/11/11.
 */
public class HDLConnectHelper {
    private static final String TAG="HDLConnectHelper";
    private static final Long DEF_SEND_TIMEOUT = 8000L;
    private static final int DEF_MAX_RETRY = 1;//最大重发数
    private static final int DEF_SEND_ONE = 1;
    private static final int TCP_PORT = 8586;
    private static final int UDP_PORT = 8585;
    private final Long sendAwaitTime;
    private final int maxRetry;
    /**
     * 是否tcp发送类型
     */
    private boolean isTcp;
    /**
     * 设备mac
     */
    private String mac;
    /**
     * 发送的目标IP
     */
    private String ipAddress;
    /**
     * 发送的目标地址
     */
    private int port;
    private final LinkRequest linkRequest;
    private final EventListener eventListener;
    private final AtomicInteger sendNumber = new AtomicInteger(0);
    private final AtomicBoolean isSend = new AtomicBoolean(false);
    private HdlSocketListener listener;
    private static ScheduledExecutorService scheduledExecutorService;
    private ScheduledFuture<?> scheduledFuture;
    private String replyTopic;
    private boolean useSubThread=false;
    public interface HdlSocketListener {
        void onSucceed(Object msg);
        void onFailure(HDLLinkCode hdlLinkCode);
    }
    public HDLConnectHelper(Long sendAwaitTime, int maxRetry, String ipAddress, int port,
                            LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp) {
        this(sendAwaitTime,maxRetry,ipAddress,port,linkRequest,listener,isTcp,false);
    }
    /**
     * 发送UDP或者TCP数据
     *
     * @param sendAwaitTime 每次发送等待时间
     * @param maxRetry      重试次数
     * @param ipAddress     发送目标IP
     * @param port          发送目标端口
     * @param linkRequest   发送对象
     * @param listener      回调
     * @param isTcp         是否TCP
     */
    public HDLConnectHelper(Long sendAwaitTime, int maxRetry, String ipAddress, int port,
                            LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp,boolean useSubThread) {
        this.sendAwaitTime = sendAwaitTime;
        this.maxRetry = maxRetry;
        this.ipAddress = ipAddress;
        this.port = port;
        this.linkRequest = linkRequest;
        this.replyTopic = linkRequest.getReplyTopic();
        this.listener = listener;
        this.isTcp = isTcp;
        this.useSubThread=useSubThread;
        eventListener = new EventListener() {
            @Override
            public void onMessage(Object object) {
                isSend.set(true);
                try {
                    if (object instanceof LinkResponse) {//这个是兼容以前使用主题做为回调的方式,后续可以不需要使用,link还是用信息id合理些
                        LinkResponse linkResponse = (LinkResponse) object;
                        JSONObject jsonObject = JSON.parseObject(linkResponse.getData());
                        String id = null;
                        Integer code=null;
                        if(jsonObject!=null) {
                            id = jsonObject.getString("id");
                            code = jsonObject.getInteger("code");
                        }
                        /**
                         * 可能返回code属性可能没有   没有的话直接成功  有的话只有200才会成功
                         */
                        if (code == null || code.intValue() == 200 || code.intValue() == 0) {
                            notifySucceed(object);
                        } else {
                            notifyFailure(ErrorUtils.getByCode(code));
                        }
                    } else if (object instanceof ZigbeeResponse) {
                        notifySucceed(object);
                    } else if (object instanceof ModbusResponse) {
                        notifySucceed(object);
                    } else if (object instanceof BusProResponse) {
                        notifySucceed(object);
                    } else if(object instanceof RealLinkResponse){
                        //真实的link数据,不包含透传的原生数据
                        notifySucceed(object);
                    }
                    else {
                        notifyFailure(new HDLLinkCode(HDLLinkCode.HDL_OBJECT_NOT_SUPPORT.getCode(), "Object Name:" + object));
                    }
                } catch (Exception e) {
                    notifyFailure(new HDLLinkCode(HDLLinkCode.HDL_APPLICATION_CODE.getCode(), e.getMessage()));
                }
            }
        };
        //注册监听
        registerListener();
    }
    /**
     * 发送UDP或者TCP数据(参数有mac)
     *
     * @param sendAwaitTime 每次发送等待时间
     * @param maxRetry      重试次数
     * @param ipAddress     发送目标IP
     * @param port          发送目标端口
     * @param linkRequest   发送对象
     * @param listener      回调
     * @param isTcp         是否TCP
     * @param mac           设备mac
     */
    public HDLConnectHelper(Long sendAwaitTime, int maxRetry, String ipAddress, int port,
                            LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp, String mac) {
        this(sendAwaitTime, maxRetry, ipAddress, port, linkRequest, listener, isTcp,false);
        this.mac = mac;
    }
    public HDLConnectHelper(Long sendAwaitTime, int maxRetry, String ipAddress, int port,
                            LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp, String mac,boolean subThread) {
        this(sendAwaitTime, maxRetry, ipAddress, port, linkRequest, listener, isTcp,subThread);
        this.mac = mac;
    }
    /**
     * 按照指定次数发,回调
     *
     * @param maxRetry    重试次数
     * @param ipAddress   发送目标IP
     * @param port        发送目标端口
     * @param linkRequest 发送对象
     * @param listener    回调
     * @param isTcp       是否TCP
     */
    public HDLConnectHelper(int maxRetry, String ipAddress, int port,
                            LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp) {
        this(DEF_SEND_TIMEOUT, maxRetry, ipAddress, port, linkRequest, listener, isTcp);
    }
    /**
     * 按照指定次数发,回调
     *
     * @param maxRetry    重试次数
     * @param ipAddress   发送目标IP
     * @param linkRequest 发送对象
     * @param listener    回调
     * @param isTcp       是否TCP
     */
    public HDLConnectHelper(int maxRetry, String ipAddress,
                            LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp) {
        this(maxRetry, ipAddress, isTcp ? TCP_PORT : UDP_PORT, linkRequest, listener, isTcp);
    }
    /**
     * 按照指定次数发,不回调
     *
     * @param maxRetry    重试次数
     * @param ipAddress   发送目标IP
     * @param linkRequest 发送对象
     * @param isTcp       是否TCP
     */
    public HDLConnectHelper(int maxRetry, String ipAddress,
                            LinkRequest linkRequest, boolean isTcp) {
        this(maxRetry, ipAddress, linkRequest, null, isTcp);
    }
    /**
     * 按照默认重发机制发送
     *
     * @param ipAddress   发送目标IP
     * @param port        发送目标端口
     * @param linkRequest 发送对象
     * @param listener    回调
     * @param isTcp       是否TCP
     */
    public HDLConnectHelper(String ipAddress, int port,
                            LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp) {
        this(DEF_MAX_RETRY, ipAddress, port, linkRequest, listener, isTcp);
    }
    /**
     * 默认端口发送
     *
     * @param ipAddress   发送目标IP
     * @param linkRequest 发送对象
     * @param listener    回调
     * @param isTcp       是否TCP
     */
    public HDLConnectHelper(String ipAddress,
                            LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp) {
        this(DEF_SEND_TIMEOUT, DEF_MAX_RETRY, ipAddress, isTcp ? TCP_PORT : UDP_PORT, linkRequest, listener, isTcp);
    }
    /**
     * 默认端口发送(参数有mac)
     *
     * @param ipAddress   发送目标IP
     * @param linkRequest 发送对象
     * @param listener    回调
     * @param isTcp       是否TCP
     * @param mac         设备mac
     */
    public HDLConnectHelper(String ipAddress,
                            LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp, String mac) {
        this(DEF_SEND_TIMEOUT, DEF_MAX_RETRY, ipAddress, isTcp ? TCP_PORT : UDP_PORT, linkRequest, listener, isTcp, mac);
    }
    /**
     * 发送一次
     *
     * @param ipAddress   发送目标IP
     * @param linkRequest 发送对象
     * @param isTcp       是否TCP
     */
    public HDLConnectHelper(String ipAddress, LinkRequest linkRequest, boolean isTcp) {
        this(DEF_SEND_TIMEOUT, DEF_SEND_ONE, ipAddress, isTcp ? TCP_PORT : UDP_PORT, linkRequest, null, isTcp);
    }
    /**
     * 发送一次
     *
     * @param ipAddress   发送目标IP
     * @param linkRequest 发送对象
     * @param isTcp       是否TCP
     */
    public HDLConnectHelper(Long timeout, String ipAddress, LinkRequest linkRequest, boolean isTcp) {
        this(timeout, DEF_SEND_ONE, ipAddress, isTcp ? TCP_PORT : UDP_PORT, linkRequest, null, isTcp);
    }
    /**
     * 注册监听
     */
    private void registerListener() {
        if (!TextUtils.isEmpty(replyTopic) && null != listener) {
            if(useSubThread){//使用子线程回调
                EventDispatcher.getInstance().asyncRegister(replyTopic, eventListener);
            }
            else {
                //默认用主线程
                EventDispatcher.getInstance().register(replyTopic, eventListener);
            }
        }
    }
    /**
     * 移除监听
     */
    private void removeListener() {
        if (!TextUtils.isEmpty(replyTopic)) {
            EventDispatcher.getInstance().remove(replyTopic, eventListener);
        }
    }
    public static boolean isLocal() {
        String ip = HDLLinkConfig.getInstance().getIpAddress();
        if (ip == null) {
            //如是本地是可以搜索到ip的
            return false;
        }
        //本地是可以远程成功的
        return HDLTcpConnect.getTcpSocketBoot(ip).isConnected();
    }
    public void send() {
        scheduledFuture= getSendThread().scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    //发送次数小于重发次数
                    if ((sendNumber.get() < maxRetry)) {
                        try {
                            //还没有收到回复,再发送
                            if (!isSend.get()) {
                                sendNumber.set(sendNumber.get() + 1);
                                //如是tcp或者mqtt
                                if (isTcp) {
                                    //mqtt
                                    if (TextUtils.isEmpty(ipAddress) || !HDLTcpConnect.getTcpSocketBoot(ipAddress).isConnected()) {
                                        if (!linkRequest.getTopic().endsWith("heartbeat")) {//心跳主题数据过多,过滤下
                                            //LogUtils.i("心跳包发送数据:\r\n" + new String(linkRequest.getCloudSendBytes()));
                                        } else {
                                            return;//云端情况下,心跳可以不用
                                        }
                                        String requestTopic = linkRequest.getCloudTopic();
                                        GatewayBean gatewayBean = HDLLinkLocalGateway.getInstance().getLocalGateway(mac);
                                        //网关的数据
                                        String aes = HDLLinkConfig.getInstance().getAesKey();//默认用主网关的密钥
                                        if (gatewayBean != null) {
                                            aes = gatewayBean.getAesKey();
                                        }
                                        //可能对象中没有设置密钥,不应该存在这种情况。
                                        if (TextUtils.isEmpty(aes)) {
                                            LogUtils.e(TAG, "找不到远程加密的密钥,这个问题要排期解决");
                                            return;
                                        }
                                        byte[] encryBytes = EncryptUtil.encryBytes(linkRequest.getCloudSendBytes(), aes);
                                        if (MqttRecvClient.getInstance() != null) {
                                            MqttRecvClient.getInstance().send(requestTopic, encryBytes);
                                            if (HDLConnectHelper.isInverterTopic(linkRequest.getCloudTopic())) {
                                                LogUtils.i(TAG, "远程发送数据,主题:" + linkRequest.getCloudTopic() + "\r\nPayload:" + ByteUtils.encodeHexString(linkRequest.getData()));
                                            } else {
                                                LogUtils.i(TAG, "远程发送数据,主题:" + linkRequest.getCloudTopic() + "\r\nPayload:" + new String(linkRequest.getCloudSendBytes()));
                                            }
                                        }
                                    }
                                    //本地TCP
                                    else {
                                        if (!linkRequest.getTopic().endsWith("heartbeat")) {//心跳主题数据过多,过滤下
                                            if (JsonUtils.isJson(linkRequest.getData())) {
                                                LogUtils.i(TAG, "本地发送数据\r\n" + new String(linkRequest.getSendBytes()));
                                            } else {
                                                LogUtils.i(TAG, String.format("本地发送主题:%s\r\nPayload:%s", linkRequest.getTopic(), ByteUtils.encodeHexString(linkRequest.getData())));
                                            }
                                        }
                                        HDLTcpConnect.getTcpSocketBoot(ipAddress).sendMsg(EncryptUtil.getEncryBytes(linkRequest));
                                    }
                                } else {
                                    //如果是udp
                                    LogUtils.i(TAG, "本地发送UDP数据\r\n" + new String(linkRequest.getSendBytes()));
                                    HDLUdpConnect.getInstance().getUdpBoot().sendMsg(ipAddress, port, EncryptUtil.getEncryBytes(linkRequest));
                                }
                            }
                        } catch (Exception e) {
                            LogUtils.e(TAG, "数据发送异常:" + e.getMessage());
                        }
                    } else {
                        //超出重发次数并没有收到回复
                        if (!isSend.get()) {
                            if (linkRequest.getTopic().endsWith("heartbeat")) {//心跳主题先不通知
                                notifyFailure(null);
                            } else {
                                if (!TextUtils.isEmpty(replyTopic) && null != listener) {//需要打印出失败的日志
                                    LogUtils.e(TAG, "发送失败数据主题:" + linkRequest.getTopic());
                                }
                                if (isTcp) {
                                    //mqtt
                                    if (TextUtils.isEmpty(ipAddress) || !HDLTcpConnect.getTcpSocketBoot(ipAddress).isConnected()) {
                                        notifyFailure(HDLLinkCode.HDL_GATEWAY_REMOTE_NOT_RESPONSE);
                                    }
                                    //本地TCP,并是连接状态
                                    else {
                                        notifyFailure(HDLLinkCode.HDL_TIMEOUT_ERROR);
                                    }
                                } else {
                                    notifyFailure(HDLLinkCode.HDL_TIMEOUT_ERROR);
                                }
                            }
                        }
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }, 0, sendAwaitTime, TimeUnit.MILLISECONDS);
        //initialdelay - 首次执行的延迟时间 0
        //delay - 一次执行终止和下一次执行开始之间的延迟
    }
    /**
     * 获取发送线程
     *
     * @return 返回获取到的线程
     */
    private synchronized ScheduledExecutorService getSendThread() {
        if (scheduledExecutorService == null) {
            scheduledExecutorService = ThreadToolUtils.getInstance().newScheduledThreadPool(3);
        }
        return scheduledExecutorService;
    }
    /**
     * 发送失败
     */
    private void notifyFailure(HDLLinkCode hdlLinkCode) {
        //移除监听
        removeListener();
        if(scheduledFuture!=null){
            scheduledFuture.cancel(false);
        }
//        if (sendThread != null) {
//            sendThread.shutdownNow();
//            sendThread = null;
//        }
        if (listener != null && hdlLinkCode != null) {
            listener.onFailure(hdlLinkCode);
            listener = null;
        }
    }
    //    /**
//     * 支持毫米类型
//     *
//     * @return 类型列表
//     */
//    public static List<String> getGatewayTypeList() {
//        List<String> typeList = new ArrayList<>();
////        typeList.add("sensor.mmv_sleep");//睡眠毫米波spk
////        typeList.add("sensor.mmv_pose");//姿态毫米波spk
//        typeList.add("energy.hdl_inverter");//逆变器spk
//        typeList.add("sensor.mmv_sleep");//睡眠毫米波spk
//        typeList.add("sensor.mmv_pose");//姿态毫米波spk
//        typeList.add("sensor.hdl_mmw_pose");//Wi-Fi毫米波ZT版本
//        return typeList;
//    }
//
    public static List<String> getMillimeterTypeList() {
        List<String> typeList = new ArrayList<>();
//        typeList.add("AGATEWAY");//网关
        typeList.add("sensor.mmv_sleep");//睡眠毫米波spk
        typeList.add("sensor.mmv_pose");//姿态毫米波spk
        typeList.add("sensor.hdl_mmw_pose");//Wi-Fi毫米波ZT版本
        typeList.add("sensor.hdl_mmw_sleep");//Wi-Fi毫米波睡眠版
        return typeList;
    }
    public static List<String> getGatewayModelList() {
        List<String> typeList = new ArrayList<>();
        typeList.add("MIR01R-LK.10");//睡眠毫米波spk
        typeList.add("MSMWP-LK.30");//姿态毫米波spk
        typeList.add("MSMWH-LK.30");//姿态毫米波spk
        return typeList;
    }
    /**
     * 获取除了网关的其它网络设备,上面写的那两个方法不一致getGatewayTypeList,getMillimeterTypeList,统一了下,以免后期出问题
     * @return
     */
    public static List<String> getNotGatewayTypeList(){
        List<String> typeList = new ArrayList<>();
        typeList.add("energy.hdl_inverter");//逆变器spk
        typeList.add("sensor.mmv_sleep");//睡眠毫米波spk
        typeList.add("sensor.mmv_pose");//姿态毫米波spk
        typeList.add("sensor.hdl_mmw_pose");//Wi-Fi毫米波ZT版本
        typeList.add("sensor.hdl_mmw_sleep");//Wi-Fi毫米波睡眠版
        return  typeList;
    }
    public static List<String> getNewMillimeterTypeList() {
        List<String> typeList = new ArrayList<>();
        typeList.add("sensor.hdl_mmw_pose");//Wi-Fi毫米波ZT版本
        typeList.add("sensor.hdl_mmw_sleep");//Wi-Fi毫米波睡眠版
        return typeList;
    }
    private void notifySucceed(Object msg) {
        //移除监听
        removeListener();
        if(scheduledFuture!=null){
            scheduledFuture.cancel(false);
        }
//        if (sendThread != null) {
//            sendThread.shutdownNow();
//            sendThread = null;
//        }
        if (listener != null) {
            listener.onSucceed(msg);
            listener = null;
        }
    }
    public static boolean isInverterTopic(String topic) {
        if (TextUtils.isEmpty(topic)) {
            return false;
        }
        return topic.endsWith("custom/native/inverter/down_reply")
                || topic.endsWith("custom/native/inverter/down")
                || topic.endsWith("custom/native/inverter/up");
    }
    /**
     * byte数组转换成int数组(为了打印出无符号的btye数组数据)
     *
     * @param bytes 数组
     * @return
     */
    public static Integer[] byteArrayConvertIntArray(byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return new Integer[]{};
        }
        Integer[] arr = new Integer[bytes.length];
        for (int i = 0; i < bytes.length; i++) {
            arr[i] = bytes[i] & 0xff;
        }
        return arr;
    }
    public boolean isUseSubThread() {
        return useSubThread;
    }
    public void setUseSubThread(boolean useSubThread) {
        this.useSubThread = useSubThread;
    }
}