hxb
2021-12-21 93f13e7b2e046c6d86d10e7abc2ecaa4c4adabc3
HDLSDK/hdl-socket/src/main/java/com/hdl/sdk/socket/SocketBoot.java
@@ -5,6 +5,7 @@
import androidx.collection.ArrayMap;
import com.hdl.sdk.common.utils.LogUtils;
import com.hdl.sdk.common.utils.ThreadToolUtils;
import com.hdl.sdk.socket.annotation.ConnectStatus;
import com.hdl.sdk.socket.client.IClient;
@@ -26,21 +27,19 @@
public class SocketBoot {
    private ExecutorService connectThread;
    private ScheduledExecutorService heartbeatThread;
    private ExecutorService sendThread;
    private ExecutorService receiveThread;
    private ScheduledExecutorService delayThread;
    private final IClient client;
    /**
     * socket是否在运行
     * tcp是否已经连接
     */
    private final AtomicBoolean isRun = new AtomicBoolean(false);
    private boolean connected=false;
    private final AtomicBoolean isOpenRetry = new AtomicBoolean(false);
    private final AtomicInteger resendCount = new AtomicInteger(0);
    public IClient getClient() {
        return client;
    }
    // Pending outgoing requests: sendMsg() puts requests in, the send thread takes them out.
    private final BlockingQueue<SocketRequest> mMessageQueue = new LinkedBlockingDeque<>();
@@ -48,87 +47,42 @@
    /**
     * Creates the boot helper for the given client and runs the connect,
     * receive and send thread initialisation, in that order.
     *
     * @param client the socket client driven by this instance
     */
    public SocketBoot(IClient client) {
        this.client = client;
        initConnectThread();
        initReceiveThread();
        initSendThread();
    }
    public ScheduledExecutorService getHeartBeat() {
        if (heartbeatThread == null) {
            heartbeatThread = ThreadToolUtils.getInstance().newScheduledThreadPool(1);
        }
        return heartbeatThread;
    }
    /**
     * Starts a connection attempt: resets the retry counter, triggers a
     * first (non-retry) connect through resetConnect(true) and then enables
     * automatic retry.
     */
    public void connect() {
        resendCount.set(0);
        resetConnect(true);
        isOpenRetry.set(true);
    }
    public synchronized void resetConnect(boolean isFirst) {
        final int maxRetry = client.getOptions().getMaxRetry();
        if (maxRetry == 0 && resendCount.get() > 0 ||
                (maxRetry > 0 && maxRetry + 1 < resendCount.get())) {
            Log.d("====", "===重连次数达到最大==");
            return;
        }
        if (!client.isConnect()) {
            if (connectThread == null) {
                connectThread = ThreadToolUtils.getInstance().newFixedThreadPool(1);
            }
            connectThread.execute(new Runnable() {
                @Override
                public void run() {
                    client.onConnectStatus(ConnectStatus.CONNECTING);
                    if (!isFirst) {
                        try {
                            resendCount.set(resendCount.get() + 1);
                            Thread.sleep(300L);
                            Log.d("====", "==重连第" + resendCount + "次==");
                        } catch (Exception ignored) {
                        }
                    }
                    try {
                        client.connect();
                        isRun.set(true);
                        if (client.isConnect()) {
                            Log.d("====", "====连接成功====");
                            startHeartbeat();
                            initSendThread();
                            initReceiveThread();
                            client.onConnectStatus(ConnectStatus.CONNECTED);
                            resendCount.set(0);
                        } else {
                            throw new ConnectException();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        Log.d("====", "===连接失败===" + e);
                        //再判断一下有没有连接
                        if (!client.isConnect()) {
                            isRun.set(false);
                            client.onConnectStatus(ConnectStatus.DISCONNECT);
                            stopHeartbeat();
                            disconnectError();
                        }
                    }
                }
            });
    /**
     * 连接tcp,内部维护掉,可以不用开放外部,根据这个业务我特性处理好
     */
    private synchronized void connect() {
        try {
            LogUtils.i("TCP连接");
            client.onConnectStatus(ConnectStatus.CONNECTING);
            Thread.sleep(700);
            client.connect();
            connected=true;
            client.onConnectStatus(ConnectStatus.CONNECTED);
        }catch(Exception e) {
            LogUtils.e(e.getMessage());
        }
    }
    public void initSendThread() {
    /**
     * Initializes the send thread; this only needs to be done once.
     */
    private void initSendThread() {
        if (sendThread == null) {
            sendThread = ThreadToolUtils.getInstance().newFixedThreadPool(1);
        }
        sendThread.execute(new Runnable() {
            @Override
            public void run() {
                while (isRun.get()) {
                    if (client.isConnect()) {
                        Log.d("=====", "==发送数据==");
            sendThread.execute(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            SocketRequest socketRequest = mMessageQueue.take();
                            final String sendStr = new String(socketRequest.getData(), 0, socketRequest.getData().length);
                            LogUtils.i("发送数据:" + sendStr);
                            final String action = socketRequest.getAction();
                            try {
                                client.sendMsg(socketRequest.getData());
@@ -139,189 +93,121 @@
                                    }
                                }
                            } catch (Exception e) {
                                connected = false;
                                LogUtils.e("发送失败:" + e.getMessage());
                                if (!TextUtils.isEmpty(action)) {
                                    SendListener sendListener = sendMap.get(action);
                                    if (sendListener != null) {
                                        sendListener.onError();
                                    }
                                }
                                stopHeartbeat();
                                if (sendThread != null) {
                                    sendThread.shutdownNow();
                                }
                                if (isRun.get()) {
                                    disconnectError();
                                }
                            }
                        } catch (InterruptedException ignored) {
                        } catch (Exception e) {
                        }
                    }
                }
                Log.d("=====", "==发送线程关闭==");
            }
        });
            });
        }
    }
    /**
     * Initializes the receive thread; this only needs to be done once.
     */
    public void initReceiveThread() {
        if (receiveThread == null) {
            receiveThread = ThreadToolUtils.getInstance().newFixedThreadPool(1);
        }
        receiveThread.execute(new Runnable() {
            @Override
            public void run() {
                while (isRun.get()) {
                    if (client.isConnect()) {
            receiveThread.execute(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            //读取数据
                            client.onHandleResponse();
                            if (connected) {
                                //读取数据
                                client.onHandleResponse();
                            } else {
                                try {
                                    Thread.sleep(1000);
                                } catch (Exception ee) {
                                }
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                            Log.d("====", "断开连接" + e.getMessage());
                            disconnectError();
                            connected = false;
                            LogUtils.e("接收数据线程异常" + e.getMessage());
                        }
                    }
                }
            }
        });
            });
        }
    }
    /**
     * Initializes the reconnect thread.
     */
    private void initConnectThread() {
        if (connectThread == null) {
            connectThread = ThreadToolUtils.getInstance().newFixedThreadPool(1);
            //一定时间检测一次连接情况,没有连接就执行连接,连接统一由这里维护
            connectThread.execute(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            if (!connected) {
                                reconect();
                            }
                            Thread.sleep(10*1000);
                        } catch (Exception e) {
    public void startHeartbeat() {
        if (heartbeatThread != null) {
            heartbeatThread.shutdownNow();
            heartbeatThread = null;
        }
        if (client.getOptions() == null || client.getOptions().getHeartbeatTimeInterval() <= 0 || !client.getOptions().isEnabledHeartbeat()) {
            return;
        }
        getHeartBeat().scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                if (isRun.get()) {
                    Log.d("====", "===发送心跳包===");
                    if (client.getOptions() != null) {
                        final byte[] heartBeat = client.getOptions().getHeartbeatData();
                        if (heartBeat != null) {
                            sendMsg(heartBeat, false, null);
                        } else {
                            sendMsg(new byte[0], false, null);
                        }
                    }
                }
            }
        }, client.getOptions().getHeartbeatTimeInterval(), client.getOptions().getHeartbeatTimeInterval(), TimeUnit.MILLISECONDS);
    }
    public void stopHeartbeat() {
        if (heartbeatThread != null) {
            heartbeatThread.shutdownNow();
            heartbeatThread = null;
            });
        }
    }
    /**
     * Reconnects: tears down the current connection, then connects again.
     */
    private void reconect() {
        disconnect();
        connect();
    }
    /**
     * Sends data without a result callback.
     *
     * @param msg the data to send
     */
    public void sendMsg(byte[] msg) {
        // NOTE(review): two delegating calls appear back to back here, so the message is
        // handed off twice; this looks like a merge leftover. Confirm which one is intended.
        sendMsg(msg, true, null);
        sendMsg(msg, null);
    }
    public void sendMsg(byte[] msg, SendListener listener) {
        sendMsg(msg, true, listener);
    }
    /**
     * @param listener send-result listener; usually not needed
     */
    private void sendMsg(byte[] msg, boolean isRefreshRetry, SendListener listener) {
        if (isRefreshRetry) {
            //重置连接次数
            resendCount.set(0);
        }
    public void sendMsg(byte[] msg, SendListener listener) {
        try {
            SocketRequest request = new SocketRequest(msg);
            if (listener != null && !TextUtils.isEmpty(request.getAction())) {
                sendMap.put(request.getAction(), listener);
            }
            mMessageQueue.put(request);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
        }
        if (!client.isConnect()) {
            resetConnect(false);
        }
    }
    /**
     * 发生错误,重连
     * 关闭连接
     */
    private void disconnectError() {
        disconnect();
        isRun.set(false);
        if (isOpenRetry.get()) {
            if (delayThread != null) {
                delayThread.shutdownNow();
            }
            delayThread = ThreadToolUtils.getInstance().newScheduledThreadPool(1);
            delayThread.schedule(new Runnable() {
                @Override
                public void run() {
                    if (!client.isConnect() && isOpenRetry.get()) {
                        resetConnect(false);
                    }
                }
            }, 3000, TimeUnit.MILLISECONDS);
        }
    }
    private synchronized void disconnect() {
        if (client.isConnect()) {
        try {
            client.disconnect();
            //断开连接
            client.onConnectStatus(ConnectStatus.DISCONNECT);
        }
    }
        } catch (Exception e) {
    public synchronized void close() {
        isOpenRetry.set(false);
        isRun.set(false);
        if (connectThread != null) {
            connectThread.shutdownNow();
            connectThread = null;
        }
        if (heartbeatThread != null) {
            heartbeatThread.shutdownNow();
            heartbeatThread = null;
        }
        if (sendThread != null) {
            sendThread.shutdownNow();
            sendThread = null;
        }
        if (receiveThread != null) {
            receiveThread.shutdownNow();
            receiveThread = null;
        }
        sendMap.clear();
        client.disconnect();
        mMessageQueue.clear();
    }
    public synchronized void release() {
        close();
        if (client != null && client.getOptions() != null) {
            client.getOptions().clearConnectStatusListener();
        }
    }
    public boolean isConnect() {
        return client.isConnect();
    }
}