i’m using axon with kafka and mongodb. but event handler deosn’t invoked. i don’t know why this is happening.
i posted already here in stackoverflow with my code.
i don’t know how to make it work.
please let me know
i’m using axon with kafka and mongodb. but event handler deosn’t invoked. i don’t know why this is happening.
i posted already here in stackoverflow with my code.
i don’t know how to make it work.
please let me know
Hi @imnewone, and welcome to the forum!
Although I understand it’s a duplicate, for increasing the quality of the posts, I think it would be beneficial if you’d actually add the question here too instead of just referring to it.
Note that I’ve posted a comment on your StackOverflow question, asking if you can expand on the status of your Kafka topic.
i copy from stackoverflow!
i’m using
i checked command app that storing event data to mongodb.
However event handler not invoked.
here is project structure
practice-root
│
├── command
│ ├── build
│ ├── src
│ │ ├── main
│ │ │ ├ kotlin
│ │ │ │ └ com.cqrs.axon
│ │ │ │ ├ Application.kt
│ │ │ │ ├ SimpleDTO
│ │ │ │ ├ SimpleController
│ │ │ │ ...
│ │ │ └ resources
│ │ ├── test
│ │ │
│ └── build.gradle
│
├── query
│ ├ ...
│ ...
│
├── common
│ ├ ...
│ ...
│
├── README
├── build.gradle
└── settings.gradle
here is my code
command module
Application.kt
@SpringBootApplication()
class Application
fun main(args: Array<String>) {
runApplication<Application>(*args)
}
SimpleService.kt
@Service
class SimpleService(
private val eventGateway: EventGateway
) {
@CommandHandler
fun createSimple(simpleDTO: SimpleDTO): Unit {
return this.eventGateway.publish(
SimpleEvent(
id = UUID.randomUUID().toString(),
data = simpleDTO.data
)
)
}
}
SimpleDTO.kt
data class SimpleDTO (
val data: String
)
SimpleController.kt
@RestController
class SimpleController(
private val simpleService: SimpleService
) {
@PostMapping("/simple")
fun createSimple(@RequestBody simpleDTO: SimpleDTO): Unit {
return simpleService.createSimple(simpleDTO)
}
}
AxonConfig.kt
@Configuration
class AxonConfig {
@Bean
fun eventStore(storageEngine: EventStorageEngine, configuration: AxonConfiguration): EmbeddedEventStore {
return EmbeddedEventStore.builder()
.storageEngine(storageEngine)
.messageMonitor(configuration.messageMonitor(EventStore::class.java, "eventStore"))
.build()
}
@Bean
fun storageEngine(client: MongoClient): EventStorageEngine {
return MongoEventStorageEngine.builder()
.mongoTemplate(DefaultMongoTemplate.builder().mongoDatabase(client).build()).build()
}
/**
* Creates a Kafka producer factory, using the Kafka properties configured in resources/application.yml
*/
@Bean
fun producerFactory(kafkaProperties: KafkaProperties): ProducerFactory<String, ByteArray> {
return DefaultProducerFactory.builder<String, ByteArray>()
.configuration(kafkaProperties.buildProducerProperties())
.producerCacheSize(10_000)
.confirmationMode(ConfirmationMode.WAIT_FOR_ACK)
.build()
}
}
application.yml
---
server:
port: 8080
spring:
application:
name: commandSpringApplication
data:
mongodb:
uri: mongodb://user:u123@localhost:27016/test?authSource=admin
autoconfigure:
exclude:
- org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
axon:
axonserver:
enabled: false
kafka:
client-id: myproducer
default-topic: axon-events
producer:
bootstrap-servers: localhost:9092
event-processor-mode: tracking
transaction-id-prefix: sample-eventstx
properties:
security.protocol: PLAINTEXT
logging:
level:
com:
cqrs:
command: debug
org:
axonframework: debug
query module
Application.kt
@SpringBootApplication()
class Application
fun main(args: Array<String>) {
runApplication<Application>(*args)
}
QuerySimpleProjection.kt
@Component
@ProcessingGroup("SampleProcessor")
class QuerySimpleProjection (
private val simpleRepository: QuerySimpleRepository
) {
@EventHandler
fun on(event: SimpleEvent, @Timestamp instant: Instant) {
val simpleMV = SimpleMV(
id = event.id,
data = event.data
)
simpleRepository.save(simpleMV)
}
}
QuerySimpleRepository.kt
@Repository
interface QuerySimpleRepository : JpaRepository<SimpleMV, String>
SimpleMV.kt
@Entity
@Table(name = "mv_simple_mongo")
data class SimpleMV (
@Id
val id: String,
val data: String
)
AxonConfig.kt
@Configuration
class AxonConfig {
@Autowired
fun configureStreamableKafkaSource(configurer: EventProcessingConfigurer,
streamableKafkaMessageSource: StreamableKafkaMessageSource<String, ByteArray>
) {
configurer.registerTrackingEventProcessor("SampleProcessor") { streamableKafkaMessageSource }
}
}
application.yml
---
server:
port: 9090
spring:
application:
name: queryMongoSpringApplication
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:33060/test?useSSL=false&characterEncoding=utf8&useUnicode=true
username: user
password: u123
jpa:
show-sql: true
properties:
hibernate:
dialect: org.hibernate.dialect.MySQL8Dialect
hbm2ddl.auto: update
format_sql: true
jdbc:
time_zone: UTC
axon:
axonserver:
enabled: false
kafka:
client-id: myconsumer
default-topic: axon-events
consumer:
bootstrap-servers: localhost:9092
event-processor-mode: tracking
properties:
security.protocol: PLAINTEXT
logging:
level:
com:
cqrs:
command: debug
org:
axonframework: debug
common module
SimpleEvent.kt
data class SimpleEvent (
val id: String,
val data: String
)
build.gradle of command module and query module
apply plugin: 'kotlin-jpa'
apply plugin: 'org.springframework.boot'
apply plugin: 'kotlin-allopen'
...
dependencies {
implementation project(':common')
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'mysql:mysql-connector-java'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.apache.kafka:kafka-clients'
implementation 'org.axonframework.extensions.kafka:axon-kafka-spring-boot-starter:4.5.3'
compile "org.axonframework:axon-spring-boot-starter:4.5.8", {
exclude group:'org.axonframework', module: 'axon-server-connector'
}
implementation "org.axonframework:axon-configuration:4.5.8"
testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
query, common, command app package names are same - com.cqrs.axon
i check that kafka get messages! but event handler not invoked
one more information added!
when i use commandhandler and eventhandler in one module than it works!
after split two module(query and command) with common module, i can’t get any message.
please can anyone help me?
@imnewone - Were you able to resolve the issue ?
I’m not sure what the issue was. If the event is stored in Mongo, it’s not automatically als send to Kafka. This seems to be the issue, but I’m not sure.
@Arunav_Bhattacharya maybe you have a similar, or related problem?