Hi, I’m new to Axon. I’ve been through the documentation, the forum and Stack Overflow, and I’ve found several questions on this topic, but almost all of the answers just point to the example. However, I’m not able to consume events from Kafka using the example.
What I’m aiming for
I was thinking of integrating Axon Framework with an existing platform to implement some new functionality that requires an Event Store. Since it will need to communicate with non-Axon services, I was thinking of using Kafka to exchange events between the non-Axon services and the Axon services.
What I’ve tried
- I’ve cloned the axon example
- Followed the README.md
Everything works. Then I commented out the scheduled tasks so they no longer send any messages to Kafka. After that, I reran the application and manually sent a message to the Kafka topic using the Kafka console producer. Not working. I then realized that the console producer cannot send headers, so, in case the headers have something to do with it, I used a desktop application to send a message to the topic with the headers as well. I tried both reusing the same values published by Axon Framework against an empty Kafka (docker-compose down & docker-compose up -d) and updating the ids. Neither worked.
BankClient
/**
 * Bank client sending scheduled commands.
 */
@Component
class BankClient(private val commandGateway: CommandGateway) {
    private val accountId = UUID.randomUUID().toString()
    private var amount = 100

    /**
     * Creates account once.
     */
    // @Scheduled(initialDelay = 5_000, fixedDelay = 1000_000_000)
    fun createAccount() {
        commandGateway.send<CompletableFuture<String>>(CreateBankAccountCommand(bankAccountId = accountId, overdraftLimit = 1000))
    }

    /**
     * Deposit some money every 20 seconds.
     */
    // @Scheduled(initialDelay = 10_000, fixedDelay = 20_000)
    fun deposit() {
        commandGateway.send<CompletableFuture<String>>(DepositMoneyCommand(bankAccountId = accountId, amountOfMoney = amount.toLong()))
        amount = amount.inc()
    }
}
The content of the event I’m sending
<org.axonframework.extension.kafka.example.api.MoneyDepositedEvent><bankAccountId defined-in="org.axonframework.extension.kafka.example.api.MoneyAddedEvent">c6f4cc3c-91bb-48d2-abf6-a8bcd4bb3bc2</bankAccountId><amount defined-in="org.axonframework.extension.kafka.example.api.MoneyAddedEvent">100</amount><bankAccountId>c6f4cc3c-91bb-48d2-abf6-a8bcd4bb3bc2</bankAccountId><amount>100</amount></org.axonframework.extension.kafka.example.api.MoneyDepositedEvent>
The headers:
key | value |
---|---|
axon-message-aggregate-id | c6f4cc3c-91bb-48d2-abf6-a8bcd4bb3bc2 |
axon-message-aggregate-seq | |
axon-message-aggregate-type | BankAccount |
axon-message-id | 2d30b594-4d0e-4a44-b15b-cb133829742b |
axon-message-revision | null |
axon-message-timestamp | 2021-06-24 13:39:20.979 |
axon-message-type | org.axonframework.extension.kafka.example.api.BankAccountCreatedEvent |
axon-metadata-correlationId | 5f03d8c3-94aa-46b2-9d8b-ab4ad47ff2dc |
axon-metadata-traceId | 5f03d8c3-94aa-46b2-9d8b-ab4ad47ff2dc |
Command to publish message to kafka using console producer
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic local.event
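For reference, since the console producer command above does not attach headers, the same message could also be produced programmatically. Below is a minimal sketch using the plain kafka-clients producer, assuming that library is on the classpath. The function names `utf8Headers` and `sendWithHeaders` are hypothetical. Using the aggregate id as the record key, sending the payload as UTF-8 bytes, and encoding every header value as a UTF-8 string are all assumptions; this has not been verified against how the extension itself encodes headers such as the timestamp or sequence number.

```kotlin
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

/** Encodes every header value as UTF-8 text (an assumption, not the confirmed Axon encoding). */
fun utf8Headers(values: Map<String, String>): List<Header> =
    values.map { (key, value) -> RecordHeader(key, value.toByteArray(Charsets.UTF_8)) }

/** Sends one record with headers and blocks until the broker acknowledges it. */
fun sendWithHeaders(
    bootstrapServers: String,
    topic: String,
    key: String,
    payload: String,
    headers: Map<String, String>
) {
    val props = Properties().apply {
        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer::class.java.name)
    }
    KafkaProducer<String, ByteArray>(props).use { producer ->
        val record = ProducerRecord<String, ByteArray>(
            topic,
            null, // let Kafka choose the partition
            key,
            payload.toByteArray(Charsets.UTF_8),
            utf8Headers(headers)
        )
        val metadata = producer.send(record).get()
        println("Sent to ${metadata.topic()}-${metadata.partition()} at offset ${metadata.offset()}")
    }
}

fun main() {
    val api = "org.axonframework.extension.kafka.example.api"
    val aggregateId = "c6f4cc3c-91bb-48d2-abf6-a8bcd4bb3bc2"

    // Payload copied from the event content shown above.
    val payload = "<${api}.MoneyDepositedEvent>" +
        "<bankAccountId defined-in=\"${api}.MoneyAddedEvent\">$aggregateId</bankAccountId>" +
        "<amount defined-in=\"${api}.MoneyAddedEvent\">100</amount>" +
        "<bankAccountId>$aggregateId</bankAccountId>" +
        "<amount>100</amount>" +
        "</${api}.MoneyDepositedEvent>"

    // Header values copied as-is from the headers table above.
    val headers = mapOf(
        "axon-message-aggregate-id" to aggregateId,
        "axon-message-aggregate-seq" to "",
        "axon-message-aggregate-type" to "BankAccount",
        "axon-message-id" to "2d30b594-4d0e-4a44-b15b-cb133829742b",
        "axon-message-revision" to "null",
        "axon-message-timestamp" to "2021-06-24 13:39:20.979",
        "axon-message-type" to "${api}.BankAccountCreatedEvent",
        "axon-metadata-correlationId" to "5f03d8c3-94aa-46b2-9d8b-ab4ad47ff2dc",
        "axon-metadata-traceId" to "5f03d8c3-94aa-46b2-9d8b-ab4ad47ff2dc"
    )

    sendWithHeaders("localhost:9092", "local.event", aggregateId, payload, headers)
}
```

Running `main` requires the broker from the example's docker-compose setup to be reachable on localhost:9092. Header encoding is isolated in `utf8Headers` so that a different byte representation can be swapped in without touching the sending code.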
I’ve tried all combinations of the profiles. For instance, you can use tracking-producer,subscribing-consumer, which are the ones in the README.md.
My question is: is the axon-kafka-extension example able to consume events from a Kafka topic that were not published by Axon Framework?