Java知识分享
热爱技术,分享技术

多线程设计模式之:Event Bus设计模式

相信每一位读者都有使用过消息中间件的经历,比如Apache ActiveMQ和Apache
Kafka等,某subscriber在消息中间件上注册了某个topic (主题),当有消息发送到了该topic.上之后,注册在该topic.上的所有subscriber都将会收到消息。

多线程设计模式之:Event Bus设计模式插图

消息中间件提供了系统之间的异步处理机制,比如在某电商网站上支付订单之后,会触发库存计算、物流调度计算,甚至是营销人员绩效计算、报表统计等的,诸如此类的操作耗费比订单购买商品本身更多的时间,加之这样的操作没有即时的时效性要求,用户在下单之后完全没有必要等待电商后端做完所有的操作才算成功,那么此时消息中间件便是一常好的解决方案,用户下单成功支付之后即可向用户返回购买成功的通知,然后提交各种消息至消息中间件,这样注册在消息中间件的其他系统就可以顺利地接收订单通知了,然后执行各自的业务逻辑。消息中间件主要用于解决进程之间消息异步处理的解决方案,在本章中,我们使用消息中间件的思想设计一个Java进程内部的消息中间件一Event Bus。

1. Event Bus设计

多线程设计模式之:Event Bus设计模式插图(1)

Bus接口对外提供了几种主要的使用方式,比如post方法用来发送Event,register
方法用来注册Event接收者( Subscriber)接受响应事件,EventBus 采用同步的方式推送Event, AsyncEventBus 采用异步的方式(Thread-Per Message)推送Event。

Registry注册表,主要用来记录对应的Subscriber以及受理消息的回调方法,回调方法我们用注解@Subscribe来标识。

Dispatcher主要用来将event 广播给注册表中监听了topic 的Subscriber。

2. Bus接口详解

package cn.hackcloud.concurrency.bus;

/**
* Bus 接口定义了EventBus 的所有使用方法
*/
public interface Bus {
/**
* ★将某个对象注册到Bus上,从此之后该类就成为Subscriber
*/
void register(Object subscriber);

/**
* ★将某个对象从Bus.上取消注册,取消注册之后就不会再接收到来自Bus的任何消息
*/
void unregister(Object subscriber);

/**
* 提交Event到默认的topic
*/
void post(Object event);

/**
* 提交Event到指定的topic
*/
void post(Object Event, String topic);

/**
* 关闭该bus
*/
void close();

/**
* 返回Bus的名称标识
*/
String getBusName();

}

Bus接口中定义了注册topic的方法和Event发送的方法,具体如下。

  • register (Object subscriber): 将某个对象实例注册给Event Bus。
  • unregister ( Object subscriber):取消对该对象实例的注册,会在Event Bus的注册表(Registry)中将其移除。
  • post ( Object event):提交Event到Event Bus中,如果未指定topic 则会将event广播给Event Bus默认的topic。
  • post (Object Event, String topic):提交Event的同时指定了topic。
  • close():销毁该Event Bus。
  • getBusName():返回该Event Bus的名称。
  • 注册对象给Event Bus的时候需要指定接收消息时的回调方法,我们采用注解的方式进行Event回调。
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Subscribe {
String topic() default "default-topic";
}

2. 同步EventBus详解

同步EventBus是最核心的-一个类,它实现了Bus的所有功能,但是该类对Event的广播推送采用的是同步的方式,如果想要使用异步的方式进行推送,可使用EventBus的子类AsyncEventBus。

package cn.hackcloud.concurrency.bus;
public class EventBus implements Bus {
    //用于维护Subscriber的注册表
    private final Registry registry = new Registry();
    //Event Bus 的名字
    private String busName;
    //默认的Event Bus 的名字
    private final static String DEFAULT_BUS_NAME = "default";
    //默认的topic的名字.
    private final static String DEFAULT_TOPIC = "default-topic";
    //用于分发广播消息到各个subscriber的类
    private final Dispatcher dispatcher;

    public EventBus(String busName) {
        this(busName, null, Dispatcher.SEQ_ EXECUTOR_ SERVICE);
    }

