package com.hdl.sdk.socket;

import android.text.TextUtils;
import android.util.Log;

import androidx.collection.ArrayMap;

import com.hdl.sdk.common.utils.LogUtils;
import com.hdl.sdk.common.utils.ThreadToolUtils;
import com.hdl.sdk.socket.annotation.ConnectStatus;
import com.hdl.sdk.socket.client.IClient;
import com.hdl.sdk.socket.listener.SendListener;

import java.net.ConnectException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Tong on 2021/9/26.
 * TCP/UDP bootstrapper.
 *
 * <p>Drives an {@link IClient} with three single-thread workers that are started from the
 * constructor:
 * <ul>
 *   <li>a connect worker that periodically checks {@code connected} and reconnects when it
 *       is {@code false};</li>
 *   <li>a send worker that drains the outgoing message queue;</li>
 *   <li>a receive worker that calls {@link IClient#onHandleResponse()} while connected.</li>
 * </ul>
 *
 * <p>Each worker exits its loop when its thread is interrupted.
 */
public class SocketBoot {

    /** Delay applied before every connect attempt, in milliseconds. */
    private static final long CONNECT_DELAY_MS = 700L;

    /** Pause of the receive worker while there is no connection, in milliseconds. */
    private static final long RECEIVE_IDLE_SLEEP_MS = 1000L;

    /** Interval between two connection checks of the connect worker, in milliseconds. */
    private static final long RECONNECT_CHECK_INTERVAL_MS = 10 * 1000L;

    private ExecutorService connectThread;
    private ExecutorService sendThread;
    private ExecutorService receiveThread;

    private final IClient client;

    /**
     * Whether the connection is currently considered established.
     * Written and read by all three workers, hence {@code volatile}.
     */
    private volatile boolean connected = false;

    /** Requests waiting to be written by the send worker. */
    private final BlockingQueue<SocketRequest> mMessageQueue = new LinkedBlockingDeque<>();

    /**
     * Send listeners keyed by request action. {@code ArrayMap} is not thread-safe, so every
     * access must hold the monitor of this map.
     */
    private final ArrayMap<String, SendListener> sendMap = new ArrayMap<>();

    /**
     * Creates the boot and immediately starts the connect, receive and send workers.
     *
     * @param client the client that performs the actual connect/send/receive work
     */
    public SocketBoot(IClient client) {
        this.client = client;
        initConnectThread();
        initReceiveThread();
        initSendThread();
    }

    /**
     * @return the client passed to the constructor
     */
    public IClient getClient() {
        return client;
    }

    /**
     * Connects the client. Kept private because connecting is maintained internally by the
     * connect worker and does not need to be exposed.
     *
     * <p>Reports {@code CONNECTING}, waits {@link #CONNECT_DELAY_MS}, connects, then marks the
     * boot as connected and reports {@code CONNECTED}. Failures are logged and leave
     * {@code connected} untouched so the connect worker retries on its next check.
     */
    private synchronized void connect() {
        try {
            LogUtils.i("TCP连接");
            client.onConnectStatus(ConnectStatus.CONNECTING);
            Thread.sleep(CONNECT_DELAY_MS);
            client.connect();
            connected = true;
            client.onConnectStatus(ConnectStatus.CONNECTED);
        } catch (InterruptedException e) {
            // Keep the interrupt visible so the calling worker loop can terminate.
            Thread.currentThread().interrupt();
            LogUtils.e(e.getMessage());
        } catch (Exception e) {
            LogUtils.e(e.getMessage());
        }
    }

    /**
     * Starts the send worker; it only needs to be initialised once.
     *
     * <p>The worker blocks on the message queue, writes each request through the client and
     * notifies the listener registered for the request's action, if any. A failed write marks
     * the boot as disconnected so that the connect worker reconnects.
     */
    private void initSendThread() {
        if (sendThread != null) {
            return;
        }
        sendThread = ThreadToolUtils.getInstance().newFixedThreadPool(1);
        sendThread.execute(new Runnable() {
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        SocketRequest socketRequest = mMessageQueue.take();
                        final byte[] data = socketRequest.getData();
                        LogUtils.i("发送数据:" + new String(data, 0, data.length));
                        final String action = socketRequest.getAction();

                        boolean sent;
                        try {
                            client.sendMsg(data);
                            sent = true;
                        } catch (Exception e) {
                            connected = false;
                            LogUtils.e("发送失败:" + e.getMessage());
                            sent = false;
                        }
                        // Notify outside the transport try/catch: an exception thrown by the
                        // listener must not be mistaken for a broken connection.
                        notifySendResult(action, sent);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    } catch (Exception e) {
                        LogUtils.e("发送线程异常" + e.getMessage());
                    }
                }
            }
        });
    }

    /**
     * Invokes the listener registered for {@code action}, if there is one.
     *
     * @param action    action of the request that was processed; may be empty or {@code null}
     * @param succeeded {@code true} to call {@code onSucceed()}, {@code false} for {@code onError()}
     */
    private void notifySendResult(String action, boolean succeeded) {
        if (TextUtils.isEmpty(action)) {
            return;
        }
        final SendListener sendListener;
        synchronized (sendMap) {
            sendListener = sendMap.get(action);
        }
        if (sendListener == null) {
            return;
        }
        // Called without holding the map lock so listener code cannot block senders.
        if (succeeded) {
            sendListener.onSucceed();
        } else {
            sendListener.onError();
        }
    }

    /**
     * Starts the receive worker; it only needs to be initialised once.
     *
     * <p>While connected the worker repeatedly calls {@link IClient#onHandleResponse()};
     * otherwise it sleeps {@link #RECEIVE_IDLE_SLEEP_MS} between checks. An exception from the
     * client marks the boot as disconnected.
     */
    public void initReceiveThread() {
        if (receiveThread != null) {
            return;
        }
        receiveThread = ThreadToolUtils.getInstance().newFixedThreadPool(1);
        receiveThread.execute(new Runnable() {
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    if (!connected) {
                        try {
                            Thread.sleep(RECEIVE_IDLE_SLEEP_MS);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                        continue;
                    }
                    try {
                        // Read data.
                        client.onHandleResponse();
                    } catch (Exception e) {
                        connected = false;
                        LogUtils.e("接收数据线程异常" + e.getMessage());
                    }
                }
            }
        });
    }

    /**
     * Starts the reconnect worker.
     *
     * <p>Every {@link #RECONNECT_CHECK_INTERVAL_MS} the worker checks the connection state and
     * reconnects when there is no connection; connecting is maintained exclusively here.
     */
    private void initConnectThread() {
        if (connectThread != null) {
            return;
        }
        connectThread = ThreadToolUtils.getInstance().newFixedThreadPool(1);
        connectThread.execute(new Runnable() {
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        if (!connected) {
                            reconnect();
                        }
                    } catch (Exception e) {
                        LogUtils.e("连接检测线程异常" + e.getMessage());
                    }
                    try {
                        Thread.sleep(RECONNECT_CHECK_INTERVAL_MS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
    }

    /**
     * Reconnects by disconnecting the client and connecting it again.
     */
    private void reconnect() {
        disconnect();
        connect();
    }

    /**
     * Sends data without a result callback.
     *
     * @param msg the data to send
     */
    public void sendMsg(byte[] msg) {
        sendMsg(msg, null);
    }

    /**
     * Queues data for the send worker.
     *
     * <p>When a listener is given and the request has a non-empty action, the listener is
     * stored under that action, replacing any listener stored for it before.
     *
     * @param msg      the data to send
     * @param listener result callback; usually not needed and may be {@code null}
     */
    public void sendMsg(byte[] msg, SendListener listener) {
        try {
            SocketRequest request = new SocketRequest(msg);
            final String action = request.getAction();
            if (listener != null && !TextUtils.isEmpty(action)) {
                synchronized (sendMap) {
                    sendMap.put(action, listener);
                }
            }
            mMessageQueue.put(request);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LogUtils.e("发送入队失败:" + e.getMessage());
        } catch (Exception e) {
            LogUtils.e("发送入队失败:" + e.getMessage());
        }
    }

    /**
     * Closes the connection and reports {@code DISCONNECT}. Failures are logged only, because
     * this runs as the best-effort first step of a reconnect.
     */
    private synchronized void disconnect() {
        try {
            client.disconnect();
            // Report the disconnect.
            client.onConnectStatus(ConnectStatus.DISCONNECT);
        } catch (Exception e) {
            LogUtils.e("断开连接失败:" + e.getMessage());
        }
    }
}