We have a requirement to run multiple instances of a Spring Boot microservice on separate nodes (or JVMs), all of which have to consume Axon event messages from a Kafka topic.
We do not use Axon Server here, and we use PostgreSQL for the JpaTokenStore.
The consumers were part of the same consumer group (group ID).
We tried the default configuration:
- Default JpaTokenStore
- Default SequentialPerAggregatePolicy
- Batch size 1 and thread count 1
With that configuration, only one of the consumers receives messages from Kafka. When we kill that instance, one of the remaining instances starts consuming messages. At any point in time only one instance was able to consume messages, so there was no load balancing.
We investigated and figured out that the services are contending for a lock in the token_entry table in the JpaTokenStore.
Hence we reduced the “claimTimeOut” parameter from its default value of 5000 ms to 500 ms. All the services then started receiving messages from the Kafka topic, but we observed message loss in the consumers: not all messages produced by the producer were consumed. There were also a lot of exceptions in the Kafka consumer during partition rebalancing (in the ConsumerUtil.onPartitionAssigned() method).
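To make the contention on token_entry concrete, here is a minimal sketch of how we understand a token claim to behave. It is a simplified model, not Axon's actual entity class: the class name `TokenClaimSketch`, its fields, and the explicit `now` parameter are assumptions made for illustration. A row can be claimed when it has no owner, when the caller already owns it, or when the previous claim is older than the claim timeout.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified, hypothetical model of one row in token_entry (not Axon's real class).
class TokenClaimSketch {
    private String owner;       // instance currently holding the claim, or null
    private Instant timestamp;  // last time the claim was taken or extended

    // Tries to claim the row for `candidate` at time `now`.
    boolean claim(String candidate, Instant now, Duration claimTimeout) {
        boolean free = owner == null;
        boolean mine = candidate.equals(owner);
        boolean expired = timestamp != null && timestamp.plus(claimTimeout).isBefore(now);
        if (!(free || mine || expired)) {
            return false; // another instance holds a claim that has not timed out
        }
        owner = candidate;
        timestamp = now;
        return true;
    }

    public static void main(String[] args) {
        TokenClaimSketch row = new TokenClaimSketch();
        Instant t0 = Instant.parse("2020-01-01T00:00:00Z");
        Duration timeout = Duration.ofSeconds(5);
        System.out.println(row.claim("node-a", t0, timeout));                 // true
        System.out.println(row.claim("node-b", t0.plusSeconds(1), timeout));  // false
        System.out.println(row.claim("node-a", t0.plusSeconds(4), timeout));  // true
        System.out.println(row.claim("node-b", t0.plusSeconds(10), timeout)); // true
    }
}
```

With a single row per processor, whichever instance keeps extending its claim stays the only consumer. In this model, a shorter timeout only makes the row change hands more often.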
We also realized that claimTimeOut is not the appropriate tuning parameter. Hence we took a different approach and assigned a unique segment identifier to each instance in a distributed fashion. To support a segment ID per instance, we extended JpaTokenStore and overrode the methods “initializeTokenSegments” and “fetchSegments” to return the segment ID of the instance. This way each instance has a separate record in the “token_entry” table, identified by its segment ID.
We still had message loss, and we identified the root cause as the Segment.matches() method. This method returns false for every message that fails the mask check against the hash value of the event identifier.
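The mask check can be illustrated with a small standalone sketch. It mirrors the check as we understand it (the hash of the sequencing value, reduced by the segment mask, must equal the segment ID). The class `SegmentMaskSketch` and its method signature are assumptions for illustration, not Axon's API.

```java
import java.util.Objects;

// Standalone model of the segment mask check; names are hypothetical.
class SegmentMaskSketch {
    // True when the value's hash, reduced by the mask, equals the segment ID.
    static boolean matches(int segmentId, int mask, Object sequenceValue) {
        return mask == 0 || (mask & Objects.hashCode(sequenceValue)) == segmentId;
    }

    public static void main(String[] args) {
        int mask = 1; // two segments: 0 and 1
        for (String id : new String[] {"a", "b", "c", "d"}) {
            System.out.println(id
                    + " segment0=" + matches(0, mask, id)
                    + " segment1=" + matches(1, mask, id));
        }
    }
}
```

Each value matches exactly one of the two segments. So if an instance that owns only segment 1 reads a message whose value maps to segment 0, it skips that message, which is the loss we observed.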
To circumvent this problem, we created a new sequencing policy by implementing the SequencingPolicy interface and returning the segment identifier of the instance from it. This way the segment ID mask check in “Segment.matches” returns true for any message consumed by the consumer instance.
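A rough sketch of the idea follows, written without Axon on the classpath so that it compiles on its own. The nested `Policy` interface is a local stand-in for SequencingPolicy, and `fixedTo` and `matches` are hypothetical helpers. The real classes are in the repository.

```java
import java.util.Objects;

// Standalone sketch of a policy that always returns the instance's segment ID.
class FixedSegmentPolicySketch {
    // Local stand-in for the SequencingPolicy interface (assumption for this sketch).
    interface Policy<T> {
        Object getSequenceIdentifierFor(T event);
    }

    // Simplified model of the segment mask check.
    static boolean matches(int segmentId, int mask, Object sequenceValue) {
        return mask == 0 || (mask & Objects.hashCode(sequenceValue)) == segmentId;
    }

    // Ignores the event and always returns the given segment ID.
    static <T> Policy<T> fixedTo(int segmentId) {
        return event -> Integer.valueOf(segmentId);
    }

    public static void main(String[] args) {
        int segmentId = 2;
        int mask = 3; // four segments: 0..3
        Policy<String> policy = fixedTo(segmentId);
        for (String event : new String[] {"a", "b", "c"}) {
            Object value = policy.getSequenceIdentifierFor(event);
            System.out.println(event + " -> " + matches(segmentId, mask, value));
        }
    }
}
```

Because the hash of an Integer is the integer itself, the check passes for every event as long as the segment ID fits within the mask. The trade-off is that the sequencing value no longer depends on the aggregate.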
Our solution is available in the repository below (it is an extension of a similar example, marinkobabic/axon-kafka-example). I removed the dependency on Axon Server from the Gradle build.
The key classes to look for are
We would like to know whether our approach is a valid way to achieve load balancing.
Is there an alternative configuration option available within the Axon Kafka or messaging module to support load balancing?
Thanks & Regards,
Dharani Kumar P