徒手撸一个简易消息总线EventBus

燃着的半支烟 2020年04月12日 74次浏览

平时在写Android或者Java进程内应用时,发送消息一般会选择Guava的EventBus,这样可以做到代码松耦合,业务解耦。今天半支烟简单分析下消息总线原理,然后写个简易的消息总线。说白了,消息总线其实就是个观察者模式的典型应用。

EventBus的一般使用

流程图和大致原理

消息发布者(或者叫被观察者),发送了某个事件消息,EventBus会根据Subcriber注解以及Event事件,找出合适的订阅者(或者叫观察者),然后通过Java反射执行订阅者的响应方法。

1797490-88b4a064b9723ef6

使用步骤

一般我们在项目用使用分以下几步,具体使用小伙伴们可自行谷歌:

  1. 定义一个事件类

    public class AnyEventType {
         public AnyEventType(){}
     }
    
  2. 注册观察者

    EventBus.getDefault().register(this);
    
  3. 订阅观察者的方法

    @Subscribe
    public void onEvent(AnyEventType event) {/* Do something */};
    
  4. 被观察者发送消息

    EventBus.getDefault().post(event);
    

分析EventBus&设计核心类

上面介绍了EventBus的基本用法,下面开始分析EventBus的核心类。

从上面的使用方式来看,EventBus共有2个核心方法,register() 和 post() 。弄懂了他们的使用方法,也就基本明白了:register()是注册订阅者,post()是发布者发送消息。现在应该明白了EventBus的核心了吧。下面用2张图来描述下这2个核心方法:

​ 从图中可以看出,最关键的数据结构是Observer注册表,记录了消息类型和可接受函数的对应关系。当调用 register() 函数注册观察者的时候,EventBus 通过解析 @Subscribe 注解,生成 Observer 注册表。当调用 post() 函数发送消息的时候,EventBus 通过注册表找到相应的可接收消息的函数,然后通过 Java 的反射语法来动态地创建对象、执行函数。对于同步阻塞模式,EventBus 在一个线程内依次执行相应的函数。对于异步非阻塞模式,EventBus 通过一个线程池来执行相应的函数。

​ 弄懂了原理,就开始代码实现了。实现包括 5 个类:EventBus、AsyncEventBus、Subscribe、ObserverAction、ObserverRegistry。

代码实现

开始实现5个核心类:EventBus、AsyncEventBus、Subcribe、ObserverAction、ObserverRegistry。

1.Subscribe

Subscribe是一个注解,用来标识观察者的哪些方法可以接受消息。

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Subscribe {}
2.ObserverAction

ObserverAction类用来执行观察者的方法,@Subscribe注解的方法,会在ObserverRegistry类中进行统一注册,target表示观察者类,method表示被执行的方法。

public class ObserverAction {

    private Object target;
    private Method method;

    public ObserverAction(Object target, Method method) {
        this.target = target;
        this.method = method;
        this.method.setAccessible(true);
    }

    public void execute(Object event) {
        try {
            method.invoke(target, event);
        } catch (IllegalAccessException | InvocationTargetException e) {
            e.printStackTrace();
        }
    }
}
3.ObserverRegistry

ObserverRegistry类用来注册观察者,使用了大量Java反射,核心逻辑包括:

1、将@Subscribe注解的事件和方法,统一注册到map中

2、提供根据事件找出对应的类方法

避免并发冲突,所以使用了ConcurrentHashMap 和 CopyOnWriteArraySet。注册单个观察者的时候,也是先取出观察者的事件和方法,然后再统一放到全局并发容器中。这样逻辑清晰,也避免了并发冲突。

public class ObserverRegistry {

    private ConcurrentHashMap<Class<?>, CopyOnWriteArraySet<ObserverAction>> registry = new ConcurrentHashMap<>();

