package com.hdl.sdk.link.socket; import android.text.TextUtils; import androidx.collection.ArrayMap; import com.hdl.sdk.link.BuildConfig; import com.hdl.sdk.link.HDLLinkLocalSdk; import com.hdl.sdk.link.bean.LoginRequest; import com.hdl.sdk.link.common.config.TopicConstant; import com.hdl.sdk.link.common.utils.IdUtils; import com.hdl.sdk.link.common.utils.LogUtils; import com.hdl.sdk.link.common.utils.ThreadToolUtils; import com.hdl.sdk.link.common.utils.gson.GsonConvert; import com.hdl.sdk.link.core.bean.LinkRequest; import com.hdl.sdk.link.core.bean.response.BaseLocalResponse; import com.hdl.sdk.link.core.callback.HDLLinkCallBack; import com.hdl.sdk.link.core.config.HDLLinkConfig; import com.hdl.sdk.link.core.connect.HDLConnectHelper; import com.hdl.sdk.link.socket.client.IClient; import com.hdl.sdk.link.socket.client.IHeartbeat; import com.hdl.sdk.link.socket.listener.SendListener; import com.hdl.sdk.link.socket.annotation.ConnectStatus; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; /** * Created by Tong on 2021/9/26. * Tcp/Udp 启动器 */ public class TcpSocketBoot { private ExecutorService connectThread; private ExecutorService sendThread; private ExecutorService receiveThread; private ExecutorService heartbeatThread; private final IClient client; private IHeartbeat iHeartbeat; public void SetHeartbeat(IHeartbeat iHeartbeat){ this.iHeartbeat=iHeartbeat; } /** * 当前接收到数据的时间 */ private long time=System.currentTimeMillis(); /** * tcp是否已经连接 */ private boolean connected=false; public IClient getClient() { return client; } public boolean isConnected() { return connected; } private final BlockingQueue mMessageQueue = new LinkedBlockingDeque<>(); private final ArrayMap sendMap = new ArrayMap<>(); public TcpSocketBoot(IClient client) { TCP_SOCKET_BOOT_LIST.add(this); this.client = client; initConnectThread(); initReceiveThread(); initSendThread(); initHeartbeat(); } /** * 记录所有SocketBoot */ final static List TCP_SOCKET_BOOT_LIST = new ArrayList(); /** * 根据IP地址及端口获取当前socketBoot * @param ipAddress * @param port * @return */ public static TcpSocketBoot getByEndPoint(String ipAddress, int port){ if(ipAddress==null){ return null; } for(TcpSocketBoot tcpSocketBoot : TCP_SOCKET_BOOT_LIST){ if(ipAddress.equals(tcpSocketBoot.getClient().getOptions().getIp())&& tcpSocketBoot.getClient().getOptions().getPort()==port) { return tcpSocketBoot; } } return null; } /** * 连接tcp,内部维护掉,可以不用开放外部,根据这个业务我特性处理好 */ private synchronized void connect() { try { LogUtils.i("TCP连接:"+this.getClient().getOptions().getIp()); client.onConnectStatus(ConnectStatus.CONNECTING); // Thread.sleep(700); client.connect(); LogUtils.i("TCP连接成功:"+this.getClient().getOptions().getIp()); connected=true; tcpLogin("program", BuildConfig.SDKVersion); client.onConnectStatus(ConnectStatus.CONNECTED); }catch(Exception e) { LogUtils.e(e.getMessage()); } } /** * tcp登录方便获取状态 * * @param clientType 客户端类型 * 应用:app; * 调试软件:program; * 第三方:third_party; * 网关:gateway; * 其它:other * @param version 协议版本 */ public void tcpLogin(String clientType, String version) { final String msgId = IdUtils.getUUId(); String time = String.valueOf(System.currentTimeMillis()); final BaseLocalResponse data = new BaseLocalResponse<>(); data.setId(msgId); data.setTime_stamp(time); final LoginRequest request = new LoginRequest(); request.setVersion(version); request.setClientType(clientType); data.setObjects(request); String topic = String.format(TopicConstant.GATEWAY_LOGIN, HDLLinkConfig.getInstance().getGatewayId()); LinkRequest messageRequest = new LinkRequest(topic, GsonConvert.getGson().toJson(data),false); new HDLConnectHelper(this.getClient().getOptions().getIp(),messageRequest,true).send(); } /** * 初始化发送线程,只需要初始化一次 */ private void initSendThread() { if (sendThread == null) { sendThread = ThreadToolUtils.getInstance().newFixedThreadPool(1); sendThread.execute(new Runnable() { @Override public void run() { while (true) { try { if(connected==false){ Thread.sleep(100); continue; } SocketRequest socketRequest = mMessageQueue.take(); final String action = socketRequest.getAction(); try { client.sendMsg(socketRequest.getData()); if (!TextUtils.isEmpty(action)) { SendListener sendListener = sendMap.get(action); if (sendListener != null) { sendListener.onSucceed(); } } } catch (Exception e) { connected = false; LogUtils.e("发送失败:" + e.getMessage()); if (!TextUtils.isEmpty(action)) { SendListener sendListener = sendMap.get(action); if (sendListener != null) { sendListener.onError(); } } } } catch (Exception e) { LogUtils.e("发送失败1:" + e.getMessage()); } } } }); } } /** * 初始化接收线程,只需要初始化一次 */ public void initReceiveThread() { if (receiveThread == null) { receiveThread = ThreadToolUtils.getInstance().newFixedThreadPool(1); receiveThread.execute(new Runnable() { @Override public void run() { while (true) { try { if (connected) { //读取数据 client.onHandleResponse(); time= System.currentTimeMillis(); } else { try { Thread.sleep(1000); } catch (Exception ee) { } } } catch (Exception e) { connected = false; LogUtils.e("接收数据线程异常" + e.getMessage()); } } } }); } } /** * 初始化重新连接线程 */ private void initConnectThread() { if (connectThread == null) { connectThread = ThreadToolUtils.getInstance().newFixedThreadPool(1); //一定时间检测一次连接情况,没有连接就执行连接,连接统一由这里维护 connectThread.execute(new Runnable() { @Override public void run() { while (true) { try { if (!connected) { reconect(); } Thread.sleep(2*1000); } catch (Exception e) { LogUtils.e("定时连接线程异常:" + e.getMessage()); } } } }); } } /** * 初始化重新心跳线程 */ private void initHeartbeat() { if (heartbeatThread == null) { heartbeatThread = ThreadToolUtils.getInstance().newFixedThreadPool(1); heartbeatThread.execute(new Runnable() { @Override public void run() { while (true) { try { //5秒 if (connected && 5 * 1000 < (System.currentTimeMillis() - time)) { time = System.currentTimeMillis(); //心跳检测 if (iHeartbeat != null) iHeartbeat.heartbeat(); } Thread.sleep(10); } catch (Exception e) { LogUtils.e("定时心跳检测网关异常:" + e.getMessage()); } } } }); } } /** * 重新连接 */ private void reconect() { disconnect(); connect(); } /** * 发送无需回调 * @param msg 发送的数据 */ public void sendMsg(byte[] msg) { sendMsg(msg, null); } /** * @param listener 一般情况无需监听 */ public void sendMsg(byte[] msg, SendListener listener) { try { SocketRequest request = new SocketRequest(msg); if (listener != null && !TextUtils.isEmpty(request.getAction())) { sendMap.put(request.getAction(), listener); } mMessageQueue.put(request); } catch (Exception e) { e.printStackTrace(); } } /** * 关闭连接 */ private synchronized void disconnect() { try { client.disconnect(); //断开连接 client.onConnectStatus(ConnectStatus.DISCONNECT); } catch (Exception e) { } } // /** // * 断开全部的Link网关连接 // */ // public static void stopAllConnectLinkGateway() { // for (TcpSocketBoot data : TCP_SOCKET_BOOT_LIST) { // //断开指定的link网关连接 // stopConnectLinkGateway(data); // } // // } // // /** // * 断开指定的link网关连接 // * // * @param tcpSocketBoot tcp对象 // */ // public static void stopConnectLinkGateway(TcpSocketBoot tcpSocketBoot) { // synchronized (TCP_SOCKET_BOOT_LIST) { // try { // TCP_SOCKET_BOOT_LIST.remove(tcpSocketBoot); // tcpSocketBoot.client.disconnect(); // } catch (Exception ignored) { // } // } // } }