wjc
2023-06-28 14de918a79943e4961b09fa01ed320c6cad41f2e
HDLLinkLocalSdk/src/main/java/com/hdl/sdk/link/core/connect/HDLConnectHelper.java
New file
@@ -0,0 +1,432 @@
package com.hdl.sdk.link.core.connect;
import android.text.TextUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.hdl.sdk.link.common.event.EventDispatcher;
import com.hdl.sdk.link.common.event.EventListener;
import com.hdl.sdk.link.common.exception.HDLLinkCode;
import com.hdl.sdk.link.common.utils.ErrorUtils;
import com.hdl.sdk.link.common.utils.LogUtils;
import com.hdl.sdk.link.common.utils.ThreadToolUtils;
import com.hdl.sdk.link.core.bean.LinkRequest;
import com.hdl.sdk.link.core.bean.LinkResponse;
import com.hdl.sdk.link.core.bean.ZigbeeResponse;
import com.hdl.sdk.link.core.bean.gateway.GatewayBean;
import com.hdl.sdk.link.core.config.HDLLinkConfig;
import com.hdl.sdk.link.core.utils.EncryptUtil;
import com.hdl.sdk.link.core.utils.mqtt.MqttRecvClient;
import com.hdl.sdk.link.gateway.HDLLinkLocalGateway;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Created by Tong on 2021/11/11.
 */
