We want to restrict the Event Processor to check only for immediate previous events and not an entire stack of events

Hi Team,
We are using Axon for event souring by considering MongoDB as Event Store.

Here is the scenario: -
Initially, a ‘CreateEvent’ event is triggered to create a ‘trade’, we also have various other events like Insert, Update, Delete, etc. which are getting performed to achieve business functionalities.

As per need, grouping takes place on those trades, based on specific parameters.
Here, thousands of events are stored in the database with respect to each trade.

Problem:
To fulfill our functionality, only an immediate previous event must be checked if a new event occurs.
But unfortunately, the event processor checks all the previous events which are not required. This is a performance compromise.

Expected Solution:
We want to restrict the Event Processor to check only for immediate previous events and not an entire stack of events.

Let us know if this is a defect/limitation/feature of Axon
framework.

Kindly, help to achieve the required functionality through Axon.

Hi Kuldeep.

I’m not sure I get the question correctly. As can be seen in the reference guide, the initial token can be set to something different than the tail, so it doesn’t go through all the events.

However, from your question, it seems you don’t want to run the event processor all the time? Mentioning the ‘immediate previous events’ confuses me. When run distributed, the event processor might lag or temporarily not run. In those cases, you typically want the event processor to pick up where it was left previously. Is that also the case here?

1 Like

Added to the points made by @Gerard, I’d like to propose you do not construct CUD (create update delete) like events.
The benefit of this design is that your events depict what’s happening inside your system.
However, an “update” event does not describe what happened at all.
You are thus losing the opportunity to discuss within your team and business what’s happening when doing so.

2 Likes

Hi,
I tried to add following configuration to get events after 2022-11-18T17:05:30.00Z.
But gets all the events.
Someone can please guide?

    @Bean
    public TrackingEventProcessorConfiguration customConfiguration() {
        return TrackingEventProcessorConfiguration
            .forSingleThreadedProcessing()
            .andInitialTrackingToken(streamableMessageSource -> streamableMessageSource.createTokenAt(
                Instant.parse("2022-11-18T17:05:30.00Z")
            ));
    }


    @Bean
    public ConfigurerModule initialTrackingTokenConfigurerModule() {
        TrackingEventProcessorConfiguration tepConfig =
            TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
                .andInitialTrackingToken(
                    StreamableMessageSource::createTailToken);

        return configurer -> configurer.eventProcessing(
            processingConfigurer ->
              processingConfigurer.registerTrackingEventProcessorConfiguration(

                    "my-processor", config -> tepConfig
                )
        );
    }

There are three things that might go wrong. The most probable is that the my-processor is not the group actually used. Did you configure it with an annotation like @ProcessingGroup("my-processor") on the class with the event handlers?

Another possibility is that the my-processor already existed, in which case it would continue from the stored offset. To work around that, you could give it another name, or remove the stored tokens.

It might be a bug, as you use MongoDB as event store, which might not properly create this token. In which case it would be interesting how you configured the Mongo event storage engine.

1 Like

Hi @Gerard
Here I want to elaborate on the scenario:

At first, I send a CreateGroup Request.

Logs are as follows:

20:16:45.020 INFO com.testing.classification.master.ClassMaster -
ClassMaster - Command: CreateGroupCmd 33591f70-da1a-49f5-9e46-87494e2570b6
20:16:45.076 INFO com.testing.classification.master.events.GroupCreatedEvt -
*********************** GroupCreatedEvt ***********************

The second time, I sent a CreateGroup request.

Logs are as follows:

20:17:04.177 INFO com.testing.classification.master.events.GroupCreatedEvt -
*********************** GroupCreatedEvt ***********************
20:17:04.181 INFO com.testing.classification.master.ClassMaster -
ClassMaster - Command: CreateGroupCmd 33591f70-da1a-49f5-9e46-87494e2570b6
20:17:04.182 INFO com.testing.classification.master.events.GroupCreatedEvt -
*********************** GroupCreatedEvt ***********************

The third time, I sent a CreateGroup request.

Logs are as follows:

