1、如何对业务透明实现消息持久化,常规的Spring的Event使用方式,业务无感知
2、同时需要支持延时消息、异常重试、分布式消费等能力
3、代码改造要少(只需要一个类)
1、定义自己的Event,这里使用继承ApplicationEvent方式
/**
 * Custom application event whose source is a plain text message.
 *
 * <p>It extends Spring's {@code ApplicationEvent}, so it can be handed to an
 * {@code ApplicationEventPublisher} and received by {@code @EventListener} methods.
 */
public class MyEvent extends ApplicationEvent {

    /**
     * Creates the event.
     *
     * @param msg the message, passed to the superclass constructor as the event source
     */
    public MyEvent(String msg) {
        super(msg);
    }
}
2、订阅事件,这里使用注解方式
/**
 * Subscriber for {@code MyEvent}, registered as a Spring component and wired
 * through the annotation-based listener model.
 *
 * <p>NOTE(review): {@code log} is not declared in this snippet; presumably it is
 * supplied by a logger field or a logging annotation that was omitted - confirm.
 */
@Component
public class MyEventListener {

    /**
     * Logs every {@code MyEvent} that is delivered to this listener.
     *
     * @param u the received event
     */
    @EventListener
    public void myEvent(final MyEvent u) {
        log.info("MyEvent--->: {}", u);
    }
}
3、发布事件
/**
 * REST entry point used to publish a {@code MyEvent} on demand.
 */
@RestController
public class EventManager {

    /** Publisher injected by the container; used to hand events to Spring. */
    @Resource
    private ApplicationEventPublisher publisher;

    /**
     * Wraps the given message in a {@code MyEvent} and publishes it.
     *
     * @param msg the text used as the event source
     */
    @GetMapping("/publishMyEvent")
    public void publishMyEvent(String msg) {
        publisher.publishEvent(new MyEvent(msg));
    }
}
上面代码是手敲的,没有运行,大概思路如此了。
实现ApplicationEvent事件持久化,首先我们要找到持久化的切入点。 我们可以想象有这些方式可以做持久化。
1、新建一个统一的监听器,监听所有的事件并持久化到MQ;消费到MQ的消息之后,再以Event的方式重新发布出来
2、改写Spring的事件发布过程,发布事件直接投递到MQ,然后监听再转Event
上述方案1,正是Alibaba的cloud bus所采用的方式。
本文实现的是方案2,只需要参考 SimpleApplicationEventMulticaster 自定义一个 applicationEventMulticaster bean就可以。
核心代码如下:
/**
 * Event multicaster registered under the bean name {@code applicationEventMulticaster}
 * so that it takes the place of Spring's default multicaster.
 *
 * <p>Events that are {@code BaseEvent}s with {@code isStore() == true} are not dispatched
 * locally; they are serialized to JSON and sent to a RocketMQ topic. A consumer started in
 * {@link #run(String[])} reads them back, rebuilds the event object from the message tag
 * (the event's fully qualified class name) and dispatches it via {@code exchangeEvent}.
 * All other events go straight to {@code exchangeEvent}.
 *
 * <p>NOTE(review): {@code exchangeEvent} and {@code log} are not defined in the code shown
 * here; presumably {@code exchangeEvent} performs the local dispatch (e.g. by delegating to
 * {@code super.multicastEvent}) - confirm against the full class.
 *
 * <p>NOTE(review): the consumer and producer are never shut down in the visible code;
 * consider closing them when the application context stops.
 */
@Component("applicationEventMulticaster")
public class MyApplicationEventMulticaster extends SimpleApplicationEventMulticaster implements CommandLineRunner {

    /** RocketMQ topic that carries every persisted event. */
    private final String topic = "X_TOPIC_TEMP_TEST";

    /** Cache of event classes, keyed by fully qualified class name (the message tag). */
    private final Map<String, Class<? extends BaseEvent<?,?>>> events = new ConcurrentHashMap<>(250);

