package com.hdl.photovoltaic.ui.test; import android.content.Context; import android.os.Build; import androidx.annotation.RequiresApi; import com.alibaba.fastjson.JSON; import com.hdl.photovoltaic.other.HdlUniLogic; import com.hdl.photovoltaic.uni.HDLUniMP; import com.hdl.sdk.link.common.utils.LogUtils; import com.hdl.sdk.link.core.config.HDLLinkConfig; import org.eclipse.paho.android.service.MqttAndroidClient; import org.eclipse.paho.client.mqttv3.IMqttActionListener; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; /** * Created by hxb on 2022/1/6. */ public class UniAppMqtt { private String clientId; /** * 获取标识mqtt的tag值 * * @return - */ public String getClientId() { return clientId; } final String tag = "uniToAndroid->>>MqttRecvClient--->"; /** * 当前 mqtt client */ MqttAndroidClient mClient; /** * instance */ private volatile static UniAppMqtt instance; /** * getInstance * * @return UniAppMqtt */ public static synchronized UniAppMqtt getInstance() { if (instance == null) { synchronized (HDLLinkConfig.class) { if (instance == null) { instance = new UniAppMqtt(); } } } return instance; } /** * 初始化当前mqtt * * @param context 上下文 * @param serverURI 地址,格式tcp://127.0.0.1:1883 * @param clientId 客户端Id * @param topics 主题 */ public void initMqtt(Context context, String serverURI, String clientId, String[] topics) throws MqttException { if (null != mClient) { return; } this.clientId = clientId; mClient = new MqttAndroidClient(context, serverURI, clientId); mClient.setCallback(new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { for (String topic : topics) { try { subscribe(topic); } catch (MqttException e) { LogUtils.e(tag, "订阅主题失败" + topic); } } } @Override public void connectionLost(Throwable cause) { LogUtils.e(tag, "测试mqtt断开"); } @Override public void messageArrived(String topic, MqttMessage message) { //Topic格式 PCToAndroid(或AndroidToPC)/#guid唯一值/uni_event if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) { sendMqttToUni(topic, new String(message.getPayload())); } } @Override public void deliveryComplete(IMqttDeliveryToken token) { // LogUtils.i(tag,"发布消息成功"); } }); //配置连接 MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); //服务端允许匿名 // options.setUserName("username"); // options.setPassword("password".toCharArray()); options.setAutomaticReconnect(true); // //配置客户端离线或者断开连接的选项 // DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions(); // disconnectedBufferOptions.setBufferEnabled(true); // disconnectedBufferOptions.setBufferSize(5000); // disconnectedBufferOptions.setDeleteOldestMessages(true); // disconnectedBufferOptions.setPersistBuffer(true); // mClient.setBufferOpts(disconnectedBufferOptions); connect(options); } /** * 连接mqtt * * @param options - */ private void connect(MqttConnectOptions options) throws MqttException { try { mClient.connect(options, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { LogUtils.i(tag, "mqtt连接成功"); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { LogUtils.e(tag, "mqtt连接失败" + JSON.toJSONString(asyncActionToken.getException().getMessage()) + " exception=" + JSON.toJSONString(exception.getMessage())); } }); } catch (MqttException e) { LogUtils.e(tag, e.getMessage()); } } /** * 订阅主题 * * @param topic 主题 */ public void subscribe(String topic) throws MqttException { mClient.subscribe(topic, 2, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { LogUtils.i(tag, "订阅主题" + topic); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { LogUtils.e(tag, "订阅主题失败" + topic); } }); } /** * 发布数据出去 * * @param topic 主题 * @param payload 内容 */ public void publish(String topic, String payload) throws MqttException { //回复时,mqtt主题中的方向要变化,要做方向替换 topic = topic.replace("PCToAndroid", "AndroidToPC"); String finalTopic = topic; mClient.publish(topic, payload.getBytes(), 1, false, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { LogUtils.e(tag, "回复主题--->" + finalTopic + "\r\n" + payload); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { LogUtils.e(tag, "回复失败--->" + finalTopic + "\r\n" + exception.getMessage()); } }); } /** * 取消订阅 */ public void unsubscribe(String topic) throws MqttException { mClient.unsubscribe(topic, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { } }); } /** * 断开连接 */ public void disconnect() throws MqttException { mClient.disconnect(null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { LogUtils.e(tag, "断开连接"); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { } }); } /** * uni通过qtt传过来的数据处理 * * @param topic 主题 * @param payload 负载内容 */ @RequiresApi(api = Build.VERSION_CODES.O) void sendMqttToUni(final String topic, String payload) { if (null == topic || null == payload) { return; } String[] split = topic.split("/"); if (split.length < 3) { return; } //表示大类 String request_module_topic = split[2]; MqttDCUniMPJSCallback mqttDCUniMPJSCallback = new MqttDCUniMPJSCallback(); mqttDCUniMPJSCallback.setMqttCallBack(new MqttCallBack() { @Override public void onResult(Object data) { if (null != data) try { //返回数据给uni publish(topic, data.toString()); } catch (Exception ignored) { } } }); // JSON.parse(payload) HdlUniLogic.getInstance().onOtherUniMPEventReceive(HDLUniMP.UNI_APP_ID, request_module_topic, payload, mqttDCUniMPJSCallback); } private boolean mIsUni = false; public void setUni(boolean isUni) { mIsUni = isUni; } public boolean getUni() { return mIsUni; } }