在 Spring 中提供了 ApplicationEventPublisher,用于发布事件,这在一些场景下十分有用,比如,当我创建一个货物,而另一个服务监听货物的创建并为其创建库存。所以在Spring MVC中很实用,我也常常用它来解耦,但是当我切换为 WebFlux 时,就尴尬了。因为 ApplicationEventPublisher 是同步操作,它并不支持响应式,即流操作。
所以,我在 WebFlux 中实现一个类似的发布订阅模式,以替代 ApplicationEventPublisher
实现 Event 服务
我们的事件服务分为3个部分
- 对发布对象的封装
- 设计通用的监听接口
- 提供类似 ApplicationEventPublisher 的对象,发布事件
ObjectEventNotifier
首先先对发布的对象进行封装,这步就算是 ApplicationEventPublisher,也是无法避免的,不然,你监听到一个对象,但不知道这对象是创建还是删除,这又该做什么处理呢。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class ObjectEventNotifier<T> implements ResolvableTypeProvider {
private final T object; private final Type type;
private ObjectEventNotifier(T object, Type type) { this.object = object; this.type = type; }
public static <T> ObjectEventNotifier<T> from(T object, Type type) { return new ObjectEventNotifier<>(object, type); }
@Override public ResolvableType getResolvableType() { return ResolvableType.forClassWithGenerics(getClass(), ResolvableType.forInstance(object)); }
public T getObject() { return this.object; }
public Type getType() { return this.type; }
public enum Type { Create, Update, Delete } }
|
上面的类,依据枚举 Type,对对象进行分类,以便在不同情况下,做不同处理
EventListener
接下来设计监听接口,一般而言,我们依据类的类型,选择不同的消费者,所以,简单的监听接口如下
1 2 3 4 5 6 7
| public interface EventListener<T> {
Class<T> target();
Publisher<Void> consume(ObjectEventNotifier<T> consumer);
}
|
但是这样,存在一个问题,当我们创建监听服务时,就会像下面那样
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Bean public EventListener<Goods> listener() { return new EventListener<>() { @Override public Class<Goods> target() { return Goods.class; } @Override public Publisher<Void> consume(ObjectEventNotifier<Goods> consumer) { } }; }
|
这代码真的是一言难尽,所以,对于监听接口必须提供一个简化版的创建方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public interface EventListener<T> { static <T> EventListener<T> register(Class<T> target, Function<ObjectEventNotifier<T>, Publisher<Void>> fn) { return new EventListener<>() { @Override public Class<T> target() { return target; } @Override public Publisher<Void> consume(ObjectEventNotifier<T> consumer) { return fn.apply(consumer); } }; } }
@Bean public EventListener<Goods> listener() { return EventListener.register(Goods.class, notifier -> ...); }
|
这样就好多了
EventService
最后,实现我们最重要的事件服务,它分为两个部分
- 接受监听服务,并基于类的类型进行分类
- 接受发布对象,封装并依据类型,选择对应的监听服务消费
第一部分,由于我们使用Spring boot,所以,我们可以让spring给我们要的已经创好的监听服务,我们只需要将 EventListener 根据类型不同,放在不同的Map里存储
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Service public class EventService {
public Map<String, List<Object>> store = new ConcurrentHashMap<>();
@SuppressWarnings({"unchecked", "rawtypes"}) public EventService(List<EventListener> listeners) { listeners.forEach(listener -> { this.register(listener.target(), listener::consume); }); }
public <T> void register(Class<T> target, Function<ObjectEventNotifier<T>, Publisher<Void>> consumer) { String name = target.getName(); List<Object> consumers = store.getOrDefault(name, null); if (consumers == null) { consumers = new ArrayList<>(); store.put(name, consumers); } consumers.add(consumer); } }
|
第二部分,这是响应式中的难点,响应式是非阻塞的,所以消费者也需要返回一个非阻塞的结果,之前的接口返回是 Publisher<Void>
,就是因为这是个未处理完的结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Service public class EventService {
public Map<String, List<Object>> store = new ConcurrentHashMap<>();
@SuppressWarnings({"unchecked"}) public <T> Flux<Void> publish(ObjectEventNotifier.Type type, T target) { String name = target.getClass().getName(); List<Object> consumers = store.getOrDefault(name, null); if (consumers == null) { return Flux.empty(); } return Flux .fromIterable(consumers) .flatMap(obj -> { Function<ObjectEventNotifier<T>, Publisher<Void>> consumer = (Function<ObjectEventNotifier<T>, Publisher<Void>>) obj; Publisher<Void> apply = consumer.apply(ObjectEventNotifier.from(target, type)); return apply == null ? Mono.empty() : apply; }); }
public <T> Mono<T> publishCreate(T target) { return this.publish(ObjectEventNotifier.Type.Create, target) .then(Mono.just(target)); }
public <T> Mono<T> publishUpdate(T target) { return this.publish(ObjectEventNotifier.Type.Update, target) .then(Mono.just(target)); }
public <T> Mono<T> publishDelete(T target) { return this.publish(ObjectEventNotifier.Type.Delete, target) .then(Mono.just(target)); }
}
|
主要是publish部分的代码,下面的publishCreate等,是为了快捷操作
试试写好的 Event 服务
创建一个货物服务
1 2 3 4 5 6 7 8 9 10 11 12
| public class GoodsService {
@Resource private EventService eventService;
public Mono<Goods> createGoods() { Goods apple = Goods.of(1, "苹果"); return Mono.just(apple) .flatMap(eventService::publishCreate); }
}
|
为 Goods 创建监听服务(创了两个,毕竟这种监听一般都是一对多的嘛)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| @Configuration public class ServerConfiguration {
@Bean public GoodsService goodsService() { return new GoodsService(); }
@Bean public StockService stockService() { return new StockService(); }
@Bean public EventListener<Goods> listener1(StockService stockService) { return EventListener.register(Goods.class, stockService::initStockWithGoods1); }
@Bean public EventListener<Goods> listener2(StockService stockService) { return EventListener.register(Goods.class, stockService::initStockWithGoods2); }
}
@Slf4j public class StockService {
public Publisher<Void> initStockWithGoods1(ObjectEventNotifier<Goods> notifier) { ObjectEventNotifier.Type type = notifier.getType(); if (type == ObjectEventNotifier.Type.Create) { log.error("Create stock for: " + notifier.getObject().getName()); return Mono.just(Stock.of(1, 0)).then(); } return Mono.empty(); }
public Publisher<Void> initStockWithGoods2(ObjectEventNotifier<Goods> notifier) { log.error("Another listener for Goods: " + notifier.getObject().getName()); return Mono.empty(); }
}
|
创建一个测试用例,调用创建苹果
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @SpringBootTest class GoodsServiceTest {
@Resource GoodsService goodsService;
@Test void createGoods() { goodsService.createGoods() .as(StepVerifier::create) .expectNextCount(1) .verifyComplete(); } }
|
运行,我们可以看到以下结果,说明在创建苹果的时候,调用了库存中的两个方法
1 2
| Create stock for: 苹果 Another listener for Goods: 苹果
|
源码在这里
源码: https://github.com/jiangtj-lab/ex-flux-event
Spring 对 ApplicationEventPublisher 支持响应式技术栈的进展可以看这个Issue: https://github.com/spring-projects/spring-framework/issues/21025