-
Notifications
You must be signed in to change notification settings - Fork 126
Concept Event
该库支持在一个项目中(同时)配置多个 Kafka
和 RabbitMQ
如有必要还可扩展 ActiveMQ
或 RocketMQ
等任何符合事件模型的组件
同时以简单的事件模型作为抽象,支持不对任何中间件强绑定的场景
支持可视化动态添加(未实现,考虑后续版本实现)
implementation 'com.github.linyuzai:concept-event-spring-boot-starter:1.1.1'
或者
<dependency>
<groupId>com.github.linyuzai</groupId>
<artifactId>concept-event-spring-boot-starter</artifactId>
<version>1.1.1</version>
</dependency>
在项目中使用@EnableEventConcept
启用功能
@EnableEventConcept
@SpringBootApplication
public class ConceptSampleApplication {
public static void main(String[] args) {
SpringApplication.run(ConceptSampleApplication.class, args);
}
}
concept:
event:
kafka:
enabled: true #需要手动开启
endpoints: #在该节点下配置多个kafka,属性同spring.kafka
kafka1: #端点名称-kafka1
inherit: parent #继承名称为parent的端点配置
bootstrap-servers:
- 192.168.30.100:9092
- 192.168.30.101:9092
- 192.168.30.102:9092
consumer:
group-id: kafka1
kafka2: #端点名称-kafka2
inherit: parent #继承名称为parent的端点配置
bootstrap-servers:
- 192.168.60.200:9092
- 192.168.60.201:9092
- 192.168.60.202:9092
consumer:
group-id: kafka2
parent: #作为其他端点的父配置
enabled: false #是否启用该端点,这里作为父配置不启用
producer:
retries: 0
acks: 1
consumer:
enable-auto-commit: false
template:
default-topic: sample
listener:
ack-mode: manual_immediate
kafka
的配置属性同spring.kafka
concept:
event:
rabbitmq:
enabled: true #需要手动开启
endpoints: #在该节点下配置多个rabbitmq,属性同spring.rabbitmq
rabbitmq1: #端点名称-rabbitmq1
inherit: parent #继承名称为parent的端点配置
host: 192.168.30.140
template:
routing-key: rabbitmq1.dev
exchange: rabbitmq1
rabbitmq2: #端点名称-rabbitmq2
inherit: parent #继承名称为parent的端点配置
host: 192.168.30.141
template:
routing-key: rabbitmq2.dev
exchange: rabbitmq2
parent:
enabled: false #是否启用该端点,这里作为父配置不启用
username: admin
password: 123456
port: 5672
rabbitmq
的配置属性同spring.rabbitmq
额外提供配置继承可将一些相同的配置提取出来,使用inherit
属性指定继承的端点
@RestController
@RequestMapping("/concept-event")
public class EventController {
@Autowired
private EventConcept concept;
@GetMapping("/send")
public void send() {
concept.template().publish(Object);//发布事件
}
}
需要注意该方式需要提前通过 事件引擎自定义配置/事件端点自定义配置 配置组件 事件发布器/事件订阅器
@RestController
@RequestMapping("/concept-event")
public class EventController {
@Autowired
private EventConcept concept;
@GetMapping("/send")
public void send() {
concept.template()
.context(KeyValue) //配置上下文(用于满足自定义数据传递)
.exchange(EventExchange) //指定发布到哪些端点(多个Kafka中哪几个)
.encoder(EventEncoder) //指定事件编码器(如把对象转成json)
.error(EventErrorHandler) //指定异常处理器(发布异常的后续操作)
.publisher(EventPublisher) //指定事件发布器(如使用KafkaTemplate给指定的Topic发消息)
.publish(Object); //事件对象
}
}
@Configuration
public class EventSubscriberRegister implements ApplicationRunner {
@Autowired
public EventConcept concept;
@Override
public void run(ApplicationArguments args) throws Exception {
concept.template().subscribe(EventListener);//监听事件
}
}
需要注意该方式需要提前通过 事件引擎自定义配置/事件端点自定义配置 配置组件 事件发布器/事件订阅器
@Configuration
public class EventSubscriberRegister implements ApplicationRunner {
@Autowired
public EventConcept concept;
@Override
public void run(ApplicationArguments args) throws Exception {
concept.template()
.context(KeyValue) //配置上下文(用于满足自定义数据传递)
.exchange(EventExchange) //指定订阅哪些端点(多个Kafka中哪几个)
.decoder(EventDecoder) //指定事件解码器(如把json转成对象)
.error(EventErrorHandler) //指定异常处理器(订阅或消费异常的后续操作)
.subscriber(EventSubscriber) //指定事件订阅器(如订阅哪个Topic)
.subscribe(EventListener); //监听事件
}
}
抽象为EventEngine
表示中间件类型,目前支持 Kafka
和 RabbitMQ
//如有需要可以使用该方法获得Kafka事件引擎
KafkaEventEngine.get(EventConcept);
//如有需要可以使用该方法获得RabbitMQ事件引擎
RabbitEventEngine.get(EventConcept);
抽象为EventEngineFactory
Kakfa
事件引擎工厂KafkaEventEngineFactory
,默认实现KafkaEventEngineFactoryImpl
可自定义并注入Spring
容器生效
@Component
public class CustomKafkaEventEngineFactory implements KafkaEventEngineFactory {
@Override
public KafkaEventEngine create(KafkaEventProperties config) {
//自定义
return null;
}
}
RabbitMQ
事件引擎工厂RabbitEventEngineFactory
,默认实现RabbitEventEngineFactoryImpl
可自定义并注入Spring
容器生效
@Component
public class CustomRabbitEventEngineFactory implements RabbitEventEngineFactory {
@Override
public RabbitEventEngine create(RabbitEventProperties config) {
//自定义
return null;
}
}
抽象为EventEngineConfigurer
Kakfa
使用KafkaEventEngineConfigurer
扩展配置
可自定义并注入Spring
容器生效
@Configuration
public class CustomKafkaEventEngineConfigurer implements KafkaEventEngineConfigurer {
@Override
public void configure(KafkaEventEngine engine) {
//自定义配置
}
}
RabbitMQ
使用RabbitEventEngineConfigurer
扩展配置
可自定义并注入Spring
容器生效
@Configuration
public class CustomRabbitEventEngineConfigurer implements RabbitEventEngineConfigurer {
@Override
public void configure(RabbitEventEngine engine) {
//自定义配置
}
}
抽象为EventEndpoint
表示多个Kafka
和RabbitMQ
服务(集群)
抽象为EventEndpointFactory
Kakfa
事件引擎工厂KafkaEventEndpointFactory
,默认实现KafkaEventEndpointFactoryImpl
可自定义并注入Spring
容器生效
@Component
public class CustomKafkaEventEndpointFactory implements KafkaEventEndpointFactory {
@Override
public KafkaEventEndpoint create(String name, KafkaEventProperties.ExtendedKafkaProperties config, KafkaEventEngine engine) {
//自定义
return null;
}
}
RabbitMQ
事件引擎工厂RabbitEventEndpointFactory
,默认实现RabbitEventEndpointFactoryImpl
可自定义并注入Spring
容器生效
@Component
public class CustomRabbitEventEndpointFactory implements RabbitEventEndpointFactory {
@Override
public RabbitEventEndpoint create(String name, RabbitEventProperties.ExtendedRabbitProperties config, RabbitEventEngine engine) {
//自定义
return null;
}
}
抽象为EventEndpointConfigurer
Kakfa
使用KafkaEventEndpointConfigurer
扩展配置
可自定义并注入Spring
容器生效
@Configuration
public class CustomKafkaEventEndpointConfigurer implements KafkaEventEndpointConfigurer {
@Override
public void configure(KafkaEventEndpoint endpoint) {
//自定义配置
}
}
RabbitMQ
使用RabbitEventEndpointConfigurer
扩展配置
可自定义并注入Spring
容器生效
@Configuration
public class CustomRabbitEventEndpointConfigurer implements RabbitEventEndpointConfigurer {
@Override
public void configure(RabbitEventEndpoint endpoint) {
//自定义配置
}
}
抽象为EventContext
用于事件发布和事件订阅的过程中的数据交互
同时方便用户自定义扩展处理
抽象为EventContextFactory
,默认实现MapEventContextFactory
可自定义并注入Spring
容器生效
@Component
public class CustomEventContextFactory implements EventContextFactory {
@Override
public EventContext create() {
//自定义
return null;
}
}
抽象为EventExchange
用于在发布事件或订阅事件时指定对应的事件端点
可自定义并注入Spring
容器全局生效
@Component
public class CustomEventExchange implements EventExchange {
@Override
public Collection<? extends EventEndpoint> exchange(Collection<? extends EventEngine> engines, EventContext context) {
//自定义筛选
return null;
}
}
手动指定优先级高于全局配置
默认提供的事件交换机
事件交换机 | 说明 |
---|---|
EngineExchange |
指定一个或多个引擎下的所有端点 |
EndpointExchange |
指定一个引擎下的一个或多个端点 |
KafkaEngineExchange |
指定Kafka 所有端点 |
KafkaEndpointExchange |
指定Kafka 一个或多个端点 |
RabbitEngineExchange |
指定RabbitMQ 所有端点 |
RabbitEndpointExchange |
指定RabbitMQ 一个或多个端点 |
ComposeEventExchange |
组合多个事件交换机 |
抽象为EventPublisher
用于指定事件的发布逻辑(一般调用KafkaTemplate
或RabbitTemplate
来发送消息)
可基于KafkaEventPublisher
或AbstractKafkaEventPublisher
和RabbitEventPublisher
或AbstractRabbitEventPublisher
自定义事件发布器
@Configuration
public class CustomKafkaEventEngineConfigurer implements KafkaEventEngineConfigurer {
@Override
public void configure(KafkaEventEngine engine) {
engine.setPublisher(new CustomKafkaEventPublisher());
}
}
或者
@Configuration
public class CustomKafkaEventEndpointConfigurer implements KafkaEventEndpointConfigurer {
@Override
public void configure(KafkaEventEndpoint endpoint) {
endpoint.setPublisher(new CustomKafkaEventPublisher());
}
}
手动指定的优先级高于事件端点中的配置
事件端点中的配置优先级高于事件引擎中的配置
默认提供的事件发布器
事件发布器 | 说明 |
---|---|
TopicKafkaEventPublisher |
指定Topic 的事件发布器 |
ConfigurableKafkaEventPublisher |
可配置Topic ,Partition ,Timestamp ,Key 的事件发布器 |
DefaultKafkaEventPublisher |
基于KafkaTemplate#sendDefault 的事件发布器 |
RoutingRabbitEventPublisher |
指定exchange 和routingKey 的事件发布器 |
ConfigurableRabbitEventPublisher |
可配置exchange ,routingKey 和correlationData 的事件发布器 |
DefaultRabbitEventPublisher |
基于RabbitTemplate#convertAndSend 的事件发布器 |
ComposeEventPublisher |
组合多个事件发布器 |
AbstractRabbitEventPublisher
提供#binding
方法在发布时创建Exchange/Queue/Binding
,用法同BindingBuilder
public class CustomRabbitEventPublisher extends AbstractRabbitEventPublisher {
@Override
public void send(Object event, RabbitEventEndpoint endpoint, EventContext context) {
endpoint.getTemplate().convertAndSend(endpoint);
}
@Override
public void binding(RabbitBinding binding) {
//创建和绑定
binding.bind(new Queue("queue"))
.to(new TopicExchange("topic"))
.with("routingKey");
}
}
抽象为EventSubscriber
用于指定事件的订阅逻辑
可基于KafkaEventSubscriber
或AbstractKafkaEventSubscriber
和RabbitEventSubscriber
或AbstractRabbitEventSubscriber
自定义事件订阅器
@Configuration
public class CustomKafkaEventEngineConfigurer implements KafkaEventEngineConfigurer {
@Override
public void configure(KafkaEventEngine engine) {
engine.setSubscriber(new CustomKafkaEventSubscriber());
}
}
或者
@Configuration
public class CustomKafkaEventEndpointConfigurer implements KafkaEventEndpointConfigurer {
@Override
public void configure(KafkaEventEndpoint endpoint) {
endpoint.setSubscriber(new CustomKafkaEventSubscriber());
}
}
手动指定的优先级高于事件端点中的配置
事件端点中的配置优先级高于事件引擎中的配置
默认提供的事件订阅器
事件订阅器 | 说明 |
---|---|
TopicKafkaEventSubscriber |
指定Topic 的事件订阅器 |
TopicPatternKafkaEventSubscriber |
指定Topic Pattern 的事件订阅器 |
TopicPartitionOffsetKafkaEventSubscriber |
指定TopicPartitionOffset 的事件订阅器 |
DefaultKafkaEventSubscriber |
基于KafkaListenerEndpoint 的事件订阅器 |
QueueRabbitEventSubscriber |
指定Queue 的事件订阅器 |
DefaultRabbitEventSubscriber |
基于RabbitListenerEndpoint 的事件订阅器 |
ComposeEventSubscriber |
组合多个事件订阅器 |
抽象为Subscription
事件订阅器订阅之后会返回一个订阅句柄
可通过订阅句柄取消订阅Subscription#unsubscribe()
AbstractRabbitEventSubscriber
提供#binding
方法在订阅时创建Exchange/Queue/Binding
,用法同BindingBuilder
public class CustomRabbitEventSubscriber extends AbstractRabbitEventSubscriber {
@Override
public MessageListenerContainer createMessageListenerContainer(RabbitEventEndpoint endpoint, EventContext context, MessageListener messageListener) {
return endpoint.getListenerContainerFactory().createListenerContainer();
}
@Override
public void binding(RabbitBinding binding) {
//创建和绑定
binding.bind(new Queue("queue"))
.to(new TopicExchange("topic"))
.with("routingKey");
}
}
抽象为EventEncoder
用于在事件发布时对事件进行编码,默认为null
,不进行编码处理
可自定义并注入Spring
容器全局生效
@Configuration
public class EventConfig {
@Bean
public EventEncoder eventEncoder() {
return new JacksonEventEncoder();
}
}
也可通过 事件引擎自定义配置/事件端点自定义配置 的方式配置
@Configuration
public class CustomKafkaEventEngineConfigurer implements KafkaEventEngineConfigurer {
@Override
public void configure(KafkaEventEngine engine) {
engine.setEncoder(new JacksonEventEncoder());
}
}
或者
@Configuration
public class CustomKafkaEventEndpointConfigurer implements KafkaEventEndpointConfigurer {
@Override
public void configure(KafkaEventEndpoint endpoint) {
endpoint.setEncoder(new JacksonEventEncoder());
}
}
手动指定的优先级高于事件端点中的配置
事件端点中的配置优先级高于事件引擎中的配置
事件引擎中的配置优先级高于全局配置
默认提供的事件编码器
事件编码器 | 说明 |
---|---|
JacksonEventEncoder |
基于Jackson 的json 编码 |
SerializationEventDecoder |
基于jdk 序列化的编码 |
抽象为EventDecoder
用于在监听到事件时对事件进行解码,默认为null
,不进行解码处理
可自定义并注入Spring
容器全局生效
@Configuration
public class EventConfig {
@Bean
public EventDecoder eventDecoder() {
return new JacksonEventDecoder();
}
}
也可通过 事件引擎自定义配置/事件端点自定义配置 的方式配置
@Configuration
public class CustomKafkaEventEngineConfigurer implements KafkaEventEngineConfigurer {
@Override
public void configure(KafkaEventEngine engine) {
engine.setDecoder(new JacksonEventDecoder());
}
}
或者
@Configuration
public class CustomKafkaEventEndpointConfigurer implements KafkaEventEndpointConfigurer {
@Override
public void configure(KafkaEventEndpoint endpoint) {
endpoint.setDecoder(new JacksonEventDecoder());
}
}
手动指定的优先级高于事件端点中的配置
事件端点中的配置优先级高于事件引擎中的配置
事件引擎中的配置优先级高于全局配置
默认提供的事件解码器
事件解码器 | 说明 |
---|---|
JacksonEventDecoder |
基于Jackson 的json 解码 |
SerializationEventDecoder |
基于jdk 序列化的解码 |
抽象为EventListener
用于在事件订阅时监听数据
JacksonEventDecoder
可结合#getType()
方法解析数据
提供GenericEventListener<T>
自动提取泛型作为#getType()
返回值
@Configuration
public class KafkaEventSubscriberRegister implements ApplicationRunner {
@Autowired
public EventConcept concept;
@Override
public void run(ApplicationArguments args) throws Exception {
concept.template().subscribe(new GenericEventListener<KafkaData>() {
@Override
public void onGenericEvent(KafkaData event, EventEndpoint endpoint, EventContext context) {
System.out.println(event);
}
});
}
}
抽象为EventErrorHandler
用于发布或订阅时的异常处理,默认实现LoggerEventErrorHandler
可自定义并注入Spring
容器全局生效
@Component
public class CustomEventErrorHandler implements EventErrorHandler {
@Override
public void onError(Throwable e, EventEndpoint endpoint, EventContext context) {
//自定义异常处理
}
}
也可通过 事件引擎自定义配置/事件端点自定义配置 的方式配置
@Configuration
public class CustomKafkaEventEngineConfigurer implements KafkaEventEngineConfigurer {
@Override
public void configure(KafkaEventEngine engine) {
engine.setErrorHandler(new CustomEventErrorHandler());
}
}
或者
@Configuration
public class CustomKafkaEventEndpointConfigurer implements KafkaEventEndpointConfigurer {
@Override
public void configure(KafkaEventEndpoint endpoint) {
endpoint.setErrorHandler(new CustomEventErrorHandler());
}
}
手动指定的优先级高于事件端点中的配置
事件端点中的配置优先级高于事件引擎中的配置
事件引擎中的配置优先级高于全局配置
抽象为EventTemplate
用于不同场景的配置复用
可以持有事件模版直接进行发布事件
@RestController
public class Business1Controller {
private final EventTemplate template;
@Autowired
public Business1Controller(EventConcept concept) {
this.template = concept.template()
.exchange(new Business1EventExchange())
.publisher(new Business1EventPublisher());
}
@PostMapping("/business1")
public void business1() {
template.publish(new Business1Event());
}
}
抽象为ConfigInheritHandler
Kafka
配置继承处理器KafkaConfigInheritHandler
,默认实现ReflectionKafkaConfigInheritHandler
可自定义并注入Spring
容器生效
@Component
public class CustomKafkaConfigInheritHandler implements KafkaConfigInheritHandler {
@Override
public void inherit(KafkaEventProperties config) {
}
}
RabbitMQ
配置继承处理器RabbitConfigInheritHandler
,默认实现ReflectionRabbitConfigInheritHandler
可自定义并注入Spring
容器生效
@Component
public class CustomRabbitConfigInheritHandler implements RabbitConfigInheritHandler {
@Override
public void inherit(RabbitEventProperties config) {
}
}
通过实现EventConceptLifecycleListener
接口可以在EventConcept
初始化和销毁时扩展额外的逻辑
@Component
public class CustomEventConceptLifecycleListener implements EventConceptLifecycleListener {
@Override
public void onInitialize(EventConcept concept) {
//初始化
}
@Override
public void onDestroy(EventConcept concept) {
//销毁
}
}
可通过指定@Qualifier
获取,名称为:${事件端点名称}KafkaTemplate
@RequestMapping("/concept-event/kafka")
public class KafkaEventController {
@Autowired
@Qualifier("devKafkaTemplate")
private KafkaTemplate<Object, Object> kafkaTemplate;
}
可通过指定containerFactory
使用,名称为:${事件端点名称}KafkaListenerContainerFactory
@Configuration
public class KafkaEventSubscriberRegister {
@KafkaListener(topics = "sample", containerFactory = "devKafkaListenerContainerFactory")
public void receiveLocal(String msg, Acknowledgment acknowledgment) {
System.out.println("dev-" + msg);
acknowledgment.acknowledge();
}
}
可通过指定@Qualifier
获取,名称为:${事件端点名称}KafkaTemplate
@RequestMapping("/concept-event/rabbit")
public class RabbitEventController {
@Autowired
@Qualifier("devRabbitTemplate")
private RabbitTemplate rabbitTemplate;
}
可通过指定containerFactory
使用,名称为:${事件端点名称}RabbitListenerContainerFactory
@Configuration
public class KafkaEventSubscriberRegister {
@SneakyThrows
@RabbitListener(queues = "queue", containerFactory = "devRabbitListenerContainerFactory")
public void receiveDev(Message message, Channel channel) {
System.out.println("dev-" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
下列Bean
全部使用ConfigurableBeanFactory#registerSingleton
注册
- Kafka
Bean | Name |
---|---|
ProducerFactory |
${事件端点名称}KafkaProducerFactory |
ProducerListener |
${事件端点名称}KafkaProducerListener |
KafkaTemplate |
${事件端点名称}KafkaTemplate |
ConsumerFactory |
${事件端点名称}KafkaConsumerFactory |
KafkaTransactionManager |
${事件端点名称}KafkaTransactionManager |
KafkaListenerContainerFactory |
${事件端点名称}KafkaListenerContainerFactory |
KafkaAdmin |
${事件端点名称}KafkaAdmin |
KafkaEventEndpoint |
${事件端点名称}KafkaEventEndpoint |
- RabbitMQ
Bean | Name |
---|---|
ConnectionFactory |
${事件端点名称}RabbitConnectionFactory |
RabbitListenerContainerFactory |
${事件端点名称}RabbitListenerContainerFactory |
RabbitTemplate |
${事件端点名称}RabbitTemplate |
RabbitAdmin |
${事件端点名称}RabbitAdmin |
RabbitEventEndpoint |
${事件端点名称}RabbitEventEndpoint |
- 代码结构优化
-
InheritHandler
改为ConfigInheritHandler
- 漏注册
RabbitEventEndpoint
的问题