Load balancing Kafka consumers with separate micro service instances without Axon server, fails without custom segment identifier

Greetings,

We have a requirement where we need to run multiple instances of a spring boot micro service in separate nodes(or jvms) and they have to consume AXON event messages from a Kafka topic.
We do not use AXON server here and use a Postgres for JPATokenStore.

The consumers were part of same consumer group (group id)

We tried the default configuration

  1. Default JpaTokenStore
  2. Default SequencyPolicyPerAggregate
  3. Batchsize 1 and thread count 1

With that configuration only one of the consumers receives messages from Kafka. When we kill that instance, the one among the remaining instances will start consuming messages. At any point of time only one among the instances were able to consume messages, so no load balancing.

We investigated and figured out that the services are contending for a lock in the token_entry table in the JpaTokenStore.

Hence we tuned the “claimTimeOut” parameter to 500ms from the default value of 5000ms. Now all the services started receiving messages from the Kafka topic, however we identified message loss in the consumer. Not all messages produced by the producer were consumed. Also there were lot of exceptions in the Kafka consumer during rebalancing of partitions (ConsumerUtil.onPartitionAssigned() method)

We also realized that claimTimeOut is not the appropriate tuning parameter. Hence we took a different approach, and assigned unique segment identifiers to each instance in a distributed fashion. To support the segment id per instance, we extended the JpaTokenStore and override the methods “initializeTokenSegments” and “fetchTokenSegments” and return the segment id of the instance. This way each instance has a separate record in the “token_entry” table identified by their segment ID.

Still we had message loss and then we identified the root cause for the message loss is because of the Segment.matches() method. This method returns false for all messages which fails the mask check with hash value of event identifier.
To circumvent this problem we created a new sequencing policy by implementing the SequencingPolicy interface and return the segment identifier of the instance from the sequencing policy class. This way the segment id mask check in the “Segment.matches” method will return true for any message consumed by the consumer instance.

Our solution is available in the below repository (it is an extension of similar example by marinkobabic/axon-kafka-example). I removed the dependency on Axon server in the Gradle.
https://github.com/dharanikumarp/axon-kafka-example/

The key classes to look for are

  1. MySegmentId
  2. MySequencingPolicy
  3. MyTokenStore

We would like to know whether our approach is a valid way to achieve load balancing
OR
is there an alternative configuration option available within Axon Kafka or messaging module to support load balancing.

Thanks & Regards,
Dharani Kumar P

Hi Dharani,

First, I want to point out that the Axon Kafka Extension has not received a final release yet.
I would thus be hesitant to use it for any production environments just yet.

Then, for the issue at hand.
Axon ensures that a given Tracking Event Processor will handle each and every events once.
It does so by keeping track of the events it has handled through the Tracking Tokens.
Additionally, to be able to perform any work, a given Tracking Event Processor thread is required to have a claim on the Tracking Token.

Thus, simply scaling out your applications to have a given Tracking Event Processor on each node does not automatically scale the work.
It simply increases the number of threads, not the number of segments for a given Tracking Token.

To be able to parallelize the work among a given Tracking Event Processor, you thus need to segmentize the Tracking Token into several parts.
Upon start-up of an application, you can configure a given Tracking Event Processor to split it’s token upon creation (thus if no Token is present yet).

You can do this by using the TrackingEventProcessorConfiguration.forParallelProcessing(int).

The provided int chances the number of threads but also the number of segments of it’s Tracking Token.

For a live system or an application for which the Tracking Tokens are already stored, you will have to use the API provided on the TrackingEventProcessor to split and merge tokens.

Granted, the API is provided for you, but the TrackingEventProcessor does not provide any delegation between your nodes to correctly split and merge.
Additionally, note that the split/merge operations are an Axon Framework 4.1 feature.

If you require this functionality out of the box, I do recommend to use Axon Server.
Axon Server’s UI provides operations to split and merge the tokens of a given Tracking Event Processor, and delegates the right operations between any number of application nodes running the given TEP.

Concluding, I wouldn’t take the described custom route you have taken, as you are reintroducing the need correctly build up the delegation of messages between threads which Axon Framework gives you out of the box.
Configuring for several segments can additionally be done up front or on a live system through the split/merge API.
The latter is greatly simplified though Axon Server, thus omitting the need to build this load balancing work yourself.
As such, if you are seeking after simple load balancing on the event handling side of things, I again recommend to take a look at Axon Server.

Hope this sheds some light on the situation.

Cheers,
Steven

Steven,
I wanted to jump in on this conversation. Who owns the Kafka extension for axon and do you know what the approximate roadmap for it reaching 1.0? In our case it would be quite costly to migrate away from using Kafka.

Thank you,
Michael

Hi Michael,

AxonIQ currently controls all the Axon Framework Extension repositories, thus including the Axon Kafka Extension.
I am not able to give you any exact dates around it’s 4.0 release though (keeping everything on the same minor release scheme for clarity).