    /**
     * 注册
     */
    public void register(Object observer) {
        //遍历带有注解的方法,将事件和对应的多个处理方法,存储到map中
        Map<Class<?>, Collection<ObserverAction>> observerActions = findAllObserverActions(observer);

        //将获取到的单个观察者的可执行方法,放到如全局的map中,使用并发类
        for (Map.Entry<Class<?>, Collection<ObserverAction>> entry : observerActions.entrySet()) {
            Class<?> eventType = entry.getKey();
            Collection<ObserverAction> eventActions = entry.getValue();
            CopyOnWriteArraySet<ObserverAction> registeredEventActions = registry.get(eventType);
            if (registeredEventActions == null) {
                registry.putIfAbsent(eventType, new CopyOnWriteArraySet<>());
                registeredEventActions = registry.get(eventType);
            }
            registeredEventActions.addAll(eventActions);
        }
    }

    /**
     * 遍历带有注解的方法,将事件和对应的多个处理方法,存储到map中
     *
     * @param observer
     * @return
     */
    private Map<Class<?>, Collection<ObserverAction>> findAllObserverActions(Object observer) {
        Class<?> clazz = observer.getClass();
        List<Method> methodList = getAnnotateMethods(clazz);

        Map<Class<?>, Collection<ObserverAction>> observerActions = new HashMap<>();
        for (Method method : methodList) {
            Class<?>[] parameterTypes = method.getParameterTypes();
            Class<?> eventType = parameterTypes[0];
            if (!observerActions.containsKey(eventType)) {
                observerActions.put(eventType, new ArrayList<>());
            }
            observerActions.get(eventType).add(new ObserverAction(observer, method));
        }
        return observerActions;
    }

    /**
     * 获取观察者中含有注解的方法
     *
     * @param clazz
     * @return
     */
    private List<Method> getAnnotateMethods(Class<?> clazz) {
        List<Method> annotateMethods = new ArrayList<>();
        for (Method method : clazz.getDeclaredMethods()) {
            if (method.isAnnotationPresent(Subscribe.class)) {
//                Class<?>[] parameterTypes = method.getParameterTypes();
                annotateMethods.add(method);
            }
        }

        return annotateMethods;
    }

    /**
     * 根据事件获取合适的观察者方法
     *
     * @param event
     * @return
     */
    public List<ObserverAction> getMatchedObserverActions(Object event) {
        List<ObserverAction> matchedObservers = new ArrayList<>();
        Class<?> postedEventType = event.getClass();
        for (Map.Entry<Class<?>, CopyOnWriteArraySet<ObserverAction>> entry : registry.entrySet()) {
            Class<?> eventType = entry.getKey();
            Collection<ObserverAction> eventActions = entry.getValue();
            //判断有入参的事件,是否是容器里的时间的子类,可以说是一个类是否可以被强制转换为另外一个实例对象
            //父类 和 子类,判断都会为true
            if (eventType.isAssignableFrom(postedEventType)) {
                matchedObservers.addAll(eventActions);
            }
        }
        return matchedObservers;
    }

}
4.EventBus

消息总线入口方法,提供了register注册观察者,也提供了post让被观察者发送消息。

public class EventBus {

    private ObserverRegistry registry = new ObserverRegistry();

    private Executor executor;

    public EventBus() {

    }

    public EventBus(Executor executor) {
        this.executor = executor;
    }

    /**
     * 注册观察者
     */
    public void register(Object observer) {
        registry.register(observer);
    }

    /**
     * 发布者-发送消息
     */
    public void post(Object event) {
        List<ObserverAction> observerActions = registry.getMatchedObserverActions(event);
        for (ObserverAction observerAction : observerActions) {
            if (executor == null) {
                observerAction.execute(event);
            } else {
                executor.execute(() -> {
                    observerAction.execute(event);
                });
            }
        }
    }
}
5.AsyncEventBus

异步消息总线入口方法

public class AsyncEventBus extends EventBus {
    public AsyncEventBus(Executor executor) {
        super(executor);
    }
}
整理代码

完整代码

https://gitee.com/yclxiao/specialty/blob/master/javacore/src/main/java/com/ycl/blog/designmode/eventbus/EventBus.java

总结

总体来说,EventBus是基于观察者模式实现的消息总线,实现代码和业务的解耦。

框架的作用是:隐藏实现细节、降低开发难度、实现代码复用,解耦业务代码,让开发人员聚焦业务开发。