Debounce events in eventHandler

Hi,

Currently I’ve setup an EventHandler in a separate service (kotlin) which maps and then publishes events to a Kafka topic. I’am listening for DomainEventMessage<*>, map it to a generic format and post it to the topic.

During startup we create some testData which lead to a lot of command and events. Therefore the eventHandler publishes a lot of duplicate messages to Kafka.

What is the best way to debounce these? I’am thinking of creating a kotlinx.coroutines.flow of the incoming messages and use default debounce functions, but there might be better ways?

Regards,

Frank

Hi Frank,

I’m not sure I’ll get the whole picture. Typically, when multiple systems are involved, exactly once is not possible, and at least once is the best delivery guarantee. Which means you might get duplicates on the Kafka topic, where consumers need to be able to deal with.

However in this case, the events are read from where, before being send to Kafka? Typically you would use a streaming event processor, and store the token somewhere durable, so you don’t start from the beginning of the event stream each time. It seems like you do might start from the beginning each time. Is that assumption correct?

Morning Gerard,

To sketch the bigger picture: this is actually a continuation of Event interceptor not receiving events - #6 by Gerard

So instead of using an interceptor I created a separate projection service (new ProcessingGroup) and the read side of the application solely for the purpose of posting ‘event summaries’.

This projection service listens to every DomainEventMessage, does some filtering and maps them to a notification which is actually a summary of each event, so we don’t publish our internal event structure.
An example notification contains “Employee 123, Deleted” in order to trigger our GraphQL server to fetch the latest state from the read model and in certain conditions push it to our clients. There is also an aggregate versionNumber (retrieved using @SequenceNumber) in the message.

Both mechanisms (using the interceptors and projection) work fine, but there is are lot messages posted, especially since after a new deployment in out test environments we fire of a bunch of commands to have some initial test data. This leads to a lot of events, which are picked up by the projection and posted to kafka.

At this moment I’am trying to limit the amount of kafka message, since it is no use first publishing a Create and 5 updates when the last event is a Delete. That’s why I’am debouncing now.

I’ve written an own DeBounce implementation based on a ConcurrentHashmap and a scheduler. This denouncer is fed with events coming in by the projection service in the read model. By choosing the key to be a combination of aggregateName and aggregateId, notifications for the same aggregate are overwritten and therefore the amount of message is limited. The scheduler triggers every 0.7 seconds, reads the map, clears the map and post the message to kafka. There is some housekeeping around the map to maximise the amount of messages and make sure it doesn’t use to much resources.

This reduces the amount of messages posted from 2400 to 200. Which is fine. My question in short is: Is there is a better way of achieving this functionality?

A bonus feature is to make sure the message is posted to Kafka, only when all other projections are processed. Currently there might be notification posted to Kafka and pickedup by the GraphQL server when it is not committed in the read model.

On the GraphQL side we could use the aggregate version to determine if the fetch result from the read model is up to date and drop the message if it is not, but I rather solve this eventual consistency aspect in this projection service.

I’am curious to hear your thoughts on this.

Regards,

Frank

Hi Frank,

The setup sounds a bit over-engineered to me. I’m not sure if you can use Axon Server, but if you can, this would simplify things a lot. In that case, you could handle the queries directly by spring-graphql or Netflix-dgs, and have a 1-to-1 mapping to Axon query messages.

Does the separate projection service use a token store? As if it would it would not send similar messages again.

Hi Gerard,

Yes I agree using axon-server would simplify things, but unfortunately that is not an option.

Regarding the token store, the projection service uses a tracking event processor and it also checks if it is in replay mode so I would not expect it processing duplicate events.

Thanks,

Frank

1 Like