Sending Events to Event Bus

Hello Everyone

I am very new to AXON and CQRS world. I used the quick guide to see how AXON works and I am able to successfully run it on my machine.

I see that after setting up the infrastructure configuration, AXON automatically store the events on the persistence storage and publish events on the Event Bus.

Since I want to be sure for (Persistence and Publish ) of events as atomic i.e. either both are Successful or both fail ,
I would like to first store the event in the Persistence storage and once it is stored , I want that event to publish it on the event bus.

For doing this , I need to have control on when and how these events are persisted and published by AXON.

Also, I am thinking of running the background thread which listens to the Persistence storage , once event get stored , thread will publish the event on the
event bus and then thread will mark the event in the event store as Published.

HELP !!!

Hi Sally,

what you’re describing, is actually the default Axon behavior. Axon will always first store the events in the event store, and publish them to the bus afterwards.
If you attach a transaction manager to the unit of work, everything is dealt with transactionally as well.

If you really must customize this part, you’ll need to create your own UnitOfWork and configure a UnitOfWorkFactory. It’s not a road I’d recommend, though.

Cheers,

Allard

Hi Allard,

Thanks for your reply. Does it mean that we need to associate a TransactionManager to the command bus as shown below?

<axon:command-bus id=“commandBus” transaction-manager=“txManager”/>

Thanks | Abhishek

Yes, that is correct.

Cheers, Allard

And one more question:

how would I configure a transaction manager for the same. I tried this but no luck.

<axon:command-bus id=“commandBus” transaction-manager=“transactionManager”/>

Thanks | Abhishek

By the way, I am using mongodb as the event store and by spring config looks like this:

Hi,

the transaction-manager atrtibute should point to a Spring PlatformTransactionManager. There is no need to wrap it in a SpringTransactionManager explicitly.

However, as far as I know, Mongo doesn’t support transactions. So there is no point configuring a transaction manager.

Cheers,

Allard

Hi Allard,

This really helps! However, I have couple of more questions before we decide what would be best-suited for our requirements.

  1. I read the Axon reference document and figured that UnitOfWork is not transactional. So, if there are five actions in an UnitOfWork and when that UnitOfWork proceeds to commit, there is no guarantee that all five actions will be persisted in the event store. So, if three of them are persisted in the event store before the UnitOfWork fails, those three would not be rolled back and three events corresponding to three actions persisted in the event store would be published in the event bus. Is this understanding correct? And to make sure that all five or none of the actions persist in the event store, we have to configure a transaction manager for the command bus. And this technique would only work if the event store supports transaction. Please tell me if this understanding is correct as well.
  2. Does Axon framework give a guarantee that if an UnitOfWork is committed then all the relevant events would be published in the event bus? If not, what can we do to ensure it?
  3. If we still decide to use MongoDB as the event store, we must ensure one of the two to guarantee that an UnitOfWork would still be transactional. Is this understanding correct?
  • There should be one action in one UnitOfWork since one document storage is atomic for MongoDB.
  • If there are multiple actions in one UnitOfWork, we have to configure in a way so that all the relevant events would be persisted on a single document in MongoDB. Do this approach has any negative impact on event replaying to form an aggregate?
    Sorry for such a big mail.

Thanks | Abhishek

Hi Allard

We want to guarantee the complete success or complete failure of the
following 2 actions.

1. Events gets stored in MongoDB .
2. Events are published on the Event Bus.

What should we do exactly?

The following things should never happen in the system

1. All Events associated with a Command gets Stored but are not Published
on the Event Bus.
2. Some events associated with a Command gets Stored and Unit of Work fails.

In the above failure Scenarios , we want to Roll back everything and try
again.

Also, very curious to know that the above scenario is very crucial and
basic to every system , why it is not included in the AXON framework to be
guaranteed. Any particular reasons for this.?

Looking Forward for the Solution.

Thanks

Sally

Hi Sally,

the issue isn’t that Axon cannot guarantee this, but that Mongo just doesn’t support transactions. You can configure a transaction manager on a unit of work that can give you the necessary guarantees (for another dbms).

Axon allows for a Document-per-commit (which stores all events for a single uow in one document) to be able to ensure either all or none of the events are stored.

Cheers,

Allard

Hello Allard

Thanks for your reply. We have done the following

  1. We have configured Transaction Manager with the Command Bus.
  2. We have registered Unit Of Work Listeners too.
  3. Now while testing, when we switch off Event Bus , we get callbacks both in Transaction Manager as well as UnitOfWork Listener. OnRollback callback is called for both
    Transaction Manager as well as Unit Of Work. What is the difference ?

Also, we are planning to delete the documents from the mongodb once onRoolback is called, (to ensure atomicity of UOW ), is this strategy right ?

Cheers!!!
Sally

Hi,

how do you mean ‘We get callbacks both in tm and uow’? I am only aware of callbacks on the uow. The transaction manager is just to manage the tx. The uow takes care of the different actions (including invoking the transaction manager) to take and their timing.

Removing the document on rollback sounds reasonable.

Cheers, Allard

Hello Allard

Here goes our implementation. Could you please validate.

public class Employee extends AbstractAnnotatedAggregateRoot {

@AggregateIdentifier
private String id;

private String name;
private String email;
private Address address;

public Employee(){
}

@CommandHandler
public Employee(CreateEmployeeCommand command, UnitOfWork unitOfWork){
registerUnitOfWorkListener(unitOfWork);
apply(new EmployeeCreatedEvent(command.getId(), command.getName(), command.getEmail(), command.getAddress()));
}

@CommandHandler
public void changeName(ChangeEmployeeNameCommand command, UnitOfWork unitOfWork){
registerUnitOfWorkListener(unitOfWork);
apply(new EmployeeNameChangedEvent(command.getId(), command.getName()));
}

private void registerUnitOfWorkListener(UnitOfWork unitOfWork) {
unitOfWork.registerListener(new UnitOfWorkListenerAdapter() {

private List eventIdentifiers = new ArrayList();

@Override
public void onPrepareCommit(UnitOfWork unitOfWork, Set aggregateRoots, List events) {

for(EventMessage e : events){
eventIdentifiers.add(e.getIdentifier());
}

super.onPrepareCommit(unitOfWork, aggregateRoots, events);
}

@Override
public void onRollback(UnitOfWork unitOfWork, Throwable failureCause) {

for(String id: eventIdentifiers) {
DBObject document = App.customersDb.getCollection(“domainevents”).findOne();
App.customersDb.getCollection(“domainevents”).remove(document);
}

super.onRollback(unitOfWork, failureCause);
}

});
}

}

Hi Sally,

the general direction of the solution looks all right. However, if you want to perform some activity on each incoming command, it is easier/better to use a CommandHandlerInterceptor.instead. It takes the “infrastructure logic” our of your aggregate, which should focus on business logic.

You can register your CommandHandlerInterceptor on the Command Bus implementation.

Cheers,

Allard

Hello Allard

Thanks for the suggestion, we have plugged in Interceptor and doing registration of unit of work there which works for all commands.

Query : Since a command can spit multiple events, and it might happen that some events got published on event bus and subsequently Event-Bus goes off , consequently ,UnitOfWork Listener rollback is called , how could we understand which events are successfully published on event bus and which are not as we are
getting all the events associated with the Unit Of Work,
We tried associating ExceptionListener with ActiveMQConnectionFactory so as to get the error notification. BUt the callback is not called when the queue is shut off.
Are their any callbacks/listeners() which are notified when a event is published on event bus? Is there any way of knowing this info from events in onRollback()?

Hi Sally,

some message brokers support (light weight) transactions. You can use those when you need to send more than one message.

The callbacks on the UnitOfWork don’t provide this levelof detailed information.

Cheers,

Allard

Hi Allard

In the scenario, when all of the events pertaining to one command are not published into the event bus, we want to retry the “non-published” events to event bus
until they got published.

Is there any scheduler built-in Axon which we can exploit ?

Thanks !!!

There are some schedulers in the AsynchronousCluster, although I am not sure you’d want to use those in your scenario. You’re probably best of building something yourself.

Cheers,

Allard

Is there improved feature in higher version of Axon?
we do want to rollback a document stored in EventStore.
while I was debugging,
Before calling AggregateLifeCycle.apply() if eventbus stopped or crached, the event is still saved in the eventStore(MongoDB)

2015년 3월 26일 목요일 오전 11시 51분 36초 UTC+1, Allard Buijze 님의 말:

Hi Juya,

First off, if you’re still in the pre-production phase, I’d suggest using AxonServer or otherwise a regular Relational Database to store your events.
There are off chances that the MongoEventStorageEngine creates undesired situations in regards to trying to store big numbers of events concurrently.
To give a very short explanation on this, this happens because for Mongo the framework does not use a generated global index for ensured event ordering, by the timestamp.

A part from that though, I think the issue you’re having stems from the Transaction Manager being used.
Make sure that the same Transaction Manager as your Mongo instance uses is set on the CommandBus you’re using.

Hope this helps!

Cheers,
Steven