Consumers not reading from Kafka

Hi everyone!

I’m new to Axon. I’m trying it out, but I’ve hit a blocker with Kafka. I’ve read through https://docs.axoniq.io/reference-guide/extensions/kafka as well as the example project on GitHub, and several threads here and on SO.

Use case: Postgres event store and Kafka as the event bus, no Axon Server. There are two Spring Boot microservices written in Java (account-service & user-service). Events generated in the account-service should also be consumed in the user-service.
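
For reference, the event being published is nothing fancy, just a plain Jackson-serializable class along these lines (the field names here are illustrative, not my exact class):

public class AccountCreatedEvent {

	// Illustrative fields only
	private String accountId;
	private String owner;

	// Jackson (the configured event serializer) needs a no-arg constructor plus getters
	public AccountCreatedEvent() {
	}

	public AccountCreatedEvent(String accountId, String owner) {
		this.accountId = accountId;
		this.owner = owner;
	}

	public String getAccountId() {
		return accountId;
	}

	public String getOwner() {
		return owner;
	}
}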

Problem: Kafka messages are not being received/consumed across services. The account-service can generate and consume its own events, but they aren’t making it over to the user-service.

At the moment I’m using code-based configuration. Here is how it looks:

account-service:

application.yml

axon:
  axonserver:
    enabled: false
  kafka:
    bootstrap-servers: localhost:9092
    client-id: accountservice
    producer:
      event-processor-mode: subscribing
    consumer:
      event-processor-mode: subscribing
      properties:
        group-id: accountservice
  distributed:
    enabled: true
  serializer:
    messages: jackson
    events: jackson
    general: jackson

pom.xml

        <dependency>
            <groupId>org.axonframework</groupId>
            <artifactId>axon-spring-boot-starter</artifactId>
            <version>4.5.8</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.axonframework.extensions.kafka/axon-kafka-spring-boot-starter -->
        <dependency>
            <groupId>org.axonframework.extensions.kafka</groupId>
            <artifactId>axon-kafka-spring-boot-starter</artifactId>
            <version>4.5.3</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.3</version>
        </dependency>
        
        <dependency>
            <groupId>org.axonframework</groupId>
            <artifactId>axon-test</artifactId>
            <version>4.5.8</version>
            <scope>test</scope>
        </dependency>

...

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.axonframework</groupId>
                <artifactId>axon-bom</artifactId>
                <version>4.5.8</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

KafkaConfig.java

@Configuration
public class KafkaConfig {

	@Autowired
	private EventProcessingConfigurer eventProcessingConfigurer;

	private String groupId = "accountservice";

	private String processorName = "accountservice";
	

	@Bean
	public ProducerFactory<String, byte[]> producerFactory(KafkaProperties producerConfiguration) {
		return DefaultProducerFactory.<String, byte[]>builder()
				.configuration(producerConfiguration.buildProducerProperties()) // Hard requirement
				.build();
	}


	@Bean
	public KafkaPublisher<String, byte[]> kafkaPublisher(ProducerFactory<String, byte[]> producerFactory) {
		KafkaPublisher<String, byte[]> kafkaPublisher = KafkaPublisher.<String, byte[]>builder()
				.topic("topic1")                               // Defaults to "Axon.Events"
				.producerFactory(producerFactory)           // Hard requirement
				.build();
		return kafkaPublisher;
	}


	@Bean
	public KafkaEventPublisher<String, byte[]> kafkaEventPublisher(KafkaPublisher<String, byte[]> kafkaPublisher) {
		KafkaEventPublisher<String, byte[]> kafkaEventPublisher = KafkaEventPublisher.<String, byte[]>builder()
				.kafkaPublisher(kafkaPublisher)             // Hard requirement
				.build();

		registerPublisherToEventProcessor(eventProcessingConfigurer, kafkaEventPublisher);
		return kafkaEventPublisher;
	}

	private void registerPublisherToEventProcessor(EventProcessingConfigurer eventProcessingConfigurer,
												   KafkaEventPublisher<String, byte[]> kafkaEventPublisher) {
		String processingGroup = KafkaEventPublisher.DEFAULT_PROCESSING_GROUP;
		eventProcessingConfigurer.registerEventHandler(configuration -> kafkaEventPublisher)
				.assignHandlerTypesMatching(
						processingGroup,
						clazz -> clazz.isAssignableFrom(KafkaEventPublisher.class)
				)
				.registerSubscribingEventProcessor(processingGroup);
		// Replace `registerSubscribingEventProcessor` for `registerTrackingEventProcessor` to use a tracking processor
	}

	@Bean
	public ConsumerFactory<String, byte[]> consumerFactory(KafkaProperties consumerConfiguration) {
		return new DefaultConsumerFactory<>(consumerConfiguration.buildConsumerProperties());
	}

	@Bean
	public Fetcher<?, ?, ?> fetcher() {
		return AsyncFetcher.builder()
				.build();
	}

	@Bean
	public KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer(Configurer configurer) {
		KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer = new KafkaMessageSourceConfigurer();
		configurer.registerModule(kafkaMessageSourceConfigurer);
		return kafkaMessageSourceConfigurer;
	}

	@Bean
	public SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource(//List<String> topics,
																						 //String groupId,
																						 ConsumerFactory<String, byte[]> consumerFactory,
																						 Fetcher<String, byte[], EventMessage<?>> fetcher,
																						 //KafkaMessageConverter<String, byte[]> messageConverter,
																						 //int consumerCount,
																						 KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer) {
		SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource = SubscribableKafkaMessageSource.<String, byte[]>builder()
				.groupId(groupId)                   // Hard requirement
				.consumerFactory(consumerFactory)   // Hard requirement
				.fetcher(fetcher)                   // Hard requirement
				.autoStart()
				.build();
		// Registering the source is required to tie into the Configurer's lifecycle to start the source at the right stage
		kafkaMessageSourceConfigurer.configureSubscribableSource(configuration -> subscribableKafkaMessageSource);
		//this.configureSubscribableKafkaSource(eventProcessingConfigurer, processorName, subscribableKafkaMessageSource);
		return subscribableKafkaMessageSource;
	}


	private void configureSubscribableKafkaSource(EventProcessingConfigurer eventProcessingConfigurer,
												  String processorName,
												  SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource) {
		eventProcessingConfigurer.registerSubscribingEventProcessor(
				processorName,
				configuration -> subscribableKafkaMessageSource
		);
	}
}

Note: I commented out the call to configureSubscribableKafkaSource(). When I uncomment it, not even account-service can receive its own messages. Therefore I’m highly suspicious of the implementation here.

user-service:

The pom is exactly the same. The only differences in the application.yml are that client-id and group-id are set to userservice. Likewise, the KafkaConfig class is identical to the one in the account-service, except that the values for groupId and processorName are set to userservice.
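
For completeness, the handler in the user-service that should pick these events up is just a Spring component with an @EventHandler method, roughly like this (the class name and logging setup are illustrative, and the import of the event class is project-specific):

package com.jeszenka.eventdrivenmicroservices.userservice.query.handler;

import org.axonframework.eventhandling.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class AccountEventHandler {

	private static final Logger log = LoggerFactory.getLogger(AccountEventHandler.class);

	// Without an explicit @ProcessingGroup annotation, Axon derives the processing group
	// name from this class's package.
	@EventHandler
	public void on(AccountCreatedEvent event) {
		log.info("Received message from account-service: {}", event);
	}
}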

Any help is much appreciated, thank you!

Could you try to add a KafkaMessageConverter bean, or set it in the SubscribableKafkaMessageSource builder?

Something like

    @Bean
    KafkaMessageConverter<String, byte[]> kafkaMessageConverter() {
        JacksonSerializer serializer = JacksonSerializer.defaultSerializer();
        return DefaultKafkaMessageConverter.builder()
                                           .serializer(serializer)
                                           .build();
    }

Hi @gklijs !

Thank you for your suggestion! I tried it out but unfortunately the behavior didn’t change. Only account-service receives the Kafka messages when I comment out this.configureSubscribableKafkaSource(eventProcessingConfigurer, processorName, subscribableKafkaMessageSource);. If I leave this line in then nobody receives any messages.

I added the bean code as you posted it, and changed the subscribableKafkaMessageSource method to:

	@Bean
	public SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource(//List<String> topics,
																						 //String groupId,
																						 ConsumerFactory<String, byte[]> consumerFactory,
																						 Fetcher<String, byte[], EventMessage<?>> fetcher,
																						 KafkaMessageConverter<String, byte[]> messageConverter,
																						 //int consumerCount,
																						 KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer) {
		SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource = SubscribableKafkaMessageSource.<String, byte[]>builder()
				.groupId(groupId)                   // Hard requirement
				.consumerFactory(consumerFactory)   // Hard requirement
				.fetcher(fetcher)                   // Hard requirement
				.messageConverter(messageConverter) // Defaults to a "DefaultKafkaMessageConverter"
				.autoStart()
				.build();
		// Registering the source is required to tie into the Configurer's lifecycle to start the source at the right stage
		kafkaMessageSourceConfigurer.configureSubscribableSource(configuration -> subscribableKafkaMessageSource);
		//this.configureSubscribableKafkaSource(eventProcessingConfigurer, processorName, subscribableKafkaMessageSource);
		return subscribableKafkaMessageSource;
	}

After this didn’t work, I tried changing the kafkaPublisher() method as well to the following, with all combinations of configureSubscribableKafkaSource() commented and uncommented:

	@Bean
	public KafkaPublisher<String, byte[]> kafkaPublisher(ProducerFactory<String, byte[]> producerFactory,
														 KafkaMessageConverter<String, byte[]> kafkaMessageConverter) {
		KafkaPublisher<String, byte[]> kafkaPublisher = KafkaPublisher.<String, byte[]>builder()
				.topic("topic1")                               // Defaults to "Axon.Events"
				.producerFactory(producerFactory)           // Hard requirement
				.messageConverter(kafkaMessageConverter)    // Defaults to a "DefaultKafkaMessageConverter"
				.build();
		return kafkaPublisher;
	}

Do you have any other ideas?

One thing that is confusing me: should I be calling configureSubscribableKafkaSource() on the subscribableKafkaMessageSource? If I don’t, then at least the producing service (account-service) can consume its own messages. But if I do call it, then it doesn’t even receive its own messages.

One thing you could try is replacing the spring-kafka dependency with the kafka-clients dependency. I don’t think it changes anything, but you only need kafka-clients for the integration, not the rest of spring-kafka. So use:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.0</version>
</dependency>

to replace the spring-kafka one.

Hi @gklijs !

There is still no difference in behavior after replacing the dependency. :frowning:

I can verify from the Docker containers that my topic is being created and messages are being published. But they still aren’t being consumed.

Here is my project along with a readme that explains how to reproduce my tests:

Update:

When I change the value of processorName from "accountservice" to "com.jeszenka.eventdrivenmicroservices.userservice.query.handler", I can uncomment the call to configureSubscribableKafkaSource() in the account-service and it can consume its own messages.

Unfortunately, user-service still isn’t getting these messages. From the command line I can still view my topic (topic1) and see all messages showing up there. However, listing the consumer groups doesn’t show the groups I am trying to define for my services. Am I setting this up properly?
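
An equivalent check from Java (instead of the CLI) looks roughly like this, in case anyone wants to reproduce it; the broker address is assumed to be the same localhost:9092 as in my application.yml:

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupListing;

public class ListConsumerGroups {

	public static void main(String[] args) throws ExecutionException, InterruptedException {
		Properties props = new Properties();
		props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		try (AdminClient admin = AdminClient.create(props)) {
			// Lists every consumer group the broker knows about; "accountservice" and "userservice"
			// should show up here once the subscribable sources actually start consuming.
			for (ConsumerGroupListing group : admin.listConsumerGroups().all().get()) {
				System.out.println(group.groupId());
			}
		}
	}
}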

Another update:

Instead of autowiring the EventProcessingConfigurer and calling registerPublisherToEventProcessor() and configureSubscribableKafkaSource() directly, I made these two methods @Autowired, similar to the Kafka extension guide:

	@Autowired
	public void registerPublisherToEventProcessor(EventProcessingConfigurer eventProcessingConfigurer,
												   KafkaEventPublisher<String, byte[]> kafkaEventPublisher) {
		String processingGroup = KafkaEventPublisher.DEFAULT_PROCESSING_GROUP;
		eventProcessingConfigurer.registerEventHandler(configuration -> kafkaEventPublisher)
				.assignHandlerTypesMatching(
						processingGroup,
						clazz -> clazz.isAssignableFrom(KafkaEventPublisher.class)
				)
				.registerSubscribingEventProcessor(processingGroup);
		// Replace `registerSubscribingEventProcessor` for `registerTrackingEventProcessor` to use a tracking processor
	}

	@Autowired
	public void configureSubscribableKafkaSource(EventProcessingConfigurer eventProcessingConfigurer,
												 SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource) {
		eventProcessingConfigurer.usingSubscribingEventProcessors();
		eventProcessingConfigurer.registerSubscribingEventProcessor(
				processorName,
				configuration -> subscribableKafkaMessageSource
		);
	}

After doing this I can now see the consumer groups being created in Kafka, but the behavior doesn’t change: the services can still only consume their own events. Looking at Kafka, topic1 is created with 1 partition.

Just to test, I added the spring-kafka dependency back to my pom in user-service and added this to the user-service event processor class:

	@KafkaListener(topics = "topic1", groupId = "userservice")
	public void listen(ConsumerRecord<String, String> event) {
		log.info("Received message from account-service: {}", event);
	}

This successfully receives the Kafka messages from account-service! But this doesn’t:

	@EventHandler
	public void on(AccountCreatedEvent event) {
		log.info("Received message from account-service: {}", event);
	}

Stepping through some of the Axon Kafka extension code, I noticed that messages are indeed being received but not actually making it to my @EventHandler for some reason.

I also noticed that SubscribableKafkaMessageSource.eventProcessors is empty. Does this need to be set somehow?

OK, I got it working. The missing piece of the puzzle was eventProcessingConfigurer.assignProcessingGroup(groupId, processorName). I added it to the configureSubscribableKafkaSource() method:

	@Autowired
	public void configureSubscribableKafkaSource(EventProcessingConfigurer eventProcessingConfigurer,
												 SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource) {

		eventProcessingConfigurer.assignProcessingGroup(groupId, processorName);
		eventProcessingConfigurer.registerSubscribingEventProcessor(
				processorName,
				configuration -> subscribableKafkaMessageSource
		);

	}

Now both services are receiving the Kafka messages in their respective @EventHandler methods. :slight_smile:
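
For anyone who prefers annotations over the explicit assignment: I believe the same mapping can be expressed by putting @ProcessingGroup on the handler class and registering the subscribing event processor under that exact name, since a processing group is assigned to a processor with the same name by default. I haven’t re-tested this variant, so treat it as a sketch:

import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.springframework.stereotype.Component;

// With the processing group named "userservice", these handlers map to an event processor
// called "userservice" by default, so
//   eventProcessingConfigurer.registerSubscribingEventProcessor("userservice", c -> subscribableKafkaMessageSource);
// should connect them to the Kafka source without the assignProcessingGroup(...) call.
@ProcessingGroup("userservice")
@Component
public class AccountEventHandler {

	@EventHandler
	public void on(AccountCreatedEvent event) {
		// handle the event published by the account-service
	}
}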