public class HDLConnectHelper {
    private static final Long DEF_SEND_TIMEOUT = 8000L;
    private static final int DEF_MAX_RETRY = 1;//最大重发数
    private static final int DEF_SEND_ONE = 1;
    private static final int TCP_PORT = 8586;
    private static final int UDP_PORT = 8585;
    private final Long sendAwaitTime;
    private final int maxRetry;
    /**
     * 是否tcp发送类型
     */
    private boolean isTcp;
    /**
     * 设备mac
     */
    private String mac;
    /**
     * 发送的目标IP
     */
    private String ipAddress;
    /**
     * 发送的目标地址
     */
    private int port;
    private final LinkRequest linkRequest;
    private final EventListener eventListener;
    private final AtomicInteger sendNumber = new AtomicInteger(0);
    private final AtomicBoolean isSend = new AtomicBoolean(false);
    private HdlSocketListener listener;
    private ScheduledExecutorService sendThread;
    private String replyTopic;
    public interface HdlSocketListener {
        void onSucceed(Object msg);
        void onFailure(HDLLinkCode hdlLinkCode);
    }
    /**
     * 发送UDP或者TCP数据
     *
     * @param sendAwaitTime 每次发送等待时间
     * @param maxRetry      重试次数
     * @param ipAddress     发送目标IP
     * @param port          发送目标端口
     * @param linkRequest   发送对象
     * @param listener      回调
     * @param isTcp         是否TCP
     */
    public HDLConnectHelper(Long sendAwaitTime, int maxRetry, String ipAddress, int port,
                            LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp) {
        this.sendAwaitTime = sendAwaitTime;
        this.maxRetry = maxRetry;
        this.ipAddress = ipAddress;
        this.port = port;
        this.linkRequest = linkRequest;
        this.replyTopic = linkRequest.getReplyTopic();
        this.listener = listener;
        this.isTcp = isTcp;
        eventListener = new EventListener() {
            @Override
            public void onMessage(Object msg) {
                isSend.set(true);
                try {
                    if (msg instanceof LinkResponse) {
                        LinkResponse linkResponse = (LinkResponse) msg;
                        JSONObject jsonObject = JSON.parseObject(linkResponse.getData());
                        String id = jsonObject.getString("id");
                        Integer code = jsonObject.getInteger("code");
                        /**
                         * 可能返回code属性可能没有   没有的话直接成功  有的话只有200才会成功
                         */
                        if (code == null || code.intValue() == 200 || code.intValue() == 0) {
                            notifySucceed(msg);
                        }else {
                            notifyFailure(ErrorUtils.getByCode(code));
                        }
                    }
                    else if (msg instanceof ZigbeeResponse) {
                        ZigbeeResponse linkResponse = (ZigbeeResponse) msg;
                        //TODO 如果配置从网关的信息,通过主网关转达,这里oid要判断下
                        if (replyTopic.equals(linkResponse.getTopic())) {
                            notifySucceed(linkResponse.getData());
                        }
                        else{
                            notifyFailure(HDLLinkCode.HDL_TOPIC_NOT_RIGHT);
                        }
                    }
                    else{
                        notifyFailure(new HDLLinkCode(HDLLinkCode.HDL_OBJECT_NOT_SUPPORT.getCode(), "Object Name:" + msg));
                    }
                } catch (Exception e) {
                    notifyFailure(new HDLLinkCode(HDLLinkCode.HDL_APPLICATION_CODE.getCode(), e.getMessage()));
                }
            }
        };
        //注册监听
        registerListener();
    }
    /**
     * 发送UDP或者TCP数据(参数有mac)
     *
     * @param sendAwaitTime 每次发送等待时间
     * @param maxRetry      重试次数
     * @param ipAddress     发送目标IP
     * @param port          发送目标端口
     * @param linkRequest   发送对象
     * @param listener      回调
     * @param isTcp         是否TCP
     * @param mac           设备mac
     */
    public HDLConnectHelper(Long sendAwaitTime, int maxRetry, String ipAddress, int port,
                            LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp,String mac) {
        this(sendAwaitTime,maxRetry,ipAddress,port,linkRequest,listener,isTcp);
        this.mac = mac;
    }
    /**
     * 按照指定次数发,回调
     *
     * @param maxRetry    重试次数
     * @param ipAddress   发送目标IP
     * @param port        发送目标端口
     * @param linkRequest 发送对象
     * @param listener    回调
     * @param isTcp       是否TCP
     */
    public HDLConnectHelper(int maxRetry, String ipAddress, int port,
                            LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp) {
        this(DEF_SEND_TIMEOUT, maxRetry, ipAddress, port, linkRequest, listener, isTcp);
    }
    /**
     * 按照指定次数发,回调
     *
     * @param maxRetry    重试次数
     * @param ipAddress   发送目标IP
     * @param linkRequest 发送对象
     * @param listener    回调
     * @param isTcp       是否TCP
     */
    public HDLConnectHelper(int maxRetry, String ipAddress,
                            LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp) {
        this(maxRetry, ipAddress, isTcp ? TCP_PORT : UDP_PORT, linkRequest, listener, isTcp);
    }
    /**
     * 按照指定次数发,不回调
     *
     * @param maxRetry    重试次数
     * @param ipAddress   发送目标IP
     * @param linkRequest 发送对象
     * @param isTcp       是否TCP
     */
    public HDLConnectHelper(int maxRetry, String ipAddress,
                            LinkRequest linkRequest, boolean isTcp) {
        this(maxRetry, ipAddress, linkRequest, null, isTcp);
    }
    /**
     * 按照默认重发机制发送
     *
     * @param ipAddress   发送目标IP
     * @param port        发送目标端口
     * @param linkRequest 发送对象
     * @param listener    回调
     * @param isTcp       是否TCP
     */
    public HDLConnectHelper(String ipAddress, int port,
                            LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp) {
        this(DEF_MAX_RETRY, ipAddress, port, linkRequest, listener, isTcp);
    }
    /**
     * 默认端口发送
     *
     * @param ipAddress   发送目标IP
     * @param linkRequest 发送对象
     * @param listener    回调
     * @param isTcp       是否TCP
     */
    public HDLConnectHelper(String ipAddress,
                            LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp) {
        this(DEF_SEND_TIMEOUT, DEF_MAX_RETRY, ipAddress, isTcp ? TCP_PORT : UDP_PORT, linkRequest, listener, isTcp);
    }
    /**
     * 默认端口发送(参数有mac)
     *
     * @param ipAddress   发送目标IP
     * @param linkRequest 发送对象
     * @param listener    回调
     * @param isTcp       是否TCP
     * @param mac         设备mac
     */
    public HDLConnectHelper(String ipAddress,
                            LinkRequest linkRequest, HdlSocketListener listener, boolean isTcp,String mac) {
        this(DEF_SEND_TIMEOUT, DEF_MAX_RETRY, ipAddress, isTcp ? TCP_PORT : UDP_PORT, linkRequest, listener, isTcp,mac);
    }
    /**
     * 发送一次
     *
     * @param ipAddress   发送目标IP
     * @param linkRequest 发送对象
     * @param isTcp       是否TCP
     */
    public HDLConnectHelper(String ipAddress, LinkRequest linkRequest, boolean isTcp) {
        this(DEF_SEND_TIMEOUT, DEF_SEND_ONE, ipAddress, isTcp ? TCP_PORT : UDP_PORT, linkRequest, null, isTcp);
    }
    /**
     * 发送一次
     *
     * @param ipAddress   发送目标IP
     * @param linkRequest 发送对象
     * @param isTcp       是否TCP
     */
    public HDLConnectHelper(Long timeout,String ipAddress, LinkRequest linkRequest, boolean isTcp) {
        this(timeout, DEF_SEND_ONE, ipAddress, isTcp ? TCP_PORT : UDP_PORT, linkRequest, null, isTcp);
    }
    /**
     * 注册监听
     */
    private void registerListener() {
        if (!TextUtils.isEmpty(replyTopic) && null != listener) {
            EventDispatcher.getInstance().register(replyTopic, eventListener);
        }
    }
    /**
     * 移除监听
     */
    private void removeListener() {
        if (!TextUtils.isEmpty(replyTopic)) {
            EventDispatcher.getInstance().remove(replyTopic, eventListener);
        }
    }
    public static boolean isLocal() {
        String ip = HDLLinkConfig.getInstance().getIpAddress();
        if (ip == null) {
            //如是本地是可以搜索到ip的
            return false;
        }
        //本地是可以远程成功的
        return HDLTcpConnect.getTcpSocketBoot(ip).isConnected();
    }
    public void send() {
        getSendThread().scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                //发送次数小于重发次数
                if ((sendNumber.get() < maxRetry)) {
                    try {
                        //还没有收到回复,再发送
                        if (!isSend.get()) {
                            sendNumber.set(sendNumber.get() + 1);
                            //如是tcp或者mqtt
                            if (isTcp) {
                                //mqtt
                                if (TextUtils.isEmpty(ipAddress) || !HDLTcpConnect.getTcpSocketBoot(ipAddress).isConnected()) {
                                    if (!linkRequest.getTopic().endsWith("heartbeat")) {//心跳主题数据过多,过滤下
                                        //LogUtils.i("心跳包发送数据:\r\n" + new String(linkRequest.getCloudSendBytes()));
                                    } else {
                                        return;//云端情况下,心跳可以不用
                                    }
                                    String requestTopic = linkRequest.getCloudTopic();
                                    byte[] encryBytes = null;
                                    GatewayBean gatewayBean = HDLLinkLocalGateway.getInstance().getLocalGateway(mac);
                                    if (gatewayBean != null && getGatewayTypeList().contains(gatewayBean.getGatewayType())) {
                                        //毫米波远程mqtt秘钥不一样
                                        encryBytes = EncryptUtil.encryBytes(linkRequest.getCloudSendBytes(), gatewayBean.getAesKey());
                                    } else {
                                        encryBytes = EncryptUtil.encryBytes(linkRequest.getCloudSendBytes(), HDLLinkConfig.getInstance().getAesKey());
                                    }
                                    if (MqttRecvClient.getInstance() != null) {
                                        MqttRecvClient.getInstance().send(requestTopic, encryBytes);
                                    }
                                    LogUtils.i("远程发送数据:" + linkRequest.getCloudTopic() + "\r\n" + new String(linkRequest.getCloudSendBytes()));
                                }
                                //本地TCP
                                else {
                                    if (!linkRequest.getTopic().endsWith("heartbeat")) {//心跳主题数据过多,过滤下
                                        LogUtils.i("本地发送数据:\r\n" + new String(linkRequest.getSendBytes()));
                                    }
                                    HDLTcpConnect.getTcpSocketBoot(ipAddress).sendMsg(EncryptUtil.getEncryBytes(linkRequest));
                                }
                            } else {
                                //如果是udp
                                LogUtils.i("本地发送数据UDP:" + new String(linkRequest.getSendBytes()));
                                HDLUdpConnect.getInstance().getUdpBoot().sendMsg(ipAddress, port, EncryptUtil.getEncryBytes(linkRequest));
                            }
                        }
                    } catch (Exception e) {
                        LogUtils.e("数据发送异常:", e.getMessage());
                    }
                } else {
                    //超出重发次数并没有收到回复
                    if (!isSend.get()) {
                        if (linkRequest.getTopic().endsWith("heartbeat")) {//心跳主题先不通知
                            notifyFailure(null);
                        } else {
                            if (!TextUtils.isEmpty(replyTopic) && null != listener) {//需要打印出失败的日志
                                LogUtils.e("发送失败数据主题:" + linkRequest.getTopic());
                            }
                            if (isTcp) {
                                //mqtt
                                if (TextUtils.isEmpty(ipAddress) || !HDLTcpConnect.getTcpSocketBoot(ipAddress).isConnected()) {
                                    notifyFailure(HDLLinkCode.HDL_GATEWAY_REMOTE_NOT_RESPONSE);
                                }
                                //本地TCP,并是连接状态
                                else {
                                    notifyFailure(HDLLinkCode.HDL_TIMEOUT_ERROR);
                                }
                            } else {
                                notifyFailure(HDLLinkCode.HDL_TIMEOUT_ERROR);
                            }
                        }
                    }
                }
            }
        }, 0, sendAwaitTime, TimeUnit.MILLISECONDS);
        //initialdelay - 首次执行的延迟时间 0
        //delay - 一次执行终止和下一次执行开始之间的延迟
    }
    /**
     * 获取发送线程
     *
     * @return 返回获取到的线程
     */
    private ScheduledExecutorService getSendThread() {
        if (sendThread == null) {
            sendThread = ThreadToolUtils.getInstance().newScheduledThreadPool(1);
        }
        return sendThread;
    }
    /**
     * 发送失败
     */
    private void notifyFailure(HDLLinkCode hdlLinkCode) {
        //移除监听
        removeListener();
        if (sendThread != null) {
            sendThread.shutdownNow();
            sendThread = null;
        }
        if (listener != null && hdlLinkCode != null) {
            listener.onFailure(hdlLinkCode);
            listener = null;
        }
    }
    /**
     * 支持毫米类型
     *
     * @return 类型列表
     */
    public static List<String> getGatewayTypeList() {
        List<String> typeList = new ArrayList<>();
        typeList.add("sensor.mmv_sleep");//睡眠毫米波spk
        typeList.add("sensor.mmv_pose");//姿态毫米波spk
        return typeList;
    }
    private void notifySucceed(Object msg) {
        //移除监听
        removeListener();
        if (sendThread != null) {
            sendThread.shutdownNow();
            sendThread = null;
        }
        if (listener != null) {
            listener.onSucceed(msg);
            listener = null;
        }
    }
}