Hi everyone!
I’m new to Axon. I’m trying to test it out but I’ve hit a blocker with Kafka. I’ve read through https://docs.axoniq.io/reference-guide/extensions/kafka as well as the example project on GitHub, and several threads here and on SO.
Use case: Postgres event store and Kafka for the event bus. No Axon server. There are 2 Spring Boot microservices (account-service & user-service) with Java. Events generated in the account-service should also be consumed in the user-service.
Problem: Kafka messages are not being received/consumed across services. The account-service can generate and consume its own events but they aren’t making it over to the user-service.
At the moment I’m using the code-based configuration. Here is how it looks:
account-service:
application.yml
axon:
  axonserver:
    enabled: false
  kafka:
    bootstrap-servers: localhost:9092
    client-id: accountservice
    producer:
      event-processor-mode: subscribing
    consumer:
      event-processor-mode: subscribing
      properties:
        group-id: accountservice
  distributed:
    enabled: true
  serializer:
    messages: jackson
    events: jackson
    general: jackson
pom.xml
<dependency>
    <groupId>org.axonframework</groupId>
    <artifactId>axon-spring-boot-starter</artifactId>
    <version>4.5.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.axonframework.extensions.kafka/axon-kafka-spring-boot-starter -->
<dependency>
    <groupId>org.axonframework.extensions.kafka</groupId>
    <artifactId>axon-kafka-spring-boot-starter</artifactId>
    <version>4.5.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.3</version>
</dependency>
<dependency>
    <groupId>org.axonframework</groupId>
    <artifactId>axon-test</artifactId>
    <version>4.5.8</version>
    <scope>test</scope>
</dependency>
...
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.axonframework</groupId>
            <artifactId>axon-bom</artifactId>
            <version>4.5.8</version>
        </dependency>
    </dependencies>
</dependencyManagement>
config
@Configuration
public class KafkaConfig {

    @Autowired
    private EventProcessingConfigurer eventProcessingConfigurer;

    private String groupId = "accountservice";
    private String processorName = "accountservice";

    @Bean
    public ProducerFactory<String, byte[]> producerFactory(KafkaProperties producerConfiguration) {
        return DefaultProducerFactory.<String, byte[]>builder()
                .configuration(producerConfiguration.buildProducerProperties()) // Hard requirement
                .build();
    }

    @Bean
    public KafkaPublisher<String, byte[]> kafkaPublisher(ProducerFactory<String, byte[]> producerFactory) {
        KafkaPublisher<String, byte[]> kafkaPublisher = KafkaPublisher.<String, byte[]>builder()
                .topic("topic1") // Defaults to "Axon.Events"
                .producerFactory(producerFactory) // Hard requirement
                .build();
        return kafkaPublisher;
    }

    @Bean
    public KafkaEventPublisher<String, byte[]> kafkaEventPublisher(KafkaPublisher<String, byte[]> kafkaPublisher) {
        KafkaEventPublisher<String, byte[]> kafkaEventPublisher = KafkaEventPublisher.<String, byte[]>builder()
                .kafkaPublisher(kafkaPublisher) // Hard requirement
                .build();
        registerPublisherToEventProcessor(eventProcessingConfigurer, kafkaEventPublisher);
        return kafkaEventPublisher;
    }

    private void registerPublisherToEventProcessor(EventProcessingConfigurer eventProcessingConfigurer,
                                                   KafkaEventPublisher<String, byte[]> kafkaEventPublisher) {
        String processingGroup = KafkaEventPublisher.DEFAULT_PROCESSING_GROUP;
        eventProcessingConfigurer.registerEventHandler(configuration -> kafkaEventPublisher)
                .assignHandlerTypesMatching(
                        processingGroup,
                        clazz -> clazz.isAssignableFrom(KafkaEventPublisher.class)
                )
                .registerSubscribingEventProcessor(processingGroup);
        // Replace `registerSubscribingEventProcessor` for `registerTrackingEventProcessor` to use a tracking processor
    }

    @Bean
    public ConsumerFactory<String, byte[]> consumerFactory(KafkaProperties consumerConfiguration) {
        return new DefaultConsumerFactory<>(consumerConfiguration.buildConsumerProperties());
    }

    @Bean
    public Fetcher<?, ?, ?> fetcher() {
        return AsyncFetcher.builder()
                .build();
    }

    @Bean
    public KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer(Configurer configurer) {
        KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer = new KafkaMessageSourceConfigurer();
        configurer.registerModule(kafkaMessageSourceConfigurer);
        return kafkaMessageSourceConfigurer;
    }

    @Bean
    public SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource(//List<String> topics,
                                                                                         //String groupId,
                                                                                         ConsumerFactory<String, byte[]> consumerFactory,
                                                                                         Fetcher<String, byte[], EventMessage<?>> fetcher,
                                                                                         //KafkaMessageConverter<String, byte[]> messageConverter,
                                                                                         //int consumerCount,
                                                                                         KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer) {
        SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource = SubscribableKafkaMessageSource.<String, byte[]>builder()
                .groupId(groupId) // Hard requirement
                .consumerFactory(consumerFactory) // Hard requirement
                .fetcher(fetcher) // Hard requirement
                .autoStart()
                .build();
        // Registering the source is required to tie into the Configurers lifecycle to start the source at the right stage
        kafkaMessageSourceConfigurer.configureSubscribableSource(configuration -> subscribableKafkaMessageSource);
        //this.configureSubscribableKafkaSource(eventProcessingConfigurer, processorName, subscribableKafkaMessageSource);
        return subscribableKafkaMessageSource;
    }

    private void configureSubscribableKafkaSource(EventProcessingConfigurer eventProcessingConfigurer,
                                                  String processorName,
                                                  SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource) {
        eventProcessingConfigurer.registerSubscribingEventProcessor(
                processorName,
                configuration -> subscribableKafkaMessageSource
        );
    }
}
Note: I commented out the call to configureSubscribableKafkaSource(). When I uncomment it, not even account-service can receive its own messages. Therefore I’m highly suspicious of the implementation here.
user-service:
The pom is exactly the same. The only differences in the application.yml are that client-id and group-id are set to userservice. Similarly, the KafkaConfig class is exactly the same as in the account-service, except that the values for groupId and processorName are set to userservice.
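To make the expectation concrete: since the two services use different group ids, my understanding is that Kafka should deliver every record on the topic to both of them. Below is a toy in-memory sketch of that behaviour in plain Java. It is only an illustration and does not use the Kafka or Axon APIs; the class ConsumerGroupSketch and the event name AccountCreatedEvent are made up for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of one topic with consumer groups; NOT the Kafka or Axon API. */
final class ConsumerGroupSketch {

    /** Maps each consumer group id to the records delivered to that group. */
    private final Map<String, List<String>> deliveredByGroup = new LinkedHashMap<>();

    /** Registers a consumer group; subscribing the same group id again changes nothing. */
    void subscribe(String groupId) {
        deliveredByGroup.computeIfAbsent(groupId, id -> new ArrayList<>());
    }

    /** Publishes one record: every currently subscribed group gets its own copy. */
    void publish(String record) {
        deliveredByGroup.values().forEach(records -> records.add(record));
    }

    /** Returns the records delivered to the given group, or an empty list if it never subscribed. */
    List<String> receivedBy(String groupId) {
        return deliveredByGroup.getOrDefault(groupId, List.of());
    }

    public static void main(String[] args) {
        ConsumerGroupSketch topic = new ConsumerGroupSketch();
        topic.subscribe("accountservice");
        topic.subscribe("userservice");

        // Hypothetical event name, used only for illustration.
        topic.publish("AccountCreatedEvent");

        System.out.println("accountservice: " + topic.receivedBy("accountservice"));
        System.out.println("userservice: " + topic.receivedBy("userservice"));
    }
}
```

In this model both groups end up with the published record, which is what I expect from the real setup, but in my case only the accountservice side ever sees it.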
Any help is much appreciated, thank you!