Kafka publisher guarantees

Hello guys!

Could someone please help me understand what guarantees we have when publishing events to a Kafka topic?

Are duplicates possible? Is it possible that some events are lost?

Thank you,
Alexey

Hi Alexey,

This somewhat depends on your setup, but let’s assume you’re using the default provided approach.
Thus, you’re using the latest Axon Framework release (4.1.2) with default settings, storing the events in Axon Server or an RDBMS solution.
Additionally, you’ve wired up the Axon Kafka Extension (4.0-RC2, so not a final release yet), utilizing the KafkaPublisher it provides to publish the events.

If this is the case, then Axon will make sure that when you publish an event, it is always first stored in Axon Server or in your RDBMS solution.
Only after that comes the publication process to all SubscribableMessageSources, of which the Event Bus is an implementation.

The KafkaPublisher subscribes itself to one such SubscribableMessageSource, namely the Event Bus.

Now, when it comes to duplicates, the Event Store ensures that duplicate events cannot be stored.
As storage comes prior to publication, you are thus assured that you do not have duplicates on your Kafka topic.

When it comes to event loss, there is one imaginable scenario in which it might occur: when the SubscribableMessageSource errors out while providing the event, or when the KafkaPublisher fails internally in one way or another.
A SubscribableMessageSource only hands out a message once, as it occurs, and does not keep track of which messages have been dealt with.
Ending up in an exceptional state will thus cause the loss of that event.
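To make this loss scenario concrete, here is a self-contained toy model in plain Java. The SimpleBus class is a made-up stand-in for a SubscribableMessageSource, not the actual Axon API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SubscribeModeLoss {
    // Toy stand-in for a SubscribableMessageSource: it pushes each event
    // to subscribers exactly once and keeps no record of what was handled.
    static class SimpleBus {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        void publish(String event) {
            for (Consumer<String> handler : subscribers) {
                try {
                    handler.accept(event);
                } catch (RuntimeException e) {
                    // The bus moves on; nothing re-delivers this event later.
                }
            }
        }
    }

    public static void main(String[] args) {
        SimpleBus bus = new SimpleBus();
        List<String> delivered = new ArrayList<>();
        // A flaky "publisher" that fails on one event,
        // e.g. because the Kafka topic is unreachable at that moment.
        bus.subscribe(event -> {
            if (event.equals("event-2")) {
                throw new RuntimeException("Kafka unavailable");
            }
            delivered.add(event);
        });
        bus.publish("event-1");
        bus.publish("event-2"); // fails and is silently lost
        bus.publish("event-3");
        System.out.println(delivered); // [event-1, event-3]
    }
}
```

Because the bus neither buffers the event for the subscriber nor records what was handled, the failed event-2 is simply gone from the subscriber’s point of view; the event store still holds it, but nothing re-delivers it to the publisher.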

The chances of this occurring are, however, infrastructure-configuration related, I think.
The default SubscribableMessageSource, i.e. the Event Bus, is very, very unlikely to be the culprit when this occurs.
Instead, I’d assume this happens only when the KafkaPublisher fails to publish on the topic.
And if this is the case, I’d check whether the topic has been configured correctly.

Hope this sheds some light on the situation for you Alexey!

Cheers,
Steven

Hello Steven!

Thank you for the explanation!

It seems that there are no guarantees then, and we need our own publishing mechanism, as we cannot tolerate message loss.

Alexey

Hi Alexey,

At the moment that assumption is fair, but I hope I made clear that the fault scenario will not lie within the framework code but with the local configuration.
That might very well influence any decisions made about the usability of this approach, which is why I am emphasizing it.

I’d also like to state that it should not take too much effort to make the KafkaPublisher use a StreamableMessageSource, which can provide the guarantees you’re looking for.
At AxonIQ, we are (obviously) putting more development effort into Axon Server than into the Kafka Extension, for the sole reason that Axon Server provides a grouped solution for every type of message, whilst Kafka is limited to being a (very good) event bus.
Axon Server also gives you the guarantees you’re looking for on that end, thus making it a fair candidate to consider, if you ask me.

Regardless, we are still investing in connectors like Kafka.
A feature request to provide this guarantee would be a very reasonable thing to raise.
If you feel strongly about this, we would very much applaud it if you’d open an issue for it.
Additionally, if you have the time, contributing the solution would be even better.

Anyhow, let us know whether such an adjustment would fill the gap you feel is obstructing you.

Cheers,
Steven

Hello Alexey,

I would just like to point out that message delivery problems should be dealt with inside the domain itself. You should be aware that some events might not be published and have a solution for it in the places where it is essential for the domain; that is a perfectly reasonable thing to assume and to find peace with.

Sometimes I do not care if the user’s order delivery was not started: I can just filter out the not-started ones manually and replay the order saga myself instead of doing it automatically. But sometimes I do care if some part of the process did not go through during some time period, and then I can replay it based on Axon’s deadline manager. That could introduce different logic in itself, as sometimes the non-delivered events may cause invalidation or simply be delayed.

Hi Robert!

I thought that the way Axon publishes events to Kafka means this publication is considered to be outside of the domain and is not controlled by it. It seems like a side effect provided by the framework infrastructure, e.g. for monitoring, auditing and so on. So it would be strange to use Sagas for this, and there is no logical place for it. Also, Sagas imply request-response, but we often need just one-way publication, so it is not clear how Sagas can help. I think Sagas should start as Sagas, and publication should continue as publication.

At the same time, I have zero practical experience with Axon, so my question may look stupid :slight_smile: The documentation doesn’t help much.

As with my previous question, it seems that I look at this from a wider scope, whereas Axon has an application-focused scope. This is why my thoughts might look strange from Axon’s perspective.

Alexey

Hey Alexey.

I said that completely outside of the Axon scope. The thing is, sagas may or may not communicate with an external system. Triggering any action in the domain is done via the command bus, and the command bus can fail, or the external system can fail.

Some time back I read about OOP, and it changed my thinking a bit about event-driven things. Every object’s behavior is driven by invoking methods on our objects, be it a simple invocation or listening to some event; it doesn’t really matter. The thing is that even with pure method invocation, a method might not always be invoked: say the application crashes within 3 nanoseconds, and some method was not invoked on a downstream object that was meant to continue some bigger process. This is exactly the same situation as we have with event/command buses, which are, well, meant to have failures.

With that being said, I would also say that it happens really rarely; 99.9% of the time everything will be fine, and even if you do not receive something to drive some process, the process might be started on its own because the user themselves re-published some action.

Extracting the communication between objects from pure method invocation into an event-driven approach gives you this big thing in distributed systems called at-least-once delivery, meaning that even if something has not been properly published, Kafka or any other broker will try to make sure it is published at least once.
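On the Kafka side, that at-least-once behavior is mostly a matter of producer configuration. As a sketch (the property keys are standard Kafka producer configs; the class and method names here are made up for illustration):

```java
import java.util.Properties;

public class ReliableProducerConfig {
    // Standard Kafka producer settings that push delivery towards
    // at-least-once (and, with idempotence, de-duplicated retries).
    static Properties reliableProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Wait for all in-sync replicas to acknowledge each write.
        props.put("acks", "all");
        // Retry transient failures instead of dropping the record.
        props.put("retries", Integer.toString(Integer.MAX_VALUE));
        // De-duplicate broker-side retries of the same record.
        props.put("enable.idempotence", "true");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = reliableProducerProps("localhost:9092");
        System.out.println(props.getProperty("acks")); // all
    }
}
```

Note that this only hardens the producer-to-broker leg; it does not help if the event never reaches the producer in the first place, which is the subscribe-mode gap discussed above.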

And as I said, it should not be thought of strictly within the Axon context, as it is a more generic kind of problem, and our domain should take care of such situations. With that in mind, if you build a system that is resilient to these kinds of problems, you could pick literally any event-driven framework and easily rewrite your domain using the new framework, without concerns about how it does dispatching.

I have yet to see problems with Axon on the delivery side of things. However, as Steven said, Axon has its own dispatching system, and I think they want to have an all-in-one framework and will not focus on the Kafka side of things, even though Axon itself is really open for extension.

Cheers

Hey Robert, Alexey,

First off, great discussion going on here.
A lot to learn from this for anyone who reads it, I’d say!

I am not going to add anything new though, I just want to react to Robert’s last point:

“I have yet to see problems with Axon on the delivery side of things. However, as Steven said, Axon has its own dispatching system, and I think they want to have an all-in-one framework and will not focus on the Kafka side of things, even though Axon itself is really open for extension.”

The assumption here is correct: the Axon Framework repository aims to provide the building blocks to help out with CQRS, DDD and Event Sourcing, and, importantly in this discussion, Message-Driven Microservices.
We do, however, see great benefit in providing extensions/connectors for much-sought-after options, of which Kafka is an example.
That said, we have been spread thin at AxonIQ, and even though it is open source, we have not had that many contributions to the Kafka Extension in the last six months.

This left it in a somewhat dormant state, something we are not too pleased about.
We are looking into how to improve the functionality of the extension, so I would ask you to stay tuned in that respect.
Any ideas or contributions to the extension are, in the meantime, more than welcome.

Cheers,
Steven

Hello Steven.

Ok, thank you for this information. We are not using Axon yet, but want to. Some time later we will probably investigate it more deeply and decide on the best way to do that. For now it is enough to know that some work is needed here.

Thank you.

Alexey

Hi Steven,

I would like to volunteer to implement this, since I also need reliable event publishing on Kafka. I understood that Axon by itself is reliable; it is just the “bridge” between the Axon world and the Kafka world where events get lost, for example while the connection to Kafka is unavailable.

Let me briefly point out what I understood of what needs to be changed, just to make sure that I am not thinking completely along the wrong lines:

  • The KafkaPublisher uses a StreamableMessageSource instead of a SubscribableMessageSource.

  • From that streamable message source, the kafka publisher obtains a BlockingStream using a tracking token. That tracking token is obtained from a TokenStore that is provided to the kafka publisher.

  • From the blocking stream, the kafka publisher gets a Java Stream. On this, a consumer is registered with forEach. The consumer sends each event to Kafka and saves the event’s tracking token into the token store.

  • The KafkaAutoConfiguration, which instantiates the builder that produces the kafka publisher bean, does not add an EventBus bean to the builder but an EventStore bean (as a streamable message source) instead. It also adds the TokenStore bean to the builder.

My assumptions are:

  • It is up to applications, not the Kafka extension, to ensure that the EventStore bean and TokenStore beans exist.
  • The Kafka extension (in the end, the kafka publisher) will only support a streamable message source, not a subscribable message source. In particular, it is not required to be configurable by a property which type of message source to use.

Does this sound reasonable? If so, I will fork the kafka extension repository, implement the changes, and create a pull request.
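If I’ve understood the proposal right, the core loop would look roughly like this. This is a self-contained toy model, not the actual Axon API: InMemoryTokenStore, publishPending and the list-based “topic” are hypothetical stand-ins for the real TokenStore, KafkaPublisher and Kafka producer:

```java
import java.util.ArrayList;
import java.util.List;

public class TrackingPublisherSketch {
    // Toy stand-in for a TokenStore: persists the position of the
    // next event that still needs to be published.
    static class InMemoryTokenStore {
        private int token = 0;

        int fetchToken() {
            return token;
        }

        void storeToken(int newToken) {
            token = newToken;
        }
    }

    // Open a "stream" at the stored token and publish everything after it.
    static void publishPending(List<String> eventLog,
                               InMemoryTokenStore tokenStore,
                               List<String> kafkaTopic) {
        for (int position = tokenStore.fetchToken(); position < eventLog.size(); position++) {
            kafkaTopic.add(eventLog.get(position));  // send to "Kafka"
            tokenStore.storeToken(position + 1);     // advance only after a successful send
        }
    }

    public static void main(String[] args) {
        List<String> eventLog = new ArrayList<>(List.of("event-1", "event-2", "event-3"));
        InMemoryTokenStore tokenStore = new InMemoryTokenStore();
        List<String> kafkaTopic = new ArrayList<>();

        publishPending(eventLog, tokenStore, kafkaTopic);   // publishes all three

        // New events arrive after a restart: the stored token lets the
        // publisher resume exactly where it left off, so nothing is lost.
        eventLog.add("event-4");
        publishPending(eventLog, tokenStore, kafkaTopic);

        System.out.println(kafkaTopic); // [event-1, event-2, event-3, event-4]
    }
}
```

One design consequence worth noting: storing the token only after a successful send means a crash between the send and the token update would re-publish that one event on restart. That is at-least-once rather than exactly-once delivery, which matches Kafka’s usual semantics.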

Thanks and regards,
Andre

Hi Andre,

Your explanation sounds reasonable, but it reads more like a description of how it’s currently implemented.
I do not directly see from your description what you mean to adjust in the current Axon Kafka Extension.

Regardless, any intentions from anybody to help contribute to the Axon Kafka Extension are very much appreciated!
We would thus look forward to any pull requests you would provide.

In doing so, it is highly beneficial for us at AxonIQ if you provide a thorough description of what your pull request aims to improve or resolve.
Ideally even, you create an issue beforehand describing the problem and potentially a proposed fix; this makes discussions around your suggestions more straightforward at our end.

Cheers,

Steven van Beelen

Axon Framework Lead Developer

AxonIQ

Hi Steven,

What I described is my proposal for the to-be implementation, not the current implementation, in terms of reliable sending over Kafka, unless I have overlooked something or got things wrong.

I have created an issue for this: https://github.com/AxonFramework/extension-kafka/issues/10

If I am wrong and event tracking with the Kafka extension already works with the current implementation, please let me know.

Otherwise I will go and implement it and create a PR for my changes.

Thanks,
Andre

Hi Steven, hi Andre, hi guys,

I just provided a PR implementing the tracking/subscribing switch for the Kafka Extension, which controls whether it behaves as previously defined or sends messages in a new transaction, as described by Andre.
I used event processors to support the different modes (instead of re-implementing the processor semantics in the extension itself). Depending on the switch, the tracking or subscribing mode is used (subscribing remains the default).
In order to cope with possible errors, I added an invocation error handler that re-throws exceptions.

I would appreciate if you find some time to review the PR.
https://github.com/AxonFramework/extension-kafka/pull/11

Kind regards,

Simon