I’m using Axon with Kafka and MongoDB, but my event handler is never invoked, and I don’t know why.
I already posted this question on StackOverflow, with my code.
I don’t know how to make it work.
Please let me know.
Hi @imnewone, and welcome to the forum!
Although I understand it’s a duplicate, I think it would improve the quality of the posts if you added the question here too instead of just referring to it.
Note that I’ve posted a comment on your StackOverflow question, asking if you can expand on the status of your Kafka topic.
I’m copying it over from StackOverflow!
i’m using
I checked that the command app is storing event data in MongoDB.
However, the event handler is not invoked.
Here is the project structure:
practice-root
│
├── command
│ ├── build
│ ├── src
│ │ ├── main
│ │ │ ├ kotlin
│ │ │ │ └ com.cqrs.axon
│ │ │ │ ├ Application.kt
│ │ │ │ ├ SimpleDTO
│ │ │ │ ├ SimpleController
│ │ │ │ ...
│ │ │ └ resources
│ │ ├── test
│ │ │
│ └── build.gradle
│
├── query
│ ├ ...
│ ...
│
├── common
│ ├ ...
│ ...
│
├── README
├── build.gradle
└── settings.gradle
Here is my code:
command module
Application.kt
@SpringBootApplication()
class Application

fun main(args: Array<String>) {
    runApplication<Application>(*args)
}
SimpleService.kt
@Service
class SimpleService(
    private val eventGateway: EventGateway
) {
    @CommandHandler
    fun createSimple(simpleDTO: SimpleDTO): Unit {
        return this.eventGateway.publish(
            SimpleEvent(
                id = UUID.randomUUID().toString(),
                data = simpleDTO.data
            )
        )
    }
}
SimpleDTO.kt
data class SimpleDTO (
    val data: String
)
SimpleController.kt
@RestController
class SimpleController(
    private val simpleService: SimpleService
) {
    @PostMapping("/simple")
    fun createSimple(@RequestBody simpleDTO: SimpleDTO): Unit {
        return simpleService.createSimple(simpleDTO)
    }
}
AxonConfig.kt
@Configuration
class AxonConfig {

    @Bean
    fun eventStore(storageEngine: EventStorageEngine, configuration: AxonConfiguration): EmbeddedEventStore {
        return EmbeddedEventStore.builder()
            .storageEngine(storageEngine)
            .messageMonitor(configuration.messageMonitor(EventStore::class.java, "eventStore"))
            .build()
    }

    @Bean
    fun storageEngine(client: MongoClient): EventStorageEngine {
        return MongoEventStorageEngine.builder()
            .mongoTemplate(DefaultMongoTemplate.builder().mongoDatabase(client).build())
            .build()
    }

    /**
     * Creates a Kafka producer factory, using the Kafka properties configured in resources/application.yml
     */
    @Bean
    fun producerFactory(kafkaProperties: KafkaProperties): ProducerFactory<String, ByteArray> {
        return DefaultProducerFactory.builder<String, ByteArray>()
            .configuration(kafkaProperties.buildProducerProperties())
            .producerCacheSize(10_000)
            .confirmationMode(ConfirmationMode.WAIT_FOR_ACK)
            .build()
    }
}
application.yml
---
server:
  port: 8080
spring:
  application:
    name: commandSpringApplication
  data:
    mongodb:
      uri: mongodb://user:u123@localhost:27016/test?authSource=admin
  autoconfigure:
    exclude:
      - org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
axon:
  axonserver:
    enabled: false
  kafka:
    client-id: myproducer
    default-topic: axon-events
    producer:
      bootstrap-servers: localhost:9092
      event-processor-mode: tracking
      transaction-id-prefix: sample-eventstx
    properties:
      security.protocol: PLAINTEXT
logging:
  level:
    com:
      cqrs:
        command: debug
    org:
      axonframework: debug
query module
Application.kt
@SpringBootApplication()
class Application

fun main(args: Array<String>) {
    runApplication<Application>(*args)
}
QuerySimpleProjection.kt
@Component
@ProcessingGroup("SampleProcessor")
class QuerySimpleProjection (
    private val simpleRepository: QuerySimpleRepository
) {
    @EventHandler
    fun on(event: SimpleEvent, @Timestamp instant: Instant) {
        val simpleMV = SimpleMV(
            id = event.id,
            data = event.data
        )
        simpleRepository.save(simpleMV)
    }
}
QuerySimpleRepository.kt
@Repository
interface QuerySimpleRepository : JpaRepository<SimpleMV, String>
SimpleMV.kt
@Entity
@Table(name = "mv_simple_mongo")
data class SimpleMV (
    @Id
    val id: String,
    val data: String
)
AxonConfig.kt
@Configuration
class AxonConfig {

    @Autowired
    fun configureStreamableKafkaSource(
        configurer: EventProcessingConfigurer,
        streamableKafkaMessageSource: StreamableKafkaMessageSource<String, ByteArray>
    ) {
        configurer.registerTrackingEventProcessor("SampleProcessor") { streamableKafkaMessageSource }
    }
}
application.yml
---
server:
  port: 9090
spring:
  application:
    name: queryMongoSpringApplication
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:33060/test?useSSL=false&characterEncoding=utf8&useUnicode=true
    username: user
    password: u123
  jpa:
    show-sql: true
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQL8Dialect
        hbm2ddl.auto: update
        format_sql: true
        jdbc:
          time_zone: UTC
axon:
  axonserver:
    enabled: false
  kafka:
    client-id: myconsumer
    default-topic: axon-events
    consumer:
      bootstrap-servers: localhost:9092
      event-processor-mode: tracking
    properties:
      security.protocol: PLAINTEXT
logging:
  level:
    com:
      cqrs:
        command: debug
    org:
      axonframework: debug
common module
SimpleEvent.kt
data class SimpleEvent (
    val id: String,
    val data: String
)
build.gradle of command module and query module
apply plugin: 'kotlin-jpa'
apply plugin: 'org.springframework.boot'
apply plugin: 'kotlin-allopen'
...
dependencies {
    implementation project(':common')
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    implementation 'mysql:mysql-connector-java'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation 'org.apache.kafka:kafka-clients'
    implementation 'org.axonframework.extensions.kafka:axon-kafka-spring-boot-starter:4.5.3'
    compile "org.axonframework:axon-spring-boot-starter:4.5.8", {
        exclude group:'org.axonframework', module: 'axon-server-connector'
    }
    implementation "org.axonframework:axon-configuration:4.5.8"
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
The query, common, and command apps all use the same package name: com.cqrs.axon
I checked that Kafka receives the messages, but the event handler is not invoked.
One more piece of information:
When I use the command handler and the event handler in one module, it works!
After splitting it into two modules (query and command) with a common module, I can’t get any messages.
Can anyone please help me?
I’m not sure what the issue was. If the event is stored in Mongo, it’s not automatically also sent to Kafka. This seems to be the issue, but I’m not sure.
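To make the point about storing versus forwarding a bit more concrete, here is a toy Kotlin sketch. It does not use Axon or Kafka at all: `ToyEventStore`, `ToyTopic`, and `publishOnce` are made-up names for illustration, and `SimpleEvent` is redefined so the snippet stands on its own. It only models the idea that an event can be persisted without ever reaching a topic unless something is subscribed to forward it.

```kotlin
// Toy model only: none of these types are Axon or Kafka classes.
data class SimpleEvent(val id: String, val data: String)

// Stands in for an event store: it persists events and notifies subscribers.
class ToyEventStore {
    private val stored = mutableListOf<SimpleEvent>()
    private val subscribers = mutableListOf<(SimpleEvent) -> Unit>()

    fun subscribe(handler: (SimpleEvent) -> Unit) {
        subscribers += handler
    }

    fun publish(event: SimpleEvent) {
        stored += event                    // step 1: persist the event
        subscribers.forEach { it(event) }  // step 2: notify whoever subscribed
    }

    fun storedCount(): Int = stored.size
}

// Stands in for a Kafka topic.
class ToyTopic {
    val records = mutableListOf<SimpleEvent>()
}

// Publishes one event and reports (events stored, records on the topic).
fun publishOnce(forwardToTopic: Boolean): Pair<Int, Int> {
    val store = ToyEventStore()
    val topic = ToyTopic()
    if (forwardToTopic) {
        // Without this subscription nothing ever reaches the topic.
        store.subscribe { topic.records += it }
    }
    store.publish(SimpleEvent(id = "1", data = "hello"))
    return store.storedCount() to topic.records.size
}
```

In this model, `publishOnce(forwardToTopic = false)` stores the event but leaves the topic empty, while `publishOnce(forwardToTopic = true)` does both. Whether the real setup in this thread has an equivalent forwarding step wired up is exactly what would need checking.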
@Arunav_Bhattacharya maybe you have a similar, or related problem?