20:17:31.521 INFO com.testing.classification.master.events.GroupCreatedEvt -
*********************** GroupCreatedEvt ***********************
20:17:31.513 INFO com.testing.classification.master.events.GroupCreatedEvt -
*********************** GroupCreatedEvt ***********************
20:17:31.515 INFO com.testing.classification.master.events.GroupCreatedEvt -
*********************** GroupCreatedEvt ***********************
20:17:31.517 INFO com.testing.classification.master.ClassMaster -
ClassMaster - Command: CreateGroupCmd 33591f70-da1a-49f5-9e46-87494e2570b6
20:17:31.521 INFO com.testing.classification.master.events.GroupCreatedEvt -
*********************** GroupCreatedEvt ***********************

In the whole process, the previous Events got triggered. I want to stop triggering the previous events.

Expected log for the Next Group Creation request: (for the third time, Sending Group Creation Request)

20:17:31.517 INFO com.testing.classification.master.ClassMaster -
ClassMaster - Command: CreateGroupCmd 33591f70-da1a-49f5-9e46-87494e2570b6
20:17:31.521 INFO com.testing.classification.master.events.GroupCreatedEvt -
*********************** GroupCreatedEvt ***********************

But the Actual log for the Next Group Creation request:

20:17:31.521 INFO com.testing.classification.master.events.GroupCreatedEvt -
********************* GroupCreatedEvt *********************
20:17:31.513 INFO com.testing.classification.master.events.GroupCreatedEvt -
********************* GroupCreatedEvt *********************
20:17:31.515 INFO com.testing.classification.master.events.GroupCreatedEvt -
********************** GroupCreatedEvt **********************
20:17:31.517 INFO com.testing.classification.master.ClassMaster -
ClassMaster - Command: CreateGroupCmd 33591f70-da1a-49f5-9e46-87494e2570b6
20:17:31.521 INFO com.testing.classification.master.events.GroupCreatedEvt -
*********************** GroupCreatedEvt ***********************

Can you suggest a solution to my problem?
Thank You

Hi,

how does your aggregate looks like?
Do you use @EventSourcingHandler or @EventHandler there? The latter is not correct. See differences between @eventhandler and @eventsourcinghandler .

Where is the event triggered (with apply)?
Did you test your aggregate with a unit test?

What confuses me is that every command has the same UUID. Is that intended?

Best regards,
JohT

2 Likes

Hi,
yes, @EventHandler is used.
(tries @EventSourcingHandler, but getting same result)
Here is the code:

    @EventHandler
    public void on(final GroupCreatedEvt evt)
            throws InvalidProtocolBufferException {

        this.cmId = evt.getId();
        final GroupCreated groupCreated = ProtoSerDer.fromJsonToProtoBuf(
                evt.getGroupCreated(), GroupCreated.class).orElseThrow();
        this.data.created(groupCreated);
    }

The event is triggered inside the @CommandHandler.
Tests are implemented as fixture test cases.

@Kuldeep, I’m not sure If I get you right. So i might ask you some further questions.

Is it possible, that an event is applied/sent while the aggregate is recreating its state?
This would be the case if you use an Event Sourcing Aggregate with a @EventSourcingHandler annotated method, that contains code that itself triggers an event. That could lead to a bug like you mentioned.

When using Aggregates for Event Sourcing, their state will be recreated based on all past events for that Aggregate (or also from a snapshot, if available). This happens on command-side and has nothing to do with a read-side Event Handler. During that phase, the @EventSourcingHandler methods are called, which must not do anything else but updating the Aggregates internal state. When this phase is finished, the new command is handled and a new event might get applied.

1 Like

Hi @JohT
I’m unable to get your question.

There is another GRPC controller, through which we are sending requests by command gateway to the aggregate.
That’s why we have the same Aggregate Identifier

It would help us to know the entire expected message flow. Which commands are handled by the aggregate, which events are sent from the aggregate, and are there additional messages sent based on those? In a typical application, the events are only created by the aggregate, which should use @EventSourcingHandle to build its state based on the events. Other processors can use @EventHandler to read the stream of events.

We are a bit confused since you showed an @EventHandler annotated method while we asked what your aggregate looked like. More specifically, from which method does the unexpected logging originate?

Issue resolved.

That was the issue related to snapshots.
I tried to fix it with this (WeakReferenceCache) configuration.

Thanks @JohT @Gerard @Steven_van_Beelen

1 Like