Seeking help tracking down lost messages Kafka + Volume

In certain circumstances (usually due to high load frequency of messages) we’ve observed certain messages not being fired from the event bus and arriving within Kafka.
Are there any limited sized buffers within the framework that may result in the queue (buffer) getting filled and thereby dropping messages?

Possibly related to the above, how can we reduce the number of KafkaProducers to only operate over a single thread? I believe in most applications having only a single Producer per application instance is desirable… at the very least in our case of using transactions within Kafka we can see conflict with ids due to this behavior.

Any references within the framework source code is highly appreciated to helping me narrow down the root cause.

Hi Michael,

For readers of this question, it is important to tell them you are mostly using Kafka as the Event Bus within the Axon application entirely.
Application publish on the Axon.Kafka topic and read back from it, without any direct relation to the RDBMS Event Store underneath it.
This makes it so that you are using a SimpleEventBus internally, not the EmbeddedEventStore/AxonServer implementations.

Any how, regarding your buffer question, no there are none.
Added, it feels like you are not sure where the event “drops” between event publication and having the KafkaProducer publish it to the Axon.Events topic.
Firstly, I would start trying to figure out where it is dropped and for that I think adding monitoring is the first step; otherwise everybody is just guessing.

To that end the MessageMonitors Axon provides would be suitable as a first stab, I think.
You can either use DropWizard Metrics or Micrometer as the implementation of metrics.
Added, the KafkaPublisher also allows the addition of a MessageMonitor through the Builder.
This monitor is not auto configured for you at the moment, due to the extension’s RC state.

Let us know, after some further monitoring, where you see the message being dropped within the framework.
If it’s not within the framework, I would have a look at the Kafka set up you are using.

Hope this’ll bring you the specific issue!
And, if it is an issue, I’d like to ask you to add it as such here.

Cheers,

Steven van Beelen

Axon Framework Lead Developer

AxonIQ

twitter-icon_128x128.png

Thank you for clarifying our usecase, as stated above we are using the SimpleEventBud for Kafka.

Yesterday we dug much deeper on this issue. At this point it does not appear to be an issue with Axon … but likely a configuration problem yet the it is still not clear and today we’ll be debugging kafka producer responses.

Looking over the following

`

private Map<Future<RecordMetadata>, ? super EventMessage<?>> publishToKafka(List<? extends EventMessage<?>> events,
                                                                            Producer<K, V> producer) {
    Map<Future<RecordMetadata>, ? super EventMessage<?>> results = new HashMap<>();
    events.forEach(event -> results.put(producer.send(messageConverter.createKafkaMessage(event, topic)), event));
    return results;
}

`

We’ve confirmed that the number of events arriving include all expected 8149 events (with the last one being SyncEnded that is dropped with large volumes). We Also confirmed that results before returning contains all 8149. Never-the-less SyncEnded (the last message in the Array is lost)

The same code with 256 items works perfectly well with no dropped events.

Today I will investigate the KafkaProducer in depth and will certainly follow up with an Issue if one is found.

The one thing I was not able to understand was how/where does eventBus.publish() aggregate events into an array before they are sent to AbstractEventBus. I expected to see 1 event at a time but found the full list. This is not an issue but would like to have a deeper understanding of how this works.

Solved.
Increasing the message-size limit was the trick.
Unfortunately there was no logging off the exception. I think Axon could benefit from creating their own Interceptor that captures write exceptions and associates them or logs them for the user. This might have overhead so possible a dev profile only. Food for thought.