Axon Example is not reading from kafka

Hi, I’m new to Axon. I’ve been checking the documentation, the forum and Stack Overflow, and I’ve found several questions on this topic, but almost all of the answers point to the example. However, I’m not able to consume events from Kafka using the example.

What I’m aiming for

I was thinking of integrating Axon Framework with an existing platform to implement some new functionality that requires an Event Store. Since it will need to communicate with non-Axon services, I was thinking of using Kafka to exchange events between the non-Axon services and the Axon services.

What I’ve tried

  • I’ve cloned the axon example
  • Followed the README.md

Everything works. Then I commented out the scheduled tasks so they no longer send any messages to Kafka. After that, I reran the application and manually sent a message to the Kafka topic using the Kafka console producer. It did not work. I realized that the console producer cannot send headers, so, in case the headers had something to do with it, I used a desktop application to send a message to the topic with the headers included. I tried both reusing the same values published by Axon Framework on an empty Kafka (docker-compose down & docker-compose up -d) and updating the IDs. Neither worked.

BankClient

/**
 * Bank client sending scheduled commands.
 */
@Component
class BankClient(private val commandGateway: CommandGateway) {

    private val accountId = UUID.randomUUID().toString()
    private var amount = 100

    /**
     * Creates account once.
     */
//    @Scheduled(initialDelay = 5_000, fixedDelay = 1000_000_000)
    fun createAccount() {
        commandGateway.send<CompletableFuture<String>>(CreateBankAccountCommand(bankAccountId = accountId, overdraftLimit = 1000))
    }

    /**
     * Deposit some money every 20 seconds.
     */
//    @Scheduled(initialDelay = 10_000, fixedDelay = 20_000)
    fun deposit() {
        commandGateway.send<CompletableFuture<String>>(DepositMoneyCommand(bankAccountId = accountId, amountOfMoney = amount.toLong()))
        amount = amount.inc()
    }
}

The content of the event I’m sending

<org.axonframework.extension.kafka.example.api.MoneyDepositedEvent><bankAccountId defined-in="org.axonframework.extension.kafka.example.api.MoneyAddedEvent">c6f4cc3c-91bb-48d2-abf6-a8bcd4bb3bc2</bankAccountId><amount defined-in="org.axonframework.extension.kafka.example.api.MoneyAddedEvent">100</amount><bankAccountId>c6f4cc3c-91bb-48d2-abf6-a8bcd4bb3bc2</bankAccountId><amount>100</amount></org.axonframework.extension.kafka.example.api.MoneyDepositedEvent>

The headers:

key                          value
axon-message-aggregate-id    c6f4cc3c-91bb-48d2-abf6-a8bcd4bb3bc2
axon-message-aggregate-seq
axon-message-aggregate-type  BankAccount
axon-message-id              2d30b594-4d0e-4a44-b15b-cb133829742b
axon-message-revision        null
axon-message-timestamp       2021-06-24 13:39:20.979
axon-message-type            org.axonframework.extension.kafka.example.api.BankAccountCreatedEvent
axon-metadata-correlationId  5f03d8c3-94aa-46b2-9d8b-ab4ad47ff2dc
axon-metadata-traceId        5f03d8c3-94aa-46b2-9d8b-ab4ad47ff2dc

Command to publish message to kafka using console producer

kafka-console-producer.sh --bootstrap-server localhost:9092  --topic local.event

I’ve tried all the profile combinations. For instance, you can use tracking-producer,subscribing-consumer, which are the ones in the README.md.

My question is: Is the axon-kafka-extension example able to consume events from a Kafka topic that have not been published by Axon Framework?

Hi @bbodoque,

Recently we (mainly @Steven_van_Beelen) have been investing a lot of time in extension-kafka.
As a result, it has seen its very first release (as you can see here) and is ready to be used!

Do you mind trying the latest release to see if everything works for you?
If not, please come back to us.

KR,

You can turn on TRACE level logging for the DefaultKafkaMessageConverter.
It will print whether the conversion succeeded or not. That will give us a hint as to where in the process it fails.
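As a rough sketch of how that could look in a Spring Boot application such as the example: the fragment below raises the log level for the converter through application.properties. The fully qualified logger name is an assumption based on the extension's org.axonframework.extensions.kafka package, so please check it against the version you are running.

```properties
# Assumed logger name; verify the package of DefaultKafkaMessageConverter in your extension version.
logging.level.org.axonframework.extensions.kafka.eventhandling.DefaultKafkaMessageConverter=TRACE
```

The same property can also be passed on the command line when starting the jar, prefixed with two dashes like any other Spring Boot property.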

It is also true that the message is required to contain Axon’s headers. Without those, it will not recognize it as an Axon Event Message and thus it cannot give it to the @EventHandler annotated methods with the same guarantees attached to other events.

Next to this, would you be able to share the versions of Kafka and Axon you’re using? And this producer script you’re referring to, is that something we could test locally as well to reproduce the problem, if needed?

Hi,

good to hear about the release! I’ve tried the example again, but I keep seeing the same behaviour.

I’ve created a repo on GitHub where I cloned the kafka-extension, added a new commit to stop the application from sending messages to Kafka, changed the remote and pushed the changes, so you can use my code to reproduce it. You can clone it from here. The only changes applied to the example are the ones in my commit (commit/644e6605b6db1c3d5172a3c9d81f10076d7e2761).

Then I followed the README.md. I was not able to build the project using mvn clean package -f ./kafka-axon-example because I was getting a dependency resolution error involving the Sonatype snapshot repository:

[ERROR] Failed to execute goal on project axon-kafka-example: Could not resolve dependencies for project org.axonframework.extensions.kafka:axon-kafka-example:jar:4.6.0-SNAPSHOT: Failure to find org.axonframework.extensions.kafka:axon-kafka-spring-boot-starter:jar:4.6.0-SNAPSHOT in https://oss.sonatype.org/content/repositories/snapshots was cached in the local repository, resolution will not be reattempted until the update interval of sonatype has elapsed or updates are forced -> [Help 1]

But I was able to build it from the parent directory. I skipped the tests to go faster, though.

So these are the steps to reproduce the error.

git clone https://github.com/brjt23/kafka-axon-extension-discussion-3393.git
cd kafka-axon-extension-discussion-3393/
mvn clean package -DskipTests
docker-compose -f ./kafka-axon-example/docker-compose.yaml up -d
java -jar ./kafka-axon-example/target/axon-kafka-example.jar --spring.profiles.active=tracking-producer,subscribing-consumer

Then I send a message using the kafka-console-producer.sh of the dockerized Kafka

docker exec -it kafka-axon-example_kafka_1 /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic Axon.Event

The message I send is

<org.axonframework.extension.kafka.example.api.MoneyDepositedEvent><bankAccountId defined-in="org.axonframework.extension.kafka.example.api.MoneyAddedEvent">c6f4cc3c-91bb-48d2-abf6-a8bcd4bb3bc2</bankAccountId><amount defined-in="org.axonframework.extension.kafka.example.api.MoneyAddedEvent">100</amount><bankAccountId>c6f4cc3c-91bb-48d2-abf6-a8bcd4bb3bc2</bankAccountId><amount>100</amount></org.axonframework.extension.kafka.example.api.MoneyDepositedEvent>

What I want to verify is that Axon is reading events that are not generated by Axon itself but consumed from a kafka topic.

Using kafka-console-producer.sh I am not able to send the Axon headers, so it fails as expected. But I installed a tool called Conduktor (I cannot add more links, but its website is conduktor.io) that also allows me to send the Axon headers with the message. The headers I send are the ones listed in my first message.
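One thing that may be worth checking when sending the headers by hand: Kafka header values are raw byte arrays, so a timestamp typed as text produces different bytes than a number written by a producer. Below is a minimal Kotlin sketch of how the header bytes could be built. It assumes (this is an assumption to verify against the converter source for your version) that the converter reads axon-message-timestamp and axon-message-aggregate-seq as 8-byte longs and the remaining headers as UTF-8 strings. The helper names are hypothetical, the sequence number is an illustrative value, and the revision and metadata headers are left out. Note also that the headers in the first post name BankAccountCreatedEvent as axon-message-type while the payload is a MoneyDepositedEvent; the sketch uses the payload's type.

```kotlin
import java.nio.ByteBuffer
import java.time.Instant

// Assumption: numeric headers are read back as 8-byte big-endian longs.
fun longHeader(value: Long): ByteArray = ByteBuffer.allocate(8).putLong(value).array()

// Assumption: textual headers are read back as UTF-8 strings.
fun stringHeader(value: String): ByteArray = value.toByteArray(Charsets.UTF_8)

// Hypothetical helper that assembles the header bytes for one MoneyDepositedEvent record.
fun moneyDepositedHeaders(
    aggregateId: String,
    sequenceNumber: Long,
    messageId: String,
    timestamp: Instant
): Map<String, ByteArray> = linkedMapOf(
    "axon-message-id" to stringHeader(messageId),
    "axon-message-type" to stringHeader("org.axonframework.extension.kafka.example.api.MoneyDepositedEvent"),
    "axon-message-timestamp" to longHeader(timestamp.toEpochMilli()),
    "axon-message-aggregate-id" to stringHeader(aggregateId),
    "axon-message-aggregate-type" to stringHeader("BankAccount"),
    "axon-message-aggregate-seq" to longHeader(sequenceNumber)
)

fun main() {
    val headers = moneyDepositedHeaders(
        aggregateId = "c6f4cc3c-91bb-48d2-abf6-a8bcd4bb3bc2",
        sequenceNumber = 1L, // illustrative; the first post leaves this header empty
        messageId = "2d30b594-4d0e-4a44-b15b-cb133829742b",
        timestamp = Instant.parse("2021-06-24T13:39:20.979Z") // assumes the posted time was UTC
    )
    // Print each header name with the size of its encoded value.
    headers.forEach { (name, bytes) -> println("$name: ${bytes.size} bytes") }
}
```

Whether a GUI tool can send such binary values depends on the tool. If it only accepts text, a small producer based on the Kafka client library may be the easier way to test this.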