The concern you are voicing is definitely felt though; a full release should come soon.
It’s however a game of time and resources, and sadly contributions for the extension have been rather low as well.
Once a full release is under way, I can assure you we will notify users on the regular channels.
Checking the Axon Kafka Extension Github page is likely the best place to stay on track though.

Regardless, you can safely assume that we set the Axon Kafka Extension as a perfect fit for dispatching Events between services.
We however strongly believe in the benefit of a routing solution that does Commands, Events and Queries, thus exactly what Axon Server would bring you.
So, if you are looking to scale out your entire application landscape and not just your events through the Kafka extension, taking a look at Axon Server might be helpful.

Hope this clarifies things.

Cheers,
Steven

Hi Steven,

Thanks for your response and I immediately tried using the “forParallelProcessing” API from the TrackingEventProcessorConfiguration class.

However I was struggling to make it work as expected. I am wondering whether I am missing a critical component in the configuration.

Let me explain what I tried

In my Spring configuration (@Configuration annotated class), I created a bean method to return the “TrackingEventProcessorConfiguration” as below

@Bean
public TrackingEventProcessorConfiguration tepConfiguration() {
return TrackingEventProcessorConfiguration.forParallelProcessing(4);
}

My naive assumption was only this simple change would create a parallel processing with multiple threads and segments in the database. However I observed only one record in the “token_entry” table and one thread in the tracking event processor.

Hence I step debugged a while and realized that even though I return this TEPConfiguration bean, it was not picked up and there is another invocation of TEPConfiguration class through EventProcessingModule class.

So I tried to create a @Bean method to return “TrackingEventProcessor”. I was facing issues with constructing the TEP.Builder instance, because I was not able to instantiate and set eventHandlerInvoker in the Builder class, without which the Builder class fails on validation.

Hence I tried the following bean definitions (included the commented code blocks as well for more clarity)

@Bean
public Configurer configurer(EntityManagerProvider emp, TransactionManager tm) {
return DefaultConfigurer.defaultConfiguration(true);
// return DefaultConfigurer.jpaConfiguration(emp, tm);
}

@Bean
public EventProcessor tep(DefaultConfigurer configurer, KafkaMessageSource kms,
TrackingEventProcessorConfiguration tepConfig) {
// TrackingEventProcessor.Builder builder = TrackingEventProcessor.builder()
// .errorHandler(PropagatingErrorHandler.INSTANCE)
// .eventHandlerInvoker(new MultiEventHandlerInvoker(new SimpleEventHandlerInvoker())).

EventProcessingModule epm = new EventProcessingModule();
org.axonframework.config.Configuration configuration = configurer.buildConfiguration();
epm.initialize(configuration);
EventProcessingConfigurer epc = epm.registerTrackingEventProcessor(“MyProcessor”, c -> kms, c -> tepConfig);
EventProcessor tep = epm.eventProcessors().get(“MyProcessor”);
return tep;

// TrackingEventProcessor.Builder builder = TrackingEventProcessor.builder()
// .errorHandler(PropagatingErrorHandler.INSTANCE)
// .trackingEventProcessorConfiguration(tepConfig)
// .messageSource(kms)
// .eventHandlerInvoker(new SimpleEventHandlerInvoker())
// .name(“MyProcessor”)
// .rollbackConfiguration(RollbackConfigurationType.RUNTIME_EXCEPTIONS)
// .transactionManager™;
//
// return TrackingEventProcessor.builder().trackingEventProcessorConfiguration(tepConfig).name(“MyProcessor”).build();
}

Even with this code I find that the TrackingEventProcessor was null after the call “EventProcessor tep = epm.eventProcessors().get(“MyProcessor”);”. The registerTrackingEventProcessor does not add to the eventProcessor list the new registered processor. So I am not able to achieve the desired parallelism.

Could you please let me know how to use the “forParallelProcessing()” API?

Thanks & Regards,
Dharani Kumar P

Hi Dharani,

Sorry for adding the missing link how to use the TrackingEventProcessorConfiguration to my earlier response.

Take a look a the EventProcessingConfigurer, especially the registerTrackingEventProcessor(String, Function<Configuration, StreamableMessageSource<TrackedEventMessage<?>>>,Function<Configuration, TrackingEventProcessorConfiguration>) method contained in it.

The API of the “registerTrackingEventProcessor” method is such that the first parameter is the name of the event processor.

The second is a function giving you Axon’s Configuration and expecting a “StreamableMessageSource” (this can be the EventStore for example) and the last allows you to set your custom TrackingEventProcessorConfiguration.

Anyhow, you should not be required to build the TrackingEventProcessor yourself, as the EventProcessingConfigurer provides the necessary components to configure any Event Processor in your application.

Hope this helps!

Cheers,
Steven

Hi Steven,

Thanks for your reply and sorry for my delayed response.
I will check this out today and post updates

Regards,

Hi Steven,

After a long time, I got a chance to revisit this problem.

