Queries on conflict resolver in axon

Hi,

I am new to axon framework and need more understanding on how conflict resolver works. I have two events one that is unseen and second that is uncommitted. How these two events will be merged in the resolve conflicts method or how can i make the uncommitted event to wait for unseen event to be committed?

As per the api
" Detecting these conflicts is a matter of comparing the two lists of DomainEvents provided in the resolveConflicts method declared on the ConflictResolver. If such a conflict is found, a ConflictingModificationException (or better, a more explicit and explanatory subclass of it) must be thrown. If the ConflictResolver returns normally, the events are persisted, effectively meaning that the concurrent changes have been merged."

What exactly we mean by comparing the two list of events?
Do we need to check the version of the two events ?

Any pointers are welcome:)

Thanks & Regards,
Prashant Tiwari

You may want to check out the ConflictResolver API. The resolveConflicts method takes two lists of events, just as you found in the documentation. You need to implement this interface. You can loop over the two lists, and compare each event. If any unseen (“committed”) events conflicts with any uncommitted (“applied”) event, then you throw an exception. Otherwise, if you don’t throw an exception, the unseen (“applied”) events will be committed as-is.

Here is an example for a basic rule. Two events conflict if they are the same type, otherwise they do not conflict. I’ve found this is a pretty good starting place. (Of course your requirements will probably be more complex than this.)

// wire this conflict resolver into the repository:
public class MyConflictResolver implements ConflictResolver {

public void resolveConflicts(List appliedChanges, List committedChanges) {
for (DomainEventMessage appliedChange : appliedChanges) {
for (DomainEventMessage committedChange : committedChanges) {
if (appliedChange.getPayloadType().equals(committedChange.getPayloadType()) {
throw new ConflictingModificationException("conflict between two events of the same type: "

  • appliedChange.getPayloadType().getSimpleName());
    }
    }
    }
    }
    }

-Peter

Thanks Peter for the reply. I have following scenarios:

i) I am getting deadlock exception when the second event tries to get a lock on the aggregate. I am using PessimisticLockManager & want to gracefully handle this for e.g like is there any way the second event will wait for the first event to complete?
ii) If i am trying to load an aggregate that is not saved yet i am getting “Trying to peek beyond the limit of existing DomainEventStream”. In this case too, i want the second event to wait for the first event to complete.

Any suggestion are welcome.

Thanks & Regards,
Prashant Tiwari

Hi Prashant,

why would an event want to get a lock in an aggregate?

In the ConflictResolver, don’t load any aggregates. Just iterate through the two lists you get and throw an exception if there is a conflict. In case of a conflicht, the unit of work (and transaction) is rolled back. Otherwise the command is executed normally.
Note that a command always executes against the latest state of an aggregate. Only if the latest state is newer than the expected state, the conflict resolver allows you to specify whether there is a conflict or not.

Hope this helps.
Cheers,

Allard

Hi Allard,

Thanks for your reply.

  1. This is my first scenario:

As per the documentation:

“Deadlocks are a common problem when threads use more than one lock to complete their operation. In the case of Sagas, it is not uncommon that a command is dispatched -causing a lock to be acquired-, while still holding a lock on the aggregate that cause the Saga to be invoked. The PessimisticLockManager will automatically detect an imminent deadlock and will throw a DeadlockException before the deadlock actually occurs.”

Instead of throwing deadlock exception, is there any way to make the second command wait so that first command executes completely?

  1. While loading from the event sourcing repository i am getting “Trying to peek beyond the limits of DomainEventStream” when we try to create an aggregate (events.peek() is called in the constructor) if none exits, for the second command while the first command that invoked the saga has not stored the aggregate yet. In this scenario too, i want the second command to wait so the first command could save the aggregate.

I am getting above issue in axon-2.0.3.

Kindly let me know if you need more details.

Cheers,
Prashant Tiwari

Hi Prashant,

  1. From the framework perspective, that’s not possible. You simply cannot make something wait if it runs in the same thread. However, if you configure an AsynchronousCommandBus in your application, you can do exactly that.

  2. Why are you peeking into a DomainEventStream at all? Axon uses the stream to initialize your aggregate. Obivously, when you try to read from it, the pointer of the stream is already at the end. It’s not really clear to me what you’re trying to achieve.

Cheers,

Allard

Hi Allan,

Thanks for your response. I will try the AsynchronousCommandBus.

Regarding point 2, I was thinking if there is no aggregate present in the repository with provided aggregateIdentifier then i would get AggregateNotFoundException instead i am getting “Trying to peek beyond the limits of DomainEventStream”. I am using AuditEventLogger and “Trying to peek…” exception message is not so informative.

I have two commands A & B with same aggregate identifier. I create the aggregate in command A. Now,

i) If command B comes & there is no aggregate (with that identifier) then i get “Trying to peek…” exception. As i am loading the aggregate in command B (hoping command A will reach first).
ii) If command B comes after command A but command A has not yet saved the aggregate and i load the aggregate then also i am getting same exception

Is it possible to put a check method in repository before loading the aggregate?

Cheers,
Prashant

Ah, I see. Which repository and event store implementations do you use?

To prevent the commands overtaking eachother, you can send command B to the aggregate as part of the callback of command A. This will guarantee that command B is sent after command A is processed.

Cheers,

Allard

Hi Allard,

I am using redis event store (jedis 2.0.0).

Cheers,
Prashant

I assume you know that the RedisEventStore currently has incubator status? It’s not at all production-ready.

Just so you know.
Cheers,

Allard