Issue with stale state when loading aggregate using repository

Hello Team,

For all command execution, the logic is placed in dedicated command handlers. For any validation of the command against the aggregate state, the method below is used to load the aggregate, and the validation is then performed on the loaded aggregate.

getRepository().load("xxxxx").execute(aggRef->{
       // validation logic upon the aggregate 
  });

But quite often some of the state is stale, even though the respective events were published successfully, with no exceptions, before this call.
So how is the aggregate constructed? Does it replay all events to form the aggregate?

I tried with both Axon Server and the EmbeddedEventStore; the issue occurs consistently in both.

Request for help.

NOTE: I am doing some small-scale performance testing, so I am running many commands per second (around 300 commands/sec, which in turn trigger 300 events/sec).

–Kartik

Hi Kartik, let me see how I can help you out here.

First point of note: you do not have to invoke the Repository yourself, ever.
That doesn’t mean it’s wrong but more so that you can simplify your implementation.
If you dispatch the commands, regardless of what they do, on the CommandGateway, then it’ll take care of reaching your Aggregate. The required validation is then performed inside the Aggregate, which it is intended for.

You can rely on Axon’s logic of distributed commands, consistently routing them to the same application node and consistently routing them to the same aggregate instance. In doing so, Axon’s Repository implementation will ensure no two (or more) commands can enter the aggregate simultaneously.

Following this pattern should leave you in a situation where you always have the latest state once the framework invokes a Command Handler. This holds because (1) no two commands can access the same model within a single application, and (2) no two instances will load the same model, as the commands are routed consistently.
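
To make those two guarantees concrete, here is a minimal, framework-free sketch in plain Java. It is not Axon's actual implementation: CounterAggregate, SketchRepository, the limit rule, and the int-valued events are all hypothetical names invented for illustration. It only shows the general idea behind an event-sourced repository: take a lock per aggregate identifier, rebuild the state by replaying the stored events, validate against that state, and only then append the new event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, framework-free sketch; none of these types are Axon classes.
class CounterAggregate {
    private int total; // state derived purely from replayed events

    // Applies one stored event (modelled here as a plain int amount).
    void apply(int amount) {
        total += amount;
    }

    int total() {
        return total;
    }
}

class SketchRepository {
    private final Map<String, List<Integer>> events = new ConcurrentHashMap<>();
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    private final int limit; // assumed validation rule: the total may not exceed this

    SketchRepository(int limit) {
        this.limit = limit;
    }

    // Rebuilds the aggregate by replaying every event stored for this identifier.
    private CounterAggregate replay(String id) {
        CounterAggregate aggregate = new CounterAggregate();
        for (int amount : events.computeIfAbsent(id, key -> new ArrayList<>())) {
            aggregate.apply(amount);
        }
        return aggregate;
    }

    // Handles one command: lock, replay, validate against fresh state, append the event.
    void handle(String id, int amount) {
        ReentrantLock lock = locks.computeIfAbsent(id, key -> new ReentrantLock());
        lock.lock(); // only one command per aggregate identifier at a time
        try {
            CounterAggregate aggregate = replay(id);
            if (aggregate.total() + amount > limit) {
                throw new IllegalStateException("limit exceeded for " + id);
            }
            events.get(id).add(amount);
        } finally {
            lock.unlock();
        }
    }

    // Reads the current total under the same lock, again by replaying.
    int currentTotal(String id) {
        ReentrantLock lock = locks.computeIfAbsent(id, key -> new ReentrantLock());
        lock.lock();
        try {
            return replay(id).total();
        } finally {
            lock.unlock();
        }
    }
}
```

In this sketch, every command for the same identifier goes through the same lock and replays the full event list, so a command cannot observe state older than the last appended event. Stale state could only show up here if something read the aggregate outside that lock.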

Now that I have shared this, it’s time to better understand your setup.
So, would you be able to share the following with us so that we can deduce what is amiss:

  • Have you configured your own CommandBus? If so, how?
  • Have you configured your own CommandGateway? If so, how?
  • Have you configured your own EventStore? If so, how?
  • What’s the logic you’re currently performing inside the execute method of the returned Aggregate within your snippet?
  • Can you expand the code block to fully show how you’re loading it and what you’re doing? Since you state this is within a performance test, I suspect some multi-threaded logic that might mix badly with Axon’s own CommandBus logic.
  • What version of the Framework (or BOM) are you using?
  • What version of Java are you using?

If possible, sharing the sample project on GitHub might be the easiest. :slight_smile:
Axon Server would, by the way, provide you with the most straightforward way of utilizing a consistently routing command bus.

Hello @Steven_van_Beelen

My bad, I did not frame my question properly. Let me elaborate more on this.

Below is the aggregate's command handler method. The logic has been moved to a dedicated command handler class to keep the aggregate class readable.

@Aggregate 
public class WorkAggregate {

   public void on(TestCommand command, TestCommandHandler cmdHandler){
       cmdHandler.handle(command);
   }    
}

All the complex aggregate state validation logic is placed in this handler class. Since it is not a best practice to pass the aggregate reference (via this) from the aggregate's command handling method above, I am using the repository as shown below:

@Component 
public class TestCommandHandler { 

    @Autowired
    private Repository<WorkAggregate> repository;

    public void handle(TestCommand command){
        repository.load(command.aggId())
                .execute(aggregate -> {    // [1] issue is here 
                        validate(aggregate, command);
                });
    }

   private void validate(WorkAggregate agg, TestCommand command) throws ValidateException {
      // perform validation 
   }
} 

So the issue is that when the aggregate (marked at [1]) is received from the repository, its state is stale.

Please find the responses to the above questions:

  • Have you configured your own CommandBus? If so, how?

using default

  • Have you configured your own CommandGateway? If so, how?

using default

  • Have you configured your own EventStore? If so, how?

@Bean
public EventStore newEventStore() {
    return EmbeddedEventStore.builder()
            .storageEngine(new InMemoryEventStorageEngine())
            .build();
}

  • What’s the logic you’re currently performing inside the execute method of the returned Aggregate within your snippet?

few conditions using the state properties

  • What version of the Framework (or BOM) are you using?

4.5.1

  • What version of Java are you using?

8

Thank you for your time. Please let me know if any other info is needed.

Thanks for this Kartik, this helps a lot!
The one thing I’m missing from your sample is the @CommandHandler annotation. Without it, Axon Framework does not know how to invoke the handle method on the TestCommandHandler, nor the WorkAggregate.
So, I have a small rewrite suggestion for your implementation that I’d like to share.

Your WorkAggregate:

@Aggregate 
public class WorkAggregate {

   public void handle(TestCommand command) throws ValidateException {
       // perform validation
   }   
}

Your TestCommandHandler:

@Component 
public class TestCommandHandler { 

    @Autowired
    private Repository<WorkAggregate> repository;

    @CommandHandler
    public void handle(TestCommand command){
        repository.load(command.aggId())
                  .execute(aggregate -> aggregate.handle(command));
    }
} 

By doing the above, you make certain that Axon will invoke the TestCommandHandler#handle(TestCommand) method for you.
By loading the Aggregate there, you can invoke execute to call a method on the WorkAggregate itself. The WorkAggregate#handle(TestCommand) would then become the place to perform the validation.

This ensures the WorkAggregate state sticks within the aggregate. If you perform the validation outside of the aggregate as you’ve drafted up in your TestCommandHandler, you would potentially expose that information (although this depends on the validate implementation you had in mind).

Note that you can simplify this further with the following:

@Aggregate 
public class WorkAggregate {

   @CommandHandler
   public void handle(TestCommand command) throws ValidateException {
       // perform validation
   }   
}

Axon Framework will automatically perform the task you’ve written in the TestCommandHandler by doing this.

Hello @Steven_van_Beelen

Actually, @CommandHandler is present in the code; I missed mentioning it in the post.

The reason for moving the validation logic to a separate command handler class is to keep the aggregate class readable. Otherwise, with many command and event handlers, it would bloat into many lines of code.

Even the above solution of calling the Aggregate's handle() from the CommandHandler would amount to the same as not having the separate handler at all, since the entire validation logic would still reside in the Aggregate.

Isn't there any solution to move the logic from the aggregate into the respective handlers without any state corruption? And is there a similar solution for the event handlers too?

NOTE: All aggregate property accessors are marked protected, and only getXX methods are exposed. Even for complex object properties, I am ensuring that setXX methods are not exposed inside the complex object. The protected accessors are also used in unit tests, where the test classes are placed in the same package structure.

Alright, that’s good to know Kartik, thanks.

You can surely move validation to a separate class if you and your team feel it otherwise bloats your Aggregate. I personally wouldn’t pass the Aggregate itself as an argument to this component, though, as that would eventually mean you’re sharing more than is necessary for the validation component to know about.

Simply having a class performing the validation and wiring it as a parameter is sufficient.
If you’re in a Spring environment, this service would be a bean.

What’s of utmost importance with such a component, is that it does not do any outbound operations.
Otherwise, you’re extending the lock period on your aggregate, blocking other operations from even entering the instance.
You should thus make these validation components simple state machines.
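
As an illustration of such a component, here is a minimal plain-Java sketch. All names in it (WorkStatus, WorkTransitionValidator, WorkAggregateSketch) are hypothetical and not part of Axon Framework, and the transition rule is invented. The point it tries to show is that the validator receives only the values it needs rather than the aggregate itself, and that it performs no outbound calls, so it cannot extend the lock period.

```java
// Hypothetical names throughout; nothing here is part of Axon Framework.
enum WorkStatus { OPEN, IN_PROGRESS, DONE }

// Pure validation component: no I/O, no repository access, only the values it is handed.
class WorkTransitionValidator {

    // Returns true when moving from 'current' to 'requested' is an allowed transition.
    boolean isAllowed(WorkStatus current, WorkStatus requested) {
        switch (current) {
            case OPEN:
                return requested == WorkStatus.IN_PROGRESS;
            case IN_PROGRESS:
                return requested == WorkStatus.DONE;
            default:
                return false; // DONE is terminal in this invented rule set
        }
    }
}

// Stand-in for the aggregate; in a real Axon aggregate, handle(...) would be the
// annotated command handler and the validator an injected parameter (assumption:
// it is registered as a Spring bean).
class WorkAggregateSketch {
    private WorkStatus status = WorkStatus.OPEN;

    // The aggregate passes only the state the validator needs, never 'this'.
    void handle(WorkStatus requested, WorkTransitionValidator validator) {
        if (!validator.isAllowed(status, requested)) {
            throw new IllegalStateException(
                    "transition " + status + " -> " + requested + " is not allowed");
        }
        // In an event-sourced aggregate this would publish an event instead,
        // with the state change made in the matching event sourcing handler.
        status = requested;
    }

    WorkStatus status() {
        return status;
    }
}
```

With this shape, the aggregate stays small (one delegating line per command), while the validation rules live in a class that can be unit tested without loading any aggregate.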


So, this should be possible, but for some reason doesn’t work in your environment.
Is there perchance still an open-session-in-view configured, Kartik?
Spring can keep the connection and transaction open until the response is sent over HTTP.
The lock on the Aggregate that occurs if you (1) dispatch a command directly to an aggregate or (2) when you invoke Repository#load(String) might be released earlier than that.

I have disabled open-session-in-view (set it to false) in the Spring config properties, and yet the issue is the same. The command is always published via the gateway.

Hmm, I’m running out of ideas a little at this moment.
I’d wager there must be something amiss with the setup you have at this stage, because an issue like this would otherwise have surfaced a lot sooner (or, at least, I’d hope).

I have another request for you, Kartik, which I hope will bring this to a conclusion.
Any chance you can make a (minimal) project wherein this behavior consistently occurs, that I can check out? That way I can figure out (likely through debugging) what’s amiss.

Sure @Steven_van_Beelen
