package com.hdl.sdk.link.common.event; import androidx.annotation.NonNull; import androidx.collection.ArrayMap; import com.hdl.sdk.link.common.utils.LockArrayMap; import com.hdl.sdk.link.common.utils.LockList; import com.hdl.sdk.link.common.utils.LogUtils; import com.hdl.sdk.link.common.utils.ThreadToolUtils; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; /** * Created by Tong on 2021/9/22. * 事件分发 */ public class EventDispatcher { private static final LockList ALL_TOPICS_EVENT = new LockList();//所有主题消息 // private static final ArrayMap> EVENT = new ArrayMap<>(); private static final LockArrayMap> EVENT = new LockArrayMap>(); //需异步回调的接口 private static final LockList ASYNC_EVENT = new LockList<>(); private static final ExecutorService ioThread = ThreadToolUtils.getInstance().newFixedThreadPool(5); private EventDispatcher() { } private static class SingletonInstance { private static final EventDispatcher INSTANCE = new EventDispatcher(); } public static EventDispatcher getInstance() { return SingletonInstance.INSTANCE; } public synchronized void register(String tag, EventListener listener) { try { if (!EVENT.containsKey(tag)) { EVENT.put(tag, new ArrayList<>()); } List events = EVENT.get(tag); if (events != null && !events.contains(listener)) { events.add(listener); LogUtils.i(String.format("增加订阅主题:%s,当前回调数量:%s", tag, events.size())); } } catch (Exception e) { LogUtils.e(e.getMessage()); } } // public synchronized void registerIo(String tag, EventListener listener) { // try { // if (!EVENT.containsKey(tag)) { // EVENT.put(tag, new ArrayList<>()); // } // List events = EVENT.get(tag); // if (events != null && !events.contains(listener)) { // events.add(listener); // } // } catch (Exception e) { // LogUtils.e(e.getMessage()); // } // } /** * 需要异步回调的接口 * @param tag * @param listener */ public synchronized void asyncRegister(String tag, EventListener listener) { register(tag, listener); if (!ASYNC_EVENT.contains(listener)) { ASYNC_EVENT.add(listener); } } public synchronized void remove(String tag) { ioThread.execute(new Runnable() { @Override public void run() { try { if (EVENT.containsKey(tag)) { // List list = EVENT.get(tag); // for (EventListener eventListener : list) { // TYPE.remove(eventListener); // } EVENT.remove(tag); } } catch (Exception e) { LogUtils.e(e.getMessage()); } } }); } public synchronized void remove(Object tag, EventListener listener) { ioThread.execute(new Runnable() { @Override public void run() { try { if (EVENT.containsKey(tag)) { List ev = EVENT.get(tag); if (ev != null && !ev.isEmpty()) { // TYPE.remove(listener); ev.remove(listener); ASYNC_EVENT.remove(listener); LogUtils.i(String.format("移除订阅主题:%s,当前回调数量:%s",tag,ev.size())); } } } catch (Exception e) { LogUtils.e(e.getMessage()); } } }); } /** * 两个主题是否匹配 * @param desString 字典中的主题 * @param sourceString 接收到的主题 * @return */ boolean isMatch(String desString,String sourceString) { String[] des = desString.split("/"); String[] source = sourceString.split("/"); if (des.length != source.length) { return false; } for (int i = 0; i < des.length; i++) { if (!(des[i].equals(source[i]) || des[i].equals("+"))) { if (i != 2) { //网关id不判断,可能是Oid,也可能是mac return false; } } } return true; } /** * 事件分发器,分发所有在接口列表中的事件 * @param topicTag * @param o */ public synchronized void post(String topicTag, @NonNull Object o) { try { // LogUtils.e("EventDispatcher", "===15"); for (String key : EVENT.keySet()) { if (!isMatch(key, topicTag)) { continue; } // LogUtils.e("EventDispatcher", "===16"); List list = EVENT.get(key); if (list != null && !list.isEmpty()) { for (EventListener listener : list) { if(listener==null){ continue; } // LogUtils.e("EventDispatcher", "===17"); //需要异步回调的 if(ASYNC_EVENT.contains(listener)) { // LogUtils.e("EventDispatcher", "===18"); runOnSubThread(listener,o); } else { // LogUtils.e("EventDispatcher", "===19"); runOnUIThread(listener,o); } } } } //所有主题的Listener通知 if (ALL_TOPICS_EVENT == null || ALL_TOPICS_EVENT.isEmpty()) { return; } // LogUtils.e("EventDispatcher", "===20"); //开发分发事件 for (EventListener listener : ALL_TOPICS_EVENT) { runOnUIThread(listener, o); } // LogUtils.e("EventDispatcher", "===21"); }catch (Exception e){ LogUtils.e(e.getMessage()); } } /** * 运行在子线程 * @param eventListener * @param o */ private void runOnSubThread(EventListener eventListener,Object o) { if (eventListener == null) { return; } ioThread.execute(new Runnable() { @Override public void run() { try { // LogUtils.e("EventDispatcher", "===22"); eventListener.onMessage(o); } catch (Exception e) { LogUtils.e("runOnSubThread数据异常", o + " " + e.getMessage()); } } }); } /** * 运行到主线程 * @param eventListener * @param o */ private void runOnUIThread(EventListener eventListener,Object o) { if (eventListener == null) { return; } ThreadToolUtils.getInstance().runOnUiThread(new Runnable() { @Override public void run() { try { // LogUtils.e("EventDispatcher", "===23"); eventListener.onMessage(o); } catch (Exception e) { LogUtils.e("runOnUIThread数据异常", o + " " + e.getMessage()); } } }); } /** * 文件发送通知 wxr 2022-03-08 15:38:59 */ public synchronized void filePost() { //TODO } /** * 注册所有主题消息的监听 * @param listener */ public synchronized void registerAllTopicsListener(EventListener listener) { try { if (ALL_TOPICS_EVENT != null && !ALL_TOPICS_EVENT.contains(listener)) { ALL_TOPICS_EVENT.add(listener); } // TYPE.put(listener, MAIN_TYPE); } catch (Exception e) { LogUtils.e(e.getMessage()); } } /** * 取消所有主题消息的监听 * @param listener */ public synchronized void removeAllTopicsListener(EventListener listener) { ioThread.execute(new Runnable() { @Override public void run() { try { if (ALL_TOPICS_EVENT != null && !ALL_TOPICS_EVENT.isEmpty()) { // TYPE.remove(listener); ALL_TOPICS_EVENT.remove(listener); } } catch (Exception e) { LogUtils.e(e.getMessage()); } } }); } public synchronized void clear() { ALL_TOPICS_EVENT.clear(); EVENT.clear(); ASYNC_EVENT.clear(); // TYPE.clear(); } public synchronized void release() { clear(); ioThread.shutdownNow(); } }