How to use Axon with Kafka and MongoDB

I’m using Axon with Kafka and MongoDB, but my event handler isn’t invoked, and I don’t know why this is happening.

I have already posted this on StackOverflow, together with my code.

I don’t know how to make it work.

Please let me know.

Hi @imnewone, and welcome to the forum!
Although I understand it’s a duplicate, I think it would improve the quality of the posts if you added the question here too, instead of just referring to it.

Note that I’ve posted a comment on your StackOverflow question, asking if you can expand on the status of your Kafka topic.
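
For anyone wanting to check the topic from code rather than from the Kafka command-line tools, here is a minimal sketch using the plain `kafka-clients` consumer. The broker address and topic name are taken from the configuration in the question; the group id `topic-inspector` is a made-up name for illustration. A single poll can come back empty while the consumer group is still being assigned partitions, so it may need to run more than once.

```kotlin
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.Properties

fun main() {
    val props = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        // Hypothetical group id, kept separate from the query app's consumer
        put(ConsumerConfig.GROUP_ID_CONFIG, "topic-inspector")
        // Start from the beginning of the topic when no offset is stored yet
        put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java.name)
    }

    KafkaConsumer<String, ByteArray>(props).use { consumer ->
        consumer.subscribe(listOf("axon-events"))
        val records = consumer.poll(Duration.ofSeconds(5))
        println("Fetched ${records.count()} record(s)")
        for (record in records) {
            // Print only metadata; the payload is the serialized Axon event
            println(
                "partition=${record.partition()} offset=${record.offset()} " +
                    "key=${record.key()} bytes=${record.value()?.size}"
            )
        }
    }
}
```

If this prints records, the publishing side works and the problem is more likely on the consuming side.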

I copied it from StackOverflow!


I’m using:

  1. two instances (command app, query app)
  2. MongoDB and Kafka, without Axon Server
  3. Kotlin
  4. a multi-module project
  5. Axon Framework 4.5

I checked that the command app stores event data in MongoDB.

However, the event handler is not invoked.

Here is the project structure:

practice-root
│
├── command
│   ├── build
│   ├── src
│   │    ├── main
│   │    │     ├ kotlin
│   │    │     │  └ com.cqrs.axon
│   │    │     │         ├ Application.kt
│   │    │     │         ├ SimpleDTO
│   │    │     │         ├ SimpleController
│   │    │     │         ...
│   │    │     └ resources
│   │    ├── test
│   │    │
│   └── build.gradle
│
├── query
│     ├ ...
│     ...
│
├── common
│     ├ ...
│     ...
│
├── README
├── build.gradle
└── settings.gradle

Here is my code:

command module

Application.kt

@SpringBootApplication()
class Application

fun main(args: Array<String>) {
    runApplication<Application>(*args)
}

SimpleService.kt

@Service
class SimpleService(
    private val eventGateway: EventGateway
) {
    @CommandHandler
    fun createSimple(simpleDTO: SimpleDTO): Unit {
        return this.eventGateway.publish(
            SimpleEvent(
                id = UUID.randomUUID().toString(),
                data = simpleDTO.data
            )
        )
    }
}

SimpleDTO.kt

data class SimpleDTO (
    val data: String
)

SimpleController.kt


@RestController
class SimpleController(
    private val simpleService: SimpleService
) {

    @PostMapping("/simple")
    fun createSimple(@RequestBody simpleDTO: SimpleDTO): Unit {
        return simpleService.createSimple(simpleDTO)
    }
}

AxonConfig.kt


@Configuration
class AxonConfig {


    @Bean
    fun eventStore(storageEngine: EventStorageEngine, configuration: AxonConfiguration): EmbeddedEventStore {
        return EmbeddedEventStore.builder()
            .storageEngine(storageEngine)
            .messageMonitor(configuration.messageMonitor(EventStore::class.java, "eventStore"))
            .build()
    }

    @Bean
    fun storageEngine(client: MongoClient): EventStorageEngine {
        return MongoEventStorageEngine.builder()
            .mongoTemplate(DefaultMongoTemplate.builder().mongoDatabase(client).build())
            .build()
    }

    /**
     * Creates a Kafka producer factory, using the Kafka properties configured in resources/application.yml
     */
    @Bean
    fun producerFactory(kafkaProperties: KafkaProperties): ProducerFactory<String, ByteArray> {
        return DefaultProducerFactory.builder<String, ByteArray>()
            .configuration(kafkaProperties.buildProducerProperties())
            .producerCacheSize(10_000)
            .confirmationMode(ConfirmationMode.WAIT_FOR_ACK)
            .build()
    }
}

application.yml

---
server:
  port: 8080

spring:
  application:
    name: commandSpringApplication
  data:
    mongodb:
      uri: mongodb://user:u123@localhost:27016/test?authSource=admin
  autoconfigure:
    exclude:
      - org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration

axon:
  axonserver:
    enabled: false
  kafka:
    client-id: myproducer
    default-topic: axon-events
    producer:
      bootstrap-servers: localhost:9092
      event-processor-mode: tracking
      transaction-id-prefix: sample-eventstx

    properties:
      security.protocol: PLAINTEXT

logging:
  level:
    com:
      cqrs:
        command: debug
    org:
      axonframework: debug

query module

Application.kt

@SpringBootApplication()
class Application

fun main(args: Array<String>) {
    runApplication<Application>(*args)
}

QuerySimpleProjection.kt

@Component
@ProcessingGroup("SampleProcessor")
class QuerySimpleProjection (
    private val simpleRepository: QuerySimpleRepository
) {

    @EventHandler
    fun on(event: SimpleEvent, @Timestamp instant: Instant) {
        val simpleMV = SimpleMV(
            id = event.id,
            data = event.data
        )
        simpleRepository.save(simpleMV)
    }
}

QuerySimpleRepository.kt

@Repository
interface QuerySimpleRepository : JpaRepository<SimpleMV, String>

SimpleMV.kt

@Entity
@Table(name = "mv_simple_mongo")
data class SimpleMV (

    @Id
    val id: String,
    val data: String
)

AxonConfig.kt

@Configuration
class AxonConfig {

    @Autowired
    fun configureStreamableKafkaSource(configurer: EventProcessingConfigurer,
                                       streamableKafkaMessageSource: StreamableKafkaMessageSource<String, ByteArray>
    ) {
        configurer.registerTrackingEventProcessor("SampleProcessor") { streamableKafkaMessageSource }
    }
}

application.yml

---
server:
  port: 9090

spring:
  application:
    name: queryMongoSpringApplication
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:33060/test?useSSL=false&characterEncoding=utf8&useUnicode=true
    username: user
    password: u123

  jpa:
    show-sql: true
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQL8Dialect
        hbm2ddl.auto: update
        format_sql: true
        jdbc:
          time_zone: UTC


axon:
  axonserver:
    enabled: false
  kafka:
    client-id: myconsumer
    default-topic: axon-events
    consumer:
      bootstrap-servers: localhost:9092
      event-processor-mode: tracking

    properties:
      security.protocol: PLAINTEXT


logging:
  level:
    com:
      cqrs:
        command: debug
    org:
      axonframework: debug

common module

SimpleEvent.kt

data class SimpleEvent (
    val id: String,
    val data: String
)

build.gradle of command module and query module

apply plugin: 'kotlin-jpa'
apply plugin: 'org.springframework.boot'
apply plugin: 'kotlin-allopen'
...
dependencies {
    implementation project(':common')
    implementation 'org.springframework.boot:spring-boot-starter-validation'

    implementation 'mysql:mysql-connector-java'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation 'org.apache.kafka:kafka-clients'
    implementation 'org.axonframework.extensions.kafka:axon-kafka-spring-boot-starter:4.5.3'
    compile "org.axonframework:axon-spring-boot-starter:4.5.8", {
        exclude group:'org.axonframework', module: 'axon-server-connector'
    }
    implementation "org.axonframework:axon-configuration:4.5.8"
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

The query, common, and command apps all use the same package name: com.cqrs.axon
I checked that Kafka receives the messages, but the event handler is not invoked.

One more piece of information:
when I use the command handler and the event handler in one module, it works.
After splitting them into two modules (query and command) with a common module, I don’t receive any messages.

Can anyone please help me?

@imnewone - Were you able to resolve the issue?

I’m not sure what the issue was. If the event is stored in Mongo, it’s not automatically also sent to Kafka. This seems to be the issue, but I’m not sure.
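
To illustrate what forwarding events from the event store to Kafka involves, here is a rough sketch of wiring the Kafka extension's publisher by hand in the command app. It is an assumption-laden example, not a confirmed fix: the class names `KafkaPublicationConfig` and `KafkaPublisherRegistration` are made up, the topic name is taken from `default-topic` in the question, and the Spring Boot starter may already wire these components automatically when a `ProducerFactory` bean is present.

```kotlin
import org.axonframework.config.EventProcessingConfigurer
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaEventPublisher
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher
import org.axonframework.extensions.kafka.eventhandling.producer.ProducerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

// Hypothetical configuration class: builds the components that push events to Kafka
@Configuration
class KafkaPublicationConfig {

    // Sends Axon event messages to the given topic using the existing ProducerFactory bean
    @Bean
    fun kafkaPublisher(
        producerFactory: ProducerFactory<String, ByteArray>
    ): KafkaPublisher<String, ByteArray> =
        KafkaPublisher.builder<String, ByteArray>()
            .topic("axon-events")
            .producerFactory(producerFactory)
            .build()

    // Event handler that passes every event it receives on to the KafkaPublisher
    @Bean
    fun kafkaEventPublisher(
        kafkaPublisher: KafkaPublisher<String, ByteArray>
    ): KafkaEventPublisher<String, ByteArray> =
        KafkaEventPublisher.builder<String, ByteArray>()
            .kafkaPublisher(kafkaPublisher)
            .build()
}

// Kept in a separate class so the registration does not depend on beans
// declared by the class that is itself being autowired
@Configuration
class KafkaPublisherRegistration {

    @Autowired
    fun registerPublisher(
        configurer: EventProcessingConfigurer,
        kafkaEventPublisher: KafkaEventPublisher<String, ByteArray>
    ) {
        val group = KafkaEventPublisher.DEFAULT_PROCESSING_GROUP
        configurer.registerEventHandler { kafkaEventPublisher }
            .assignHandlerTypesMatching(group) { it.isAssignableFrom(KafkaEventPublisher::class.java) }
            // Assumption: a subscribing processor is used here; swap in
            // registerTrackingEventProcessor(group) for tracking mode
            .registerSubscribingEventProcessor(group)
    }
}
```

With a subscribing processor the event is published to Kafka in the same thread that stores it; a tracking processor decouples the two but needs a token store in the command app.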

@Arunav_Bhattacharya maybe you have a similar, or related problem?