    /**
     * MQ producer, assigned in {@link #run(String[])} once it has been started.
     * Volatile because it is written by the startup thread and read by whichever
     * thread publishes an event.
     */
    private volatile Producer producer;

    /**
     * Builds the client configuration shared by the producer and the consumer.
     * Every call produces a fresh random instance name.
     *
     * @return a new {@code Properties} object holding the RocketMQ settings
     */
    public Properties getPro() {
        Properties properties = new Properties();
        // RocketMQ configuration
        properties.setProperty(PropertyKeyConst.GROUP_ID, "xxx");
        properties.setProperty(PropertyKeyConst.AccessKey, "xxx");
        properties.setProperty(PropertyKeyConst.SecretKey, "xxx");
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, "xxx");
        properties.setProperty(PropertyKeyConst.ConsumeTimeout, "3");
        properties.setProperty(PropertyKeyConst.InstanceName, UUID.randomUUID().toString());
        properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        return properties;
    }

    /**
     * Starts the MQ producer and consumer. Runs as a {@code CommandLineRunner}, i.e. after
     * the Spring container has been initialized, so it does not slow down context startup.
     *
     * @param args command line arguments (unused)
     */
    @Override
    public void run(String[] args) {
        Consumer consumer = ONSFactory.createConsumer(getPro());
        Producer startedProducer = ONSFactory.createProducer(getPro());
        startedProducer.start();
        // Expose the producer only after start() so publishers never see an unstarted client.
        producer = startedProducer;
        // Register the message listener
        consumer.subscribe(topic, "*", new MessageListener() {
            @Override
            public Action consume(Message message, ConsumeContext consumeContext) {
                return dispatchFromMq(message);
            }
        });
        consumer.start();
    }

    /**
     * Routes an event: persisted {@code BaseEvent}s go to MQ, everything else is
     * dispatched through {@code exchangeEvent}.
     *
     * @param event     the event being published
     * @param eventType the resolved event type, may be {@code null}
     * @throws IllegalStateException if a persisted event is published before the producer
     *                               has been started by {@link #run(String[])}
     */
    @Override
    public void multicastEvent(@NonNull final ApplicationEvent event, @Nullable ResolvableType eventType) {
        if (event instanceof BaseEvent) {
            BaseEvent<?,?> baseEvent = (BaseEvent<?,?>) event;
            // Persisted events are delivered through MQ only; the consumer re-dispatches them.
            if (baseEvent.isStore()) {
                sendToMq(baseEvent);
                return;
            }
        }
        exchangeEvent(event, eventType);
    }

    /** Rebuilds the event carried by an MQ message and dispatches it to local listeners. */
    private Action dispatchFromMq(Message message) {
        String tag = message.getTag();
        Class<? extends BaseEvent<?,?>> eventClass = getEventClass(tag);
        if (eventClass == null) {
            // Without the class the body cannot be deserialized; ask the broker to redeliver
            // instead of passing null to the JSON mapper.
            log.error("cannot resolve event class for tag: {}", tag);
            return Action.ReconsumeLater;
        }
        // Decode with an explicit charset so the result does not depend on the host's default.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        BaseEvent<?,?> event = JSON.toObject(body, eventClass);
        exchangeEvent(event, null);
        return Action.CommitMessage;
    }

    /** Serializes a persisted event to JSON and sends it to the MQ topic. */
    private void sendToMq(BaseEvent<?,?> baseEvent) {
        Producer startedProducer = producer;
        if (startedProducer == null) {
            throw new IllegalStateException(
                    "MQ producer is not started yet, cannot publish " + baseEvent.getClass().getName());
        }
        Message msg = new Message();
        msg.setKey(baseEvent.getUuid());
        // The tag carries the concrete class name so the consumer knows what to deserialize.
        msg.setTag(baseEvent.getClass().getName());
        msg.setMsgID(baseEvent.getUuid());
        msg.setTopic(topic);
        String msgString = JSON.toJSONString(baseEvent);
        // Events with an empty payload are delivered as well
        if (Objects.nonNull(msgString)) {
            msg.setBody(msgString.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
        // Delayed delivery
        if (baseEvent.getStartDeliverTime() > 0) {
            msg.setStartDeliverTime(baseEvent.getStartDeliverTime());
        }
        startedProducer.send(msg);
    }

    /**
     * Resolves an event class from its fully qualified name, caching successful lookups.
     *
     * @param event fully qualified class name of the event, may be {@code null}
     * @return the event class, or {@code null} if the name is {@code null}, the class cannot
     *         be loaded, or it is not a {@code BaseEvent} subclass
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Class<? extends BaseEvent<?,?>> getEventClass(String event) {
        if (event == null) {
            // ConcurrentHashMap rejects null keys.
            return null;
        }
        Class<? extends BaseEvent<?,?>> cached = events.get(event);
        if (cached != null) {
            return cached;
        }
        try {
            // Unchecked cast: BaseEvent.class is viewed as Class<BaseEvent<?,?>> so that
            // asSubclass yields the wildcard-parameterized type.
            Class<? extends BaseEvent<?,?>> clazz = Class.forName(event)
                    .asSubclass((Class<BaseEvent<?,?>>)(Class)BaseEvent.class);
            events.put(event, clazz);
            return clazz;
        } catch (ClassNotFoundException | ClassCastException e) {
            log.error("get event class error: {}", event, e);
            return null;
        }
    }
}
基础的事件Event
/**
 * Base class for application events that can optionally be persisted and delayed.
 *
 * <p>NOTE(review): the {@code static} modifier is only legal on a nested class;
 * presumably this type was lifted out of an enclosing class - confirm.
 *
 * @param <S> type of the event source
 * @param <R> type of the result attached to the event
 */
public static abstract class BaseEvent<S, R> extends ApplicationEvent {

    /** Persistence flag; off unless {@link #setStore(boolean)} enables it. */
    private boolean store;

    /** Delayed delivery time: an absolute time in milliseconds. */
    private long startDeliverTime;

    /** Result produced for this event. */
    private R result;

    /** Unique id of the event, generated randomly at construction time. */
    private String uuid = UUID.randomUUID().toString();

    /**
     * Creates the event; also serves as the JSON creator, bound to the {@code source} property.
     *
     * @param source the event source, forwarded to the superclass constructor
     */
    @JsonCreator
    public BaseEvent(@JsonProperty("source") S source) {
        super(source);
    }

    /** @return the unique id of this event */
    public String getUuid() {
        return uuid;
    }

    /** @param uuid the unique id to assign to this event */
    public void setUuid(String uuid) {
        this.uuid = uuid;
    }

    /** @return the result attached to this event, possibly {@code null} */
    public R getResult() {
        return result;
    }

    /** @param o the result to attach to this event */
    public void setResult(R o) {
        this.result = o;
    }

    /** @param data the new event source; overwrites the inherited {@code source} field */
    public void setSource(S data) {
        this.source = data;
    }

    /**
     * Typed view of the inherited source.
     *
     * @return the event source cast to {@code S}, or {@code null} if no source is set
     */
    @SuppressWarnings("unchecked")
    @Override
    public S getSource() {
        // Casting null yields null, so no separate null branch is needed.
        return (S) super.getSource();
    }

    /** @return {@code true} if this event is flagged for persistence */
    public boolean isStore() {
        return store;
    }

    /** @param store whether this event should be flagged for persistence */
    public void setStore(boolean store) {
        this.store = store;
    }

    /** @return the delayed delivery time as an absolute time in milliseconds */
    public long getStartDeliverTime() {
        return startDeliverTime;
    }

    /** @param startDeliverTime the delayed delivery time as an absolute time in milliseconds */
    public void setStartDeliverTime(long startDeliverTime) {
        this.startDeliverTime = startDeliverTime;
    }
}