    EventBus(String busName, EventExceptionHandler exceptionHandler, Executor executor) {
        this.busName = busName;
        this.dispatcher = Dispatcher.newDispatcher(exceptionHandler, executor);
    }

    public EventBus(EventExceptionHandler exceptionHandler) {
        this(DEFAULT_BUS_NAME, exceptionHandler, Dispatcher.SEQ_EXECUTOR_SERVICE);
    }


    //将注册Subscriber的动作直接委托给Registry
    @Override
    public void register(Object subscriber) {
        this.registry.bind(subscriber);
    }

    //接触注册同样委托给Registry
    @Override
    public void unregister(Object subscriber) {
        this.registry.unbind(subscriber);
    }

    //提交Event到默认的topic
    @Override
    public void post(Object event) {
        this.post(event, DEFAULT_TOPIC);
    }

    //提交Event到指定的topic,具体的动作是由Dispatcher来完成的
    @Override
    public void post(Object event, String topic) {
        this.dispatcher.dispatch(this, registry, event, topic);
    }

    //关闭销毁bus
    @Override
    public void close() {
        this.dispatcher.close();
    }

    //返回Bus的名称
    @Override
    public String getBusName() {
        return this.busName;


    }
  1. EventBus的构造除了名称之外,还需要有ExceptionHandler 和Executor,后两个主要是给Dispatcher 使用的。
  2. registry 和unregister都是通过Subscriber注册表来完成的。
  3. Event的提交则是由Dispatcher 来完成的。
  4. Executor并没有使用我们在第8章中开发的线程池,而是使用JDK中的Executor接口,自己开发的ThreadPool天生就是多线程并发执行任务的线程池,自带异步处理能力,但是无法做到同步任务处理,因此我们使用Executor可以任意扩展同步、异步的任务处理方式。

2. 异步EventBus详解

异步的EventBus比较简单,继承自同步Bus,然后将Thread-Per-Message用异步处理任务的Executor替换EventBus中的同步Executor 即可。

package cn.hackcloud.concurrency.bus;

import java.util.concurrent.ThreadPoolExecutor;

public class AsyncEventBus extends EventBus {
AsyncEventBus(String busName, EventExceptionHandler exceptionHandler,
ThreadPoolExecutor executor) {
super(busName, exceptionHandler, executor);
}

public AsyncEventBus(String busName, ThreadPoolExecutor executor) {
this(busName, null, executor);
}

public AsyncEventBus(ThreadPoolExecutor executor) {
this("default-async", null, executor);
}

public AsyncEventBus(EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) {
this("default-async", exceptionHandler, executor);

}
}

3. Subscriber注册表Registry详解

注册表维护了topic和subscriber之间的关系,当有Event被post之后Dispatcher 需要知道该消息应该发送给哪个Subscriber的实例和对应的方法,Subscriber对象没有任何特殊要求,就是普通的类不需要继承任何父类或者实现任何接口,注册表Registry的代码如下。

package cn.hackcloud.concurrency.bus;

import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

class Registry {
//存储Subscriber集合和topic之间关系的map
private final ConcurrentHashMap<String, ConcurrentLinkedQueue<Subscriber>> subscriberContainer = new ConcurrentHashMap<>();

public void bind(Object subscriber) {
//获取Subscriber object 的方法集合然后进行绑定
List<Method> subscribeMethods = getSubscribeMethods(subscriber);
subscribeMethods.forEach(m -> tierSubscriber(subscriber, m));
}

public void unbind(Object subscriber) {
//unbind为了提高速度,只对Subscriber进行失效操作}
subscriberContainer.forEach((key, queue) ->
queue.forEach(s ->
{
if (s.getsubscribeObject() == subscriber) {
s.setDisable(true);
}
}));
}

public ConcurrentLinkedQueue<Subscriber> scanSubscriber(final String topic) {
return subscriberContainer.get(topic);
}

private void tierSubscriber(Object subscriber, Method method) {
final Subscribe subscribe = method.getDeclaredAnnotation(Subscribe.class);
String topic = subscribe.topic();
//当某topic没有Subscriber Queue的时候创建一个
subscriberContainer.computeIfAbsent(topic, key -> new ConcurrentLinkedQueue<>());
//创建一个Subscriber并且加入Subscriber列表中
subscriberContainer.get(topic).add(new Subscriber(subscriber, method));
}

private List<Method> getSubscribeMethods(Object subscriber) {
final List<Method> methods = new ArrayList<>();
Class<?> temp = subscriber.getClass();
//不断获取当前类和父类的所有@Subscribe方法
while (temp != null) {
//获取所有的方法
Method[] declaredMethods = temp.getDeclaredMethods();
//只有public方法&& 有一个入参&& 最重要的是被@Subscribe标记的方法才符合回调方法
Arrays.stream(declaredMethods).filter(m -> m.isAnnotationPresent(Subscribe.class) && m.getParameterCount() == 1
&& m.getModifiers() == Modifier.PUBLIC)
.forEach(methods::add);
temp = temp.getSuperclass();
}
return methods;
}
}

由于Registry是在Bus中使用的,不能暴露给外部,因此Registry被设计成了包可见的类,我们所设计的EventBus对Subscriber没有做任何限制,但是要接event的回调则需要将方法使用注解@Subscribe进行标记(可指定topic),同一个Subscriber的不同方法通过@Subscribe注解之后可接受来自两个不同的topic消息,代码如下所示:

public class SimpleObject {
/**
* subscribe方法,比如使用@Subscribe标记,并且是void类型且有一个参数
*/
@Subscribe(topic = "alex-topic")
public void test2(Integer x) {
}

@Subscribe(topic = "test-topic")
public void test3(Integer x) {

}
}

SimpleObject的实例被注册到了Event Bus之后,test2 和test3这两个方法将会被加入到注册表中,分别用来接受来自alex-topic 和test-topic的event。

4. Event广播Dispatcher详解

前文中已经说过,Dispatcher 的主要作用是将EventBus post 的event推送给每一个注册到topic上的subscriber.上,具体的推送其实就是执行被@Subscribe注解的方法。

package cn.hackcloud.concurrency.bus;

import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

public class Dispatcher {
private final Executor executorService;
private final EventExceptionHandler exceptionHandler;
public static final Executor SEQ_EXECUTOR_SERVICE = SeqExecutorService.
INSTANCE;
public static final Executor PRE_THREAD_EXECUTOR_SERVICE = PreThreadExecutorService.INSTANCE;

private Dispatcher(Executor executorService, EventExceptionHandler exceptionHandler) {
this.executorService = executorService;
this.exceptionHandler = exceptionHandler;

}

public void dispatch(Bus bus, Registry registry, Object event, String topic) {
//根据topic获取所有的subscriber列表
ConcurrentLinkedQueue<Subscriber> subscribers = registry.
scanSubscriber(topic);
if (null == subscribers) {
if (exceptionHandler != null) {
exceptionHandler.handle(new IllegalArgumentException("The topic" + topic + " not bind yet"), new BaseEventContext(bus.getBusName(), null, event));

}
return;
}
//遍历所有的方法,并且通过反射的方式进行方法调用
subscribers.stream()
.filter(subscriber -> !subscriber.isDisable())
.filter(subscriber -> {
Method subscribeMethod = subscriber.getSubscribeMethod();
Class<?> aClass = subscribeMethod.getParameterTypes()[0];
return (aClass.isAssignableFrom(event.getClass()));
}).forEach(subscriber -> realInvokeSubscribe(subscriber, event, bus));
}

private void realInvokeSubscribe(Subscriber subscriber, object event, Bus bus) {
Method subscribeMethod = subscriber.getSubscribeMethod();
Object subscribeObject = subscriber.getSubscribeObject();
executorService.execute(() ->
{
try {
subscribeMethod, invoke(subscribeObject, event);
} catch (Exception e) {
if (null != exceptionHandler)
exceptionHandler.handle(e, new BaseEventContext(bus.getBusName(), subscriber, event));
}
});
}

public void close() {
if (executorService instanceof ExecutorService)
((ExecutorService) executorService).shutdown();
}

static Dispatcher newDispatcher(EventExceptionHandler exceptionHandler, Executor executor) {
return new Dispatcher(executor, exceptionHandler);
}

static Dispatcher seqDispatcher(EventExceptionHandler exceptionHandler) {
return new Dispatcher(SEQ_EXECUTOR_SERVICE, exceptionHandler);
}

static Dispatcher perThreadDispatcher(EventExceptionHandler exceptionHandler) {
return new Dispatcher(PRE_THREAD_EXECUTOR_SERVICE, exceptionHandler);
}

//顺序执行的ExecutorService
private static class SeqExecutorService implements Executor {

private final static SeqExecutorService INSTANCE = new SeqExecutorService();

@Override
public void execute(Runnable command) {
command.run();
}
}

//每个线程负责一次消息推送
private static class PreThreadExecutorService implements Executor {
private final static PreThreadExecutorService INSTANCE = new PreThreadExecutorService();

@Override
public void execute(Runnable command) {
new Thread(command).start();
}
}

private static class BaseEventContext implements EventContext {
private final String eventBusName;
private final Subscriber subscriber;
private final Object event;

private BaseEventContext(String eventBusName, Subscriber subscriber,
Object event) {
this.eventBusName = eventBusName;
this.subscriber = subscriber;
this.event = event;
}

@Override
public String getSource() {
return this.eventBusName;
}

@Override
public Object getSubscriber() {
return subscriber != null ? subscriber.getSubscribe0bject() : null;
}

@Override
public Method getSubscribe() {
return subscriber != null ? subscriber.getSubscribeMethod() : null;
}

@Override
public Object getEvent() {
return this.event;
}
}
}

5. 其他类接口设计

package cn.hackcloud.concurrency.bus;
import java.lang.reflect.Method;
public class Subscriber {
    private final Object subscribeObject;
    private final Method subscribeMethod;
    private boolean disable = false;

    public Subscriber(Object subscribeObject, Method subscribeMethod) {
        this.subscribeObject = subscribeObject;
        this.subscribeMethod = subscribeMethod;
    }

    public Object getSubscribeObject() {
        return subscribeObject;
    }

    public Method getSubscribeMethod() {
        return subscribeMethod;
    }

    public boolean isDisable() {
        return disable;
    }

    public void setDisable(boolean disable) {
        this.disable = disable;
    }
}

Subscriber类封装了对象实例和被@Subscribe标记的方法,也就是说一个对象实例有可能会被封装成若千个Subscriber。
(2 ) EventExceptionHandler接口EventBus会将方法的调用交给Runnable接口去执行,我们都知道Runnable接口不能抛出checked异常信息,并且在每一个subscribe方法中,也不允许将异常抛出从而影响EventBus对后续Subscriber进行消息推送,但是异常信息又不能被忽略掉,因此注册一个异常回调接口就可以知道在进行消息广播推送时都发生了什么。

public class EventExceptionHandler {
public void handle(Exception e, EventContext baseEventContext) {
}
}

(3) EventContext接口Event接口提供了获取消息源、消息体,以及该消息是由哪一个Subseriber的哪个subscribe方法所接受,主要用于消息推送出错时被回调接口EventExceptionHandler使用。

public interface EventContext {
    String getSource();

    Object getSubscriber();

    Method getSubscribe();

    Object getEvent();

}

6. 测试代码

public static void main(String[] args) {
asyncEventBus();
}

private static void syncEventBus() {
Bus bus = new EventBus("TestBus");
bus.register(new SimpleObject());
bus.register(new SimpleObject());
bus.post("Hello");
System.out.println("-----------");
bus.post("Hello2", "test");
}

private static void asyncEventBus() {
Bus bus = new AsyncEventBus("TestBus", (ThreadPoolExecutor) Executors.newFixedThreadPool(10));
bus.register(new SimpleObject());
bus.register(new SimpleObject());
bus.post("Hello");
System.out.println("-----------");
bus.post("Hello2", "test");
}
打赏
本站所有资源均来源于网络,仅供学习使用,请支持正版!Java技术开源 » 多线程设计模式之:Event Bus设计模式

评论 抢沙发

评论前必须登录!

 

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我们将继续提供更多优质内容,让我们一起创建更加美好的网络世界!

支付宝扫一扫

微信扫一扫

登录

找回密码

注册