Earlier the axon configuration were externalized through the application.yml file.
I removed those and programmatically configured the TrackingEventProcessor as below

@Bean
public KafkaMessageSource kms(Fetcher fetcher) {
return new KafkaMessageSource(fetcher);
}

@Autowired
public EventProcessingConfigurer configureEPC(EventProcessingConfigurer epc, KafkaMessageSource kms) {
return epc.registerTrackingEventProcessor(“MyProcessor”,
c -> kms,
c -> TrackingEventProcessorConfiguration.forParallelProcessing(8));
}

All these changes tested without the support of Axon server (I excluded Axon connector in gradle).

Kafka Configuration
3 brokers and 8 partitions in the topic

Observations
1.) After the initial clean DB state (empty token_entry table), the first JVM started 8 threads.
2.) The second JVM had only one thread started, it aligns with the expected behavior that TEPConfiguration.forParallelProcessing() will work only for the initial token creation and split the tokens among segments only if the tokens were not created.
3.) I ran the producer and pumped about 20 events to the topic. Only 3 events in total were consumed by the JVM node 1 in 3 different threads. Remaining events were never consumed by the event handler (possibly failed at the segment match check).
4.) The second JVM instance did not receive a single message, which is the expected behavior as the second instance
was not able to obtain lock on the token_entry segments.

Question.
Let’s say we run only one JVM instance (no multiple nodes) with n threads, then in this scenario, I expect all the
messages to be consumed. But that’s not the case. Why?

To confirm the behavior, I ran only one JVM instance with 2 threads. Pumped about 20 messages to the topic multiple times and found that there is a consistent message loss in the consumer. Only 2 or 3 messages are consumed. Most of the messages were lost, again I assume they failed the segment match check (I did not verify)

How to prevent the message loss here?

I also tried with FullConcurrencyPolicy

@Autowired
public EventProcessingConfigurer configureEPC(EventProcessingConfigurer epc, KafkaMessageSource kms) {
return epc.registerTrackingEventProcessor(“MyProcessor”,
c -> kms,
c -> TrackingEventProcessorConfiguration.forParallelProcessing(2)).registerDefaultSequencingPolicy(c -> new FullConcurrencyPolicy());
}

and then ran two instances and found four entries in the token_entry table with segments 0 to 3. Both instances received messages but there were consistent message losses. Only 3 to 4 messages were consumed in total in two JVM nodes.

Any suggestions?

Thanks,
Dharani Kumar P

Hi Dharani,

From Axon’s configuration stand point I don’t see anything wrong.
As such I would personally first look at the Kafka set up you have in place.

So, you state you’ve “pumped” 20 events on to the topic.
How did you do this “pumping”?
Are you using the Axon Kafka Extension as the event producer as well or are you using an entirely different set up to this end?

Cheers,
Steven

Hi Steven,

I am using the example which was forked from marinkobabic/axon-kafka-example. My changes are available here
https://github.com/dharanikumarp/axon-kafka-example/

branch: multi-threaded-tep

I am using the producer which is again based on Axon Kafka extension.

Kafka is running locally with 3 brokers (localhost:9092, 9093, 9094) and the topic (mydefaulttopic) has eight partitions.

Here is an excerpt from the log in the consumer application

Screen Shot 2019-08-27 at 12.04.45 PM.png

Hi Dharani,

The problem becomes quite interesting now, but I am still not entirely sure what to suggest you.

My hunch tells me you might have found a bug, although I am note sure of this.

Additionally, bugs like this are to be expected since, as I said in my first reply, the Axon Kafka Extension is not fully released yet.
Regardless, I feel we might be able to help you out here.

First thing which pops to mind is the following question for you.
Does the entire set up work if you do not run with multiple threads and segments?

Thus, I’d suggest to test whether dropping all existing tokens and the parallel-processing configuration, whether the issue is resolved then.
If yes, then we can at least rule some things out.

Cheers,
Steven

Screen Shot 2019-08-27 at 12.04.45 PM.png

Hi Steven,

I changed the configuration as below and started with the clean database and kafka. Same as earlier 3 brokers and 8 partitions.

public EventProcessingConfigurer configureEPC(EventProcessingConfigurer epc, KafkaMessageSource kms) {
return epc.registerTrackingEventProcessor(“MyProcessor”, c -> kms);
}

Yes all the 20 messages sent by the producer is all received at the consumer. I executed the producer multiple times to confirmed the behaviour.

So it comes down to the parallelProcessing configuration API and event distribution among the TEP threads.

Thanks,
Dharani Kumar P

Hi Dharani,

Thanks for verifying this for me, much appreciated.
Sounds like we’ll be adding a test case to verify this behaviour for the extension, followed up by a fix to allow parallel processing.

For now though, I would suggest not to set up your Tracking Event Processor (TEP) with the Kafka message source to run in parallel.
To utilize parallelism for you TEP’s, I suggest to use the Event Store directly as the message source.
Either by sharing the Event Store database or simpler by using Axon Server.

Cheers,
Steven