Some more details on my setup:
On the producer Spring Boot application, I force the use of subscribing processors in my Axon configuration:
@Autowired
public void configureEventSubscribers(EventProcessingConfigurer configurer) {
configurer.usingSubscribingEventProcessors();
}
application.properties for the producer:
spring:
application:
name: producer
datasource:
url: jdbc:h2:tcp://${H2_DB_HOST:localhost}:9090/./axondb;DB_CLOSE_ON_EXIT=FALSE;IFEXISTS=TRUE;
axon:
serializer:
general: jackson
events: jackson
messages: jackson
kafka:
client-id: producer
default-topic: axon.topic
bootstrap-servers: localhost:9092
producer:
event-processor-mode: subscribing
On the consumer side, I use a TRACKING event processor. This is my Spring Boot application.properties.
spring:
application:
name: consumer
datasource:
url: jdbc:h2:tcp://${H2_DB_HOST:localhost}:9090/./axondb;DB_CLOSE_ON_EXIT=FALSE;IFEXISTS=TRUE;
axon:
serializer:
general: jackson
events: jackson
messages: jackson
eventhandling:
processors:
"[myGroup]":
source: streamableKafkaMessageSource
mode: TRACKING
kafka:
client-id: consumer
default-topic: axon.topic
bootstrap-servers: localhost:9092
consumer:
client-id: consumer
event-processor-mode: TRACKING
properties:
"group.id": "myGroup"
The database is a H2 database server running on port 9090 as a Windows service (always running, NOT in-memory).
The consumer has only a single EventHandler:
@Component
@ProcessingGroup("myGroup")
public class ConsumerEvents {
@EventHandler
public void on(SomeEvent event) {
log.info("event: {}", event);
}
}
With the above setup, if I only start the “producer” app (consumer is NOT running), and publish an event, I can see the message in the Kafka topic (i.e. message count is 1).
Then if I start the “consumer” app, it reads the event stream from the beginning and consumes the event (@EventHandler gets invoked) BUT I see the Kafka message count is now 2… it appears that the consumer also publishes ANOTHER copy of the event message to Kafka when the EventHandler gets called.
Also, in the Token store, I see TWO event processors registered:
- org.axonframework.extensions.kafka.eventhandling.producer
- myGroup
I do not have any configuration for a TRACKING Kafka producer, so how did the org.axonframework.extensions.kafka.eventhandling.producer EVENT processor get registered??
Ideally, what I need is a SUBSCRIBING producer and a TRACKING consumer. Am I doing anything wrong? Any help would be greatly appreciated!
Thanks!