How to achieve fully consistent CQRS with Axon while still storing events in an Event Store?

Hi,

In our project, the domain folks demand that our system be fully consistent, fearing occasional inconsistencies between the read (query) model and the write model caused by eventual consistency. At the same time, we need all domain objects (aggregates) to have historical data (e.g. to be able to rebuild a historical version or analyze historical changes). Thus, Event Sourcing and CQRS seem to fit quite well, except for the otherwise preferred asynchronous dispatching and processing of events. Due to the full consistency requirement, we want to process the command, change the state of the aggregate object, and update the query model within the same transaction. We also use Spring (Boot etc.) already.

After going through the Axon docs and forums, I have not found a way to update the query model synchronously and still keep the events in an event store. I could have a Spring @Transactional transaction encompassing the command processing and the query model update via an event sent through the SimpleEventBus, which is synchronous. But the SimpleEventBus does not provide an Event Store. Only the EmbeddedEventStore persists events in an Event Store, but it is asynchronous. By associating a Transaction Manager I could make the dispatching of events (e.g. to RabbitMQ) synchronous and thus part of my big transaction, but the processing of the event would still happen outside the transaction, and therefore the query model would not become fully consistent.

Should we use SimpleEventBus and implement our own Event Store? Are there other options?

br
Marek

There is no universally correct solution here, but we had a requirement to guarantee that by the time we returned an “OK” response to the initial request from a client, the query model had a record of the request already. (We had complaints from clients who sent us a request and then immediately tried to query its status and were alarmed that the query didn’t return anything.)

Our solution was to prepopulate the query model before sending the command to Axon and before replying to the client. The query model still gets populated by event handlers too (otherwise we couldn’t rebuild) but the event handlers that update the query model are designed to handle the case where the model is already up to date. In practice for us, on PostgreSQL, that means having ON CONFLICT clauses on all our INSERT statements.
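
As a rough illustration of the ON CONFLICT approach, here is a minimal JDBC sketch. The table name request_status, its columns, and the class name are made up for the example, and it assumes PostgreSQL 9.5 or later with a unique constraint on id:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical table and column names; adjust to your own schema.
final class RequestStatusWriter {
    // ON CONFLICT ... DO NOTHING requires PostgreSQL 9.5 or later
    // and a unique constraint (or primary key) on id.
    static final String INSERT_IF_ABSENT =
        "INSERT INTO request_status (id, status) VALUES (?, ?) "
      + "ON CONFLICT (id) DO NOTHING";

    // Returns true if a row was inserted, false if it already existed.
    static boolean insertIfAbsent(Connection connection, String id, String status)
            throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement(INSERT_IF_ABSENT)) {
            statement.setString(1, id);
            statement.setString(2, status);
            return statement.executeUpdate() == 1;
        }
    }
}
```

Both the prepopulating code path and the event handler can call the same method; whichever runs second simply inserts nothing.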

For us the concern is only whether the record exists at all in the query model; after that our application is fine with eventual consistency. However, for the more general case, one option would be to update the query model from your command handlers and make sure the command bus is synchronous. At its simplest, this might look like

class SomeAggregate {
    @CommandHandler
    public void handle(SomeCommand command, QueryModelUpdater updater) {
        SomeEvent event = doComputations(command);
        // update the query model directly, inside the command's transaction
        updater.on(event);
        // apply is AggregateLifecycle.apply, statically imported
        apply(event);
    }
}

class QueryModelUpdater {
    @EventHandler
    public void on(SomeEvent event) {
        // do the equivalent of this with whatever database library you use:
        // UPDATE query_model SET field1 = event.getField1() WHERE id = event.getId();
    }
}

The downside is you’ll double-update the query model; if that’s a nonstarter for performance reasons, you could have the event handler detect whether it is running in an event replay, say, and skip the update if it is not.
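
To make the "skip unless replaying" idea concrete, here is a toy in-memory sketch. It is not Axon code: StatusProjection and the boolean replaying flag are hypothetical stand-ins, and how a replay is actually signalled depends on your Axon version and setup.

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory projection; not Axon code. The 'replaying' flag is a
// hypothetical stand-in for however your setup signals a replay.
final class StatusProjection {
    final Map<String, String> rows = new HashMap<>();
    int writes = 0;

    // Called directly from the command handler, inside its transaction.
    void updateFromCommand(String id, String status) {
        write(id, status);
    }

    // Called by the event handler. During live processing the command
    // handler has already written the row, so only replays write here.
    void updateFromEvent(String id, String status, boolean replaying) {
        if (replaying) {
            write(id, status);
        }
    }

    private void write(String id, String status) {
        rows.put(id, status);
        writes++;
    }
}
```

Note that this splits the single on(...) method from the example above into two entry points, so the direct call from the command handler is never skipped.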

Hope that’s useful for you.

-Steve

Wouldn't you get the desired behaviour if you put a custom command bus interface in front that returns the value returned by the command handler? IIRC that forces everything to be synchronous.

Jorg

Hi Marek,

Note that, by default, the Axon Spring Boot Starters will give you implementations of components that give you full consistency. A Command will trigger a transaction, which is then also used to update the query models and event store. Only when you start distributing events, or processing them asynchronously, will you have eventual consistency to deal with.

So if you stick to default implementations, there is no configuration needed to get what you want.

Allard

Hi Allard,

good news! Thanks!

Still, could you please confirm whether this is the axon-spring-boot-starter setup we should (or may) use to get full consistency?

  • Event Bus and Event Store Configuration : we use JPA (Hibernate) already, so axon-spring-boot-starter provides us with an event store
  • Command Bus Configuration: SimpleCommandBus, because no CommandBus implementation is explicitly defined in the Application Context
  • Aggregate Configuration: we would use @Aggregate and @CommandHandler on the Aggregate Root, but our “aggregates” are just pure data objects (Hibernate @Entities), and the business logic (i.e. the commands) is placed in a separate workflow object hierarchy. So we are trying to set up the workflow engine as an External Command Handler. As we have a generic approach and many concrete workflow classes (template method and a kind of strategy pattern), we would make our WorkflowExecutor the external command handler.
  • Saga Configuration: we probably won’t need and use sagas at all
  • Event Handling Configuration: on services except the query model updating ones, we use @EventHandler
  • Enabling AMQP: the use of AMQP precludes full consistency, doesn’t it? Is that what you mean by saying we should not process events asynchronously?

Regards,

Marek

Hi Marek,

In the setup you mention, everything runs in the same thread, and is therefore also handled in the same transaction. This includes publishing to the AMQP message broker (as much as is possible, since they’re two different systems that need to be coordinated).
Obviously, reading from the broker is no longer part of the same transaction.

To validate this, simply write to a log file from different components. There, you’ll see that they’re executed in the same thread.
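
The same check can be sketched in plain Java. ToySyncBus below is a made-up stand-in for a synchronous, in-process event bus, not an Axon class; it only illustrates why a directly invoked handler reports the publisher's thread name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy stand-in for a synchronous, in-process event bus; not an Axon class.
final class ToySyncBus {
    private final List<Consumer<String>> handlers = new ArrayList<>();

    void subscribe(Consumer<String> handler) {
        handlers.add(handler);
    }

    // Handlers are invoked directly, so they run on the caller's thread.
    void publish(String event) {
        for (Consumer<String> handler : handlers) {
            handler.accept(event);
        }
    }
}

final class ThreadCheck {
    // Returns {publisher thread name, handler thread name}.
    static String[] run() {
        ToySyncBus bus = new ToySyncBus();
        String[] seen = new String[2];
        bus.subscribe(event -> seen[1] = Thread.currentThread().getName());
        seen[0] = Thread.currentThread().getName();
        bus.publish("SomeEvent");
        return seen;
    }
}
```

In a real application you would log Thread.currentThread().getName() from the command handler and from each event handler, then compare the names in the log output.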

Cheers,

Allard