Axon Kafka integration question

The Axon 4 documentation on Kafka integration is badly out of date; the APIs have changed. I am having trouble understanding the consumer side. On the producer side, we configure a producer factory, provide it to KafkaPublisher, and start it. I would assume the same on the consumer side, i.e. configure a consumer factory and provide it to AsyncFetcher. There is also a KafkaMessageSource, which takes an AsyncFetcher, so I believe I need to configure that too. The doc says there is no need to start AsyncFetcher directly (checking the code, it is started by KafkaMessageSource.openStream, so the doc should not even mention this, as it only confuses people). Now my question is: once I have a fully configured message source, how do I wire it into the Axon configuration? Or could I use a Spring @Bean? (BTW, I tried just adding entries to application.properties, and it created many more issues.)

Maybe this example helps

https://github.com/marinkobabic/axon-kafka-example

Marinko,

Thanks for the quick reply. The example you gave is for version 3, but it is helpful regardless. I have no problem sending events to Kafka. My issue is how to configure the source of an event processor group. I saw you have this property defined:
axon.eventhandling.processors."[MyProcessor]".source=kafkaMessageSource. Maybe that is what I am missing? (My processing group defaults to the package name.) Is there an annotation for specifying the source?

I don’t think that there is an annotation for the source. You can try using the configuration and see if it works. If you don’t have individual sources, I think you can set a default for all processor groups.

I had to explicitly specify the processor group and use the property file to specify the source. (I really wish there were an annotation.) One other thing: the consumer won’t pick up messages that already exist in the topic, only new messages that arrive after the new consumer has started. Is this a Kafka configuration that I need to figure out, or is Axon doing something?
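On that last point, one likely place to look is Kafka rather than Axon: when a consumer group has no committed offset, the Kafka consumer property auto.offset.reset decides where reading starts, and its default is latest, which skips messages already in the topic. Below is a minimal sketch of consumer settings that start from the beginning instead. The broker address, group id, and class name are placeholder assumptions, and whether Axon's Kafka extension relies on this setting for a processor without a stored tracking token should be checked against the version in use.

```java
import java.util.HashMap;
import java.util.Map;

class ConsumerSettings {
    // Builds plain Kafka consumer settings; the keys are standard Kafka consumer property names.
    static Map<String, Object> earliestConsumerConfig(String bootstrapServers, String groupId) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("group.id", groupId);
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Start from the oldest available record when the group has no committed offset.
        config.put("auto.offset.reset", "earliest");
        return config;
    }

    public static void main(String[] args) {
        // Placeholder broker address and group id (assumptions for illustration).
        System.out.println(earliestConsumerConfig("localhost:9092", "my-query-service"));
    }
}
```

A map like this is the kind of input a Kafka consumer factory is usually built from.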

Hello,

I have this demo application available (Kotlin) https://github.com/idugalic/digital-restaurant/blob/master/drestaurant-apps/drestaurant-microservices/README.md

The order of events matters. For example, we might expect that a customer is created before anything else can happen to a customer. When using Kafka, you can preserve the order of those events by putting them all in the same Kafka partition. They must be in the same Kafka topic because different topics mean different partitions.
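To illustrate why a shared partition preserves order: Kafka assigns a keyed record to a partition by hashing its key, so records with the same key land in the same partition and are read back in the order they were written. The sketch below is a simplified stand-in that uses String.hashCode(); Kafka's default partitioner uses a murmur2 hash of the serialized key, and the class and method names here are hypothetical.

```java
class PartitionSketch {
    // Simplified stand-in for key-based partitioning (not Kafka's real murmur2 hash).
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3;
        // Two events for the same customer id always map to the same partition.
        System.out.println(partitionFor("customer-42", partitions));
        System.out.println(partitionFor("customer-42", partitions));
        // With a single partition, every key maps to partition 0.
        System.out.println(partitionFor("customer-7", 1));
    }
}
```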

I have configured the Kafka instance to create only one topic (axon-events) with one partition initially.

If all consumers are from the same group, the Kafka model functions as a traditional message queue would. All the records and processing are then load balanced. Each message is consumed by only one consumer of the group. Each partition is connected to at most one consumer from a group. When multiple consumer groups exist, the flow of data consumption aligns with the traditional publish-subscribe model. The messages are broadcast to all consumer groups.
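These two behaviors can be simulated in a few lines. The sketch below is only a model, not Kafka client code: it assigns partitions round-robin within each group (an assumption for illustration, since Kafka's actual assignment strategy is configurable) and lists which consumer in each group receives a record from a given partition. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ConsumerGroupSketch {
    // Assigns each partition to exactly one consumer of a group, round-robin.
    static Map<Integer, String> assign(int numPartitions, List<String> consumers) {
        Map<Integer, String> owner = new LinkedHashMap<>();
        for (int p = 0; p < numPartitions; p++) {
            owner.put(p, consumers.get(p % consumers.size()));
        }
        return owner;
    }

    // Returns who receives a record from the given partition:
    // exactly one consumer per group, because every group gets its own copy.
    static List<String> receivers(int partition, int numPartitions, Map<String, List<String>> groups) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> group : groups.entrySet()) {
            String consumer = assign(numPartitions, group.getValue()).get(partition);
            result.add(group.getKey() + "/" + consumer);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        groups.put("query-service", Arrays.asList("q1", "q2"));
        groups.put("billing-service", Arrays.asList("b1"));
        // Within a group the partitions are split (queue); across groups records are broadcast (pub-sub).
        System.out.println(receivers(0, 2, groups));
        System.out.println(receivers(1, 2, groups));
    }
}
```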

I have configured the (micro)services to use the publish-subscribe model by setting a unique consumer group id for each (micro)service.

My Axon event processor has been configured to use Kafka message source: https://github.com/idugalic/digital-restaurant/blob/master/drestaurant-apps/drestaurant-microservices/drestaurant-microservices-query/src/main/resources/application.yml.

Please note that this is a demo. Using one partition is a radical approach, but I hope it gives you a better idea of which direction you want to take.

Best,

Ivan

Does anyone have an Axon 4 + Kafka + Spring Boot example in Java? Please share.

Hi Ivan

Thanks for the example!

I am working with Java and Spring Boot. Following your example, I am able to run the command and saga parts. Somehow, the projector on the query side is not getting invoked.

I am using the same application.yml config as in your example and an empty Axonconfig.

Hi Prashant,

My guess is:

  1. Please pay attention to the processor configuration ( https://github.com/idugalic/digital-restaurant/blob/master/drestaurant-apps/drestaurant-microservices/drestaurant-microservices-query/src/main/resources/application.yml#L19 ). On the query side I have only one processor, named query, which is configured to use Kafka as its message source.

     axon:
       eventhandling:
         processors:
           query:
             mode: tracking
             source: kafkaMessageSource

  2. All event handlers are assigned to the query processor ( https://github.com/idugalic/digital-restaurant/blob/master/drestaurant-apps/drestaurant-microservices/drestaurant-microservices-query/src/main/kotlin/com/drestaurant/query/handler/RestaurantEventHandler.kt#L18 )
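For a Spring Boot project that uses application.properties rather than YAML, the processor settings above flatten into dotted keys of the same form as the property quoted earlier in the thread. A small sketch follows; it loads the two lines through java.util.Properties only so that the keys can be checked, and the class name is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class ProcessorProperties {
    // The YAML settings for the "query" processor, written as application.properties lines.
    static final String LINES =
            "axon.eventhandling.processors.query.mode=tracking\n"
          + "axon.eventhandling.processors.query.source=kafkaMessageSource\n";

    // Parses the lines above into a Properties object.
    static Properties load() {
        Properties props = new Properties();
        try {
            props.load(new StringReader(LINES));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        load().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Here kafkaMessageSource is assumed to be the name of the message source bean. On the Java side, handlers are typically assigned to the query processor by putting the @ProcessingGroup("query") annotation on the handler class.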

Best,
Ivan

Thanks Ivan! It works now :)