Axon Kafka extension 4.5: consumer not working

I am trying to use axon-kafka. The producer part works perfectly fine, but I am not able to get the consumer working. The application starts; however, it does not connect to Kafka and does not consume events …

build.gradle

// Plugins for the root build: Spring Boot 2.4.4, Spring dependency management 1.0.11.RELEASE and Java.
plugins {
  id 'org.springframework.boot' version '2.4.4' 
  id 'io.spring.dependency-management' version '1.0.11.RELEASE'
  id 'java'
}

// Enables the layered option of the bootJar task.
bootJar {
	layered {
		enabled = true
	}
}
// Shared version properties referenced by the dependency declarations further down in this build.
ext {  
    // Spring Boot BOM version imported by the subprojects' dependencyManagement block
    springBootVersion = '2.4.4'
    // Springfox (swagger2 / swagger-ui / boot-starter) version
    swaggerVersion = '3.0.0'
    // Used for both axon-spring-boot-starter and axon-kafka-spring-boot-starter
    axonVersion = '4.5'    
    lombokVersion='1.18.12'
    // axon-mongo extension version (query project)
    axonMongo = '4.4'
    // kafka-clients version
    kafkaVersion = '2.3.0'   
}
// Repositories declared for every project: the local Maven repository and Maven Central.
allprojects {
    repositories {
        mavenLocal()
		mavenCentral()
    }
}

// Configuration shared by all subprojects: plugins, coordinates, common dependencies,
// Java 1.8 source/target level and the Spring Boot BOM import.
subprojects {
    apply plugin: 'maven-publish'
    apply plugin: 'java'
    apply plugin: 'java-library'    
    apply plugin: 'io.spring.dependency-management'
    apply plugin: 'eclipse'
    apply plugin: 'idea'
	
	// Maven coordinates shared by every subproject
	group = 'com.techm.bm.ws.integration'
	version = '0.0.1-SNAPSHOT'
	
	// Dependencies that every subproject receives
	dependencies {

	    // Boot Starter  
		implementation 'org.springframework.boot:spring-boot-starter-web'

    	// Lombok (declared both as a dependency and as an annotation processor)
    	implementation "org.projectlombok:lombok:${lombokVersion}"
    	annotationProcessor "org.projectlombok:lombok:${lombokVersion}" 
    	
    	// logback
		implementation group:'net.logstash.logback',name: 'logstash-logback-encoder', version: '6.4'  
		
				
		// bean-utils
		implementation group: 'commons-beanutils', name: 'commons-beanutils', version: "1.9.4"
	}
	
	sourceCompatibility = 1.8
    targetCompatibility = 1.8
    
	// Imports the Spring Boot BOM at ${springBootVersion}
	dependencyManagement {
        imports {
            mavenBom("org.springframework.boot:spring-boot-dependencies:${springBootVersion}")
        }
    }
}

// The events module: bootJar is disabled and the plain jar task is enabled;
// it declares no dependencies of its own.
project(':events') {   

	bootJar {
    	enabled = false
	}
	
	jar {
	    enabled = true
	} 
	
	dependencies {}
}

// The command module: Spring Boot application depending on the events module,
// Axon, JPA with the MySQL connector, Kafka (clients + Axon Kafka starter) and Springfox.
project(':command') {  
    apply plugin: 'org.springframework.boot'
    dependencies {
        //Project dependencies
        implementation project(':events')
        
		// logback
		implementation group:'net.logstash.logback',name: 'logstash-logback-encoder', version: '6.4' 
		       
        // Axon
		implementation("org.axonframework:axon-spring-boot-starter:${axonVersion}") 
    	
		
		// Database
		implementation 'org.springframework.boot:spring-boot-starter-data-jpa'	
		implementation group: 'mysql', name: 'mysql-connector-java', version: '8.0.21'	
		
		// kafka (the Axon Kafka starter uses the same ${axonVersion} as the Axon starter above)
		implementation group: 'org.apache.kafka', name: 'kafka-clients', version: "${kafkaVersion}"
		implementation group: 'org.axonframework.extensions.kafka', name: 'axon-kafka-spring-boot-starter', version: "${axonVersion}" 
		
		
		// Swagger
		implementation group: 'io.springfox', name: 'springfox-swagger2', version: "${swaggerVersion}"
		implementation group: 'io.springfox', name: 'springfox-swagger-ui', version: "${swaggerVersion}"
		implementation group: 'io.springfox', name: 'springfox-boot-starter', version: "${swaggerVersion}"	

    }
}

// The query module: Spring Boot application depending on the events module,
// Spring Data MongoDB, Axon with the Axon Mongo extension, Springfox and Kafka.
project(':query') {  
    apply plugin: 'org.springframework.boot'
    dependencies {
        //Project dependencies
        implementation project(':events')
        
		// Mongodb for query
		implementation 'org.springframework.boot:spring-boot-starter-data-mongodb'
	
		// logback
		implementation group:'net.logstash.logback',name: 'logstash-logback-encoder', version: '6.4' 
		       
        // Axon
		implementation( "org.axonframework:axon-spring-boot-starter:${axonVersion}" )
    	
		// Axon Mongo extension, versioned separately through ${axonMongo}
		implementation group: 'org.axonframework.extensions.mongo', name: 'axon-mongo', version: "${axonMongo}" 
		
		// Swagger
		implementation group: 'io.springfox', name: 'springfox-swagger2', version: "${swaggerVersion}"
		implementation group: 'io.springfox', name: 'springfox-swagger-ui', version: "${swaggerVersion}"
		implementation group: 'io.springfox', name: 'springfox-boot-starter', version: "${swaggerVersion}"
		
		// kafka
		implementation group: 'org.apache.kafka', name: 'kafka-clients', version: "${kafkaVersion}"
		implementation group: 'org.axonframework.extensions.kafka', name: 'axon-kafka-spring-boot-starter', version: "${axonVersion}" 
		

    }
}

// The event-processor module: Spring Boot application depending on the events module,
// Axon and Kafka (clients + Axon Kafka starter).
project(':event-processor') {
    apply plugin: 'org.springframework.boot'
    dependencies {
         // Project dependencies
        implementation project(':events')
        
        // Axon
		implementation( "org.axonframework:axon-spring-boot-starter:${axonVersion}" )
		
		// kafka (the Axon Kafka starter uses the same ${axonVersion} as the Axon starter above)
		implementation group: 'org.apache.kafka', name: 'kafka-clients', version: "${kafkaVersion}"
		implementation group: 'org.axonframework.extensions.kafka', name: 'axon-kafka-spring-boot-starter', version: "${axonVersion}" 
	}
}

application.yaml

# Spring settings: application name and the MongoDB connection URI.
spring:
  application:
    name: Event processor
  data:
      mongodb:
        uri: mongodb://localhost:27017/events  
# Axon settings: Axon Server is switched off and the Kafka extension is configured.
# NOTE(review): unlike the 4.0-RC2 configuration shown later in this thread, no
# axon.eventhandling.processors.<name>.source is set here - confirm whether an
# event processor is bound to the Kafka message source elsewhere.
axon:
  axonserver:
    enabled: false
    
  kafka:
    bootstrap-servers: localhost:9092
    client-id: order-info-consumer
    default-topic: order_command_topic
    properties:
      security.protocol: PLAINTEXT
    fetcher:
      poll-timeout: 3000
    
    consumer:
      enable-auto-commit: true
      auto-commit-interval: 3000
      event-processor-mode: tracking

If I change the axon-kafka extension back to 4.0-RC2 and use the eventhandling configuration construct, it works fine:

// kafka
		implementation group: 'org.apache.kafka', name: 'kafka-clients', version: "2.3.0"
		implementation group: 'org.axonframework.extensions.kafka', name: 'axon-kafka-spring-boot-starter', version: '4.0-RC2' 
# Configuration reported as working with axon-kafka-spring-boot-starter 4.0-RC2.
axon:
  axonserver:
    enabled: false
  kafka:
    default-topic: order_command_topic
    consumer:
      group-id: kafka-group
      bootstrap-servers: localhost:9092
  eventhandling:
    processors:
      # Processor "order-info-processor" in tracking mode, with its source set to kafkaMessageSource
      order-info-processor:
        mode: tracking
        source: kafkaMessageSource

Hi @Prashant_Shandilya, good morning

First of all, have you checked our own kafka-axon-example? This small sample aims to provide a working example of the extension, which you can compare against your own setup to try to find the culprit.

If you are still unable to find it after that, can you provide a small reproducible code sample so we can easily run it and spot the problem you are describing?

KR,

Thanks for your reply, @lfgcampos!

I did have a look at the sample example, and my demo code is also based on a similar config.

The example mentioned in the documentation is probably not adequate for actual use cases, as it uses an in-memory token store and event store. I would appreciate it if you could refer me to an example that has a non-in-memory event store and token store and, if possible, also uses the Spring Boot starter.

As I mentioned, the same code works with axon-kafka version 4.0-RC2.

So there is probably something missing in either build.gradle or application.yaml, both of which I have already attached. Can you please check and confirm whether they look okay?

Even in the past, I tried to migrate from 4.0-RC2 to 4.0-RC3 and was not able to do it. It looks like a lot changed in between, or I am missing something here. Below is the Stack Overflow conversation for reference.

One thing strikes me from your properties file. You’re expecting a message source with the bean name kafkaMessageSource. The Kafka Extension’s auto-configuration constructs a StreamableMessageSource for Streaming Event Processors (like the TrackingEventProcessor) with the bean name streamableKafkaMessageSource.

Would you mind adjusting the source property to that name?

If that doesn’t resolve it, then I am uncertain what the issues are, sadly enough.
Given that scenario, would you mind trying to remove any unnecessary configuration from your sample project to see when it breaks?

So, differently put, my plan of attack would be:

  1. Take the working sample project @lfgcampos shared.
  2. Test if this works locally. If yes, proceed.
  3. Expand the project by replacing the sample code with your code.
  4. Repeat step 3 until you have made all changes.

If the steps taken in 3 are granular enough, you should eventually spot what the exact issue is.

Thanks, @Steven_van_Beelen. I figured out that I need to use explicit Java configuration. Ideally, I would have liked to configure this using application.yaml, which was possible in the previous version, 4.0-RC2.

I was able to resolve the issue by explicitly registering the event processors using Java configuration (similar to what is coded in the example application):

    /**
     * Registers two tracking event processors, {@code info-processor1} and
     * {@code info-processor2}, that both use the injected Kafka message source.
     * (The excerpt as posted ends before the method's closing brace.)
     *
     * @param configurer the event processing configurer the processors are registered on
     * @param streamableKafkaMessageSource the message source supplied to both processors
     */
    @Autowired
    public void configure(final EventProcessingConfigurer configurer, StreamableKafkaMessageSource<String, byte[]> streamableKafkaMessageSource) {
        configurer.registerTrackingEventProcessor("info-processor1", c -> streamableKafkaMessageSource);
        configurer.registerTrackingEventProcessor("info-processor2", c -> streamableKafkaMessageSource);

You have tried adjusting the bean name in the properties file to what I suggested then, right?

If that’s the case, then I guess there’s a Spring ordering problem.
Pretty sure this was introduced when the Kafka Extension started supporting Subscribing Event Processors too.

Nonetheless, if you can confirm it fails with the properties set correctly, I feel it’s fine if you construct an issue for it on the Kafka extension.

I did try that. I will create an issue with the details. For now the Java config is good enough; it works, and that’s what matters :slight_smile:

1 Like

Completely right there, Prashant! Happy to hear your app works.
Your effort to contribute an issue to the Kafka Extension will be much appreciated.
As soon as you’ve got time for that, of course.