@scofier
Last active October 26, 2024 01:38
Persisting ApplicationEvent events (using Alibaba Cloud RocketMQ)

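Requirements

1. Persist messages transparently to business code: events are published and consumed in the usual Spring Event way, and the business code is unaware of the persistence.
2. Delayed messages, retry on failure, distributed delivery, and so on are also required.
3. Code changes must be minimal (a single class is enough).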

Regular use of Spring events

1. Define your own event, here by extending ApplicationEvent

import org.springframework.context.ApplicationEvent;

public class MyEvent extends ApplicationEvent {
    public MyEvent(String msg) {
        // the message is carried as the event source
        super(msg);
    }
}

2. Subscribe to the event, here using the annotation approach

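import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class MyEventListener {

    private static final Logger log = LoggerFactory.getLogger(MyEventListener.class);

    @EventListener
    public void myEvent(MyEvent u) {
        log.info("MyEvent--->: {}", u);
    }

}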

3. Publish the event

@RestController
public class EventManager {

    @Resource
    private ApplicationEventPublisher publisher;
    
    @GetMapping("/publishMyEvent")
    public void publishMyEvent(String msg) {
       MyEvent myEvent = new MyEvent(msg);
       publisher.publishEvent(myEvent);
    }
}

The code above was typed by hand and has not been run; it only shows the general idea.

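Thinking about how to persist ApplicationEvent

To persist ApplicationEvent events, we first need to find the right hook point. Two approaches come to mind:

1. Create a single unified listener that listens for all events and persists them to MQ; when the message is received back from MQ, it is published again as an Event.
2. Rewrite Spring's event publishing process so that publishing an event delivers it straight to MQ, and the MQ listener then converts it back into an Event.

Option 1 is the approach used in Alibaba's cloud bus.

This article implements option 2. All it takes is a custom applicationEventMulticaster bean modeled on SimpleApplicationEventMulticaster: the Spring application context looks up a bean with exactly this name during startup and uses it in place of the default multicaster.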
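
To make the control flow of option 2 concrete before the real code, here is a minimal in-memory sketch in plain Java. It is not the article's implementation: `SketchEvent`, `SketchMulticaster`, and `drainBroker` are hypothetical names, and an `ArrayDeque` stands in for the RocketMQ topic, so nothing is actually persisted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.EventObject;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical event type; "store" marks events that must go through the broker. */
class SketchEvent extends EventObject {
    final boolean store;

    SketchEvent(Object source, boolean store) {
        super(source);
        this.store = store;
    }
}

/** Stand-in for the custom multicaster: one entry point, two delivery paths. */
class SketchMulticaster {
    private final List<Consumer<SketchEvent>> listeners = new ArrayList<>();
    // Stand-in for the MQ topic (assumption: a real broker would persist these messages).
    private final Queue<SketchEvent> broker = new ArrayDeque<>();

    void addListener(Consumer<SketchEvent> listener) {
        listeners.add(listener);
    }

    /** Called by the publisher: persistent events are queued, all others run immediately. */
    void multicast(SketchEvent event) {
        if (event.store) {
            broker.add(event);
            return;
        }
        dispatch(event);
    }

    /** Plays the role of the MQ consumer: hands queued events back to the local listeners. */
    int drainBroker() {
        int delivered = 0;
        SketchEvent event;
        while ((event = broker.poll()) != null) {
            dispatch(event);
            delivered++;
        }
        return delivered;
    }

    private void dispatch(SketchEvent event) {
        for (Consumer<SketchEvent> listener : listeners) {
            listener.accept(event);
        }
    }
}
```

The point of the design is that publishers and listeners only ever see `multicast` and the listener callback; whether an event takes the direct path or the broker path is decided by a flag on the event itself.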

Core code

The core code is as follows:

@Component("applicationEventMulticaster")
public class MyApplicationEventMulticaster extends SimpleApplicationEventMulticaster implements CommandLineRunner {

	private Producer producer;
	// RocketMQ topic
	private String topic = "X_TOPIC_TEMP_TEST";
	// cache mapping a tag (the event's class name) to its event Class
	private Map<String, Class<? extends BaseEvent<?,?>>> events = new ConcurrentHashMap<>(250);

	public Properties getPro() {
		Properties properties = new Properties();
		// RocketMQ configuration
		properties.setProperty(PropertyKeyConst.GROUP_ID, "xxx");
		properties.setProperty(PropertyKeyConst.AccessKey, "xxx");
		properties.setProperty(PropertyKeyConst.SecretKey, "xxx");
		properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, "xxx");
		properties.setProperty(PropertyKeyConst.ConsumeTimeout, "3");
		properties.setProperty(PropertyKeyConst.InstanceName, UUID.randomUUID().toString());
		properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
		return properties;
	}

	/**
	 * Runs after the Spring container has finished initializing, so it does not affect Spring startup.
	 */
	@Override
	public void run(String[] args) {
		Consumer consumer = ONSFactory.createConsumer(getPro());
		producer = ONSFactory.createProducer(getPro());
		producer.start();
		// register the message listener
		consumer.subscribe(topic, "*", new MessageListener() {
			@Override
			public Action consume(Message message, ConsumeContext consumeContext) {
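				// the tag carries the event's fully qualified class name
				String tag = message.getTag();
				String body = new String(message.getBody(), StandardCharsets.UTF_8);
				BaseEvent<?,?> event = JSON.toObject(body, getEventClass(tag));

				// hand the restored event to the local listeners
				exchangeEvent(event, null);
				return Action.CommitMessage;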
			}
		});
		consumer.start();
	}

	@Override
	public void multicastEvent(@NonNull final ApplicationEvent event, @Nullable ResolvableType eventType) {

		if(event instanceof BaseEvent) {

			BaseEvent<?,?> baseEvent = (BaseEvent<?,?>)event;
			// persistent event: deliver it through MQ instead of dispatching locally
			if(baseEvent.isStore()) {
				Message msg = new Message();
				msg.setKey(baseEvent.getUuid());
				msg.setTag(event.getClass().getName());
				msg.setMsgID(baseEvent.getUuid());
				msg.setTopic(topic);
				String msgString = JSON.toJSONString(event);
				// events with an empty payload are delivered as well
				if(Objects.nonNull(msgString)) {
					msg.setBody(msgString.getBytes(StandardCharsets.UTF_8));
				}
				// delayed delivery
				if(baseEvent.getStartDeliverTime() > 0) {
					msg.setStartDeliverTime(baseEvent.getStartDeliverTime());
				}
				producer.send(msg);

				return;
			}
		}

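		// non-persistent events are dispatched to the local listeners right away
		// (exchangeEvent is not defined in this excerpt)
		exchangeEvent(event, eventType);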
	}

	@SuppressWarnings("all")
	private Class<? extends BaseEvent<?,?>> getEventClass(String event) {
		if(!events.containsKey(event)) {
			try {
				// unchecked cast: resolve the Class from the event's fully qualified class name
				Class<? extends BaseEvent<?,?>> clazz = Class.forName(event)
						.asSubclass((Class<BaseEvent<?,?>>)(Class)BaseEvent.class);
				events.put(event, clazz);
			} catch (ClassNotFoundException e) {
				log.error("get event class error: {}", e.getMessage());
			}
		}
		return events.get(event);
	}


}
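
The tag-to-class lookup in getEventClass can be tried in isolation. The sketch below is a plain-Java variant under stated assumptions: `TaggedEvent` and `OrderPaidEvent` are hypothetical stand-ins for BaseEvent and a concrete event, and, unlike the listing above, it also treats a tag that names a non-event class as unknown instead of letting the ClassCastException escape.

```java
import java.util.EventObject;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical base type standing in for BaseEvent. */
abstract class TaggedEvent extends EventObject {
    TaggedEvent(Object source) {
        super(source);
    }
}

/** Hypothetical concrete event whose class name travels as the message tag. */
class OrderPaidEvent extends TaggedEvent {
    OrderPaidEvent(Object source) {
        super(source);
    }
}

class EventClassResolver {
    private final Map<String, Class<? extends TaggedEvent>> cache = new ConcurrentHashMap<>();

    /** Returns the event class for a tag, or null if the tag does not name an event class. */
    Class<? extends TaggedEvent> resolve(String tag) {
        Class<? extends TaggedEvent> cached = cache.get(tag);
        if (cached != null) {
            return cached;
        }
        try {
            // asSubclass performs a checked narrowing, so no raw cast is needed here
            Class<? extends TaggedEvent> clazz = Class.forName(tag).asSubclass(TaggedEvent.class);
            cache.put(tag, clazz);
            return clazz;
        } catch (ClassNotFoundException | ClassCastException e) {
            // unknown tags are not cached, so a class deployed later can still be resolved
            return null;
        }
    }
}
```

A caller would check for null before deserializing, since a null class means the message cannot be turned back into an event on this node.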

Base event class

public static abstract class BaseEvent<S, R> extends ApplicationEvent {

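        /** Whether the event is persisted through MQ **/
        private boolean store = false;
        /** Delayed delivery: absolute time, in milliseconds **/
        private long startDeliverTime;

        /**
         * Result returned by the event
         */
        private R result;

        /**
         * Unique event ID
         */
        private String uuid = UUID.randomUUID().toString();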

        @JsonCreator
        public BaseEvent(@JsonProperty("source") S source) {
            super(source);
        }

        public String getUuid() {
            return uuid;
        }

        public void setUuid(String uuid) {
            this.uuid = uuid;
        }

        public R getResult() {
            return result;
        }

        public void setResult(R o) {
            this.result = o;
        }

        public void setSource(S data) {
            this.source = data;
        }

        @SuppressWarnings("unchecked")
        @Override
        public S getSource() {
            if (super.getSource() == null) {
                return null;
            }
            return (S) super.getSource();
        }

        public boolean isStore() {
            return store;
        }

        public void setStore(boolean store) {
            this.store = store;
        }

        public long getStartDeliverTime() {
            return startDeliverTime;
        }

        public void setStartDeliverTime(long startDeliverTime) {
            this.startDeliverTime = startDeliverTime;
        }
    }
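
Because startDeliverTime is an absolute timestamp rather than a delay, callers have to add the delay to the current time themselves. The following sketch shows one way to wrap that arithmetic; `DeliveryFlagsEvent` and `deliverAfter` are hypothetical names and are not part of the BaseEvent class above, which only exposes the plain setters.

```java
import java.time.Duration;
import java.util.EventObject;

/** Hypothetical, trimmed-down stand-in for BaseEvent that keeps only the delivery flags. */
class DeliveryFlagsEvent extends EventObject {
    private boolean store;
    // absolute time in epoch milliseconds; 0 means "deliver immediately"
    private long startDeliverTime;

    DeliveryFlagsEvent(Object source) {
        super(source);
    }

    boolean isStore() {
        return store;
    }

    long getStartDeliverTime() {
        return startDeliverTime;
    }

    /** Marks the event as persistent and schedules it "delay" after "nowMillis". */
    DeliveryFlagsEvent deliverAfter(long nowMillis, Duration delay) {
        this.store = true;
        this.startDeliverTime = nowMillis + delay.toMillis();
        return this;
    }
}
```

In application code the first argument would normally be System.currentTimeMillis(); taking it as a parameter keeps the calculation easy to test.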

Full code

Repository: https://github.com/scofier/SpringEventStore
