Disruptor Command Bus Example

Hi,

I am looking for an example of using the Disruptor Command Bus. Could anyone please provide one?

Thanks & Regards,
Prachi Mujumdar

Hi,

You can check out the DisruptorCommandBus benchmark code: https://github.com/AxonFramework/AxonFramework/blob/master/core/src/test/java/org/axonframework/commandhandling/disruptor/DisruptorCommandBusBenchmark.java

Or do you need a Spring sample?
Cheers,

Allard

Thanks Allard.

I am using the following configuration for the Disruptor Command Bus in my Spring context. With this configuration my command handler is called, but events applied from the aggregate are not published to the event bus.

Spring Application Context

`

<axon:annotation-config />

<axon:disruptor-command-bus id="commandBus" event-store="eventStore" event-bus="eventBus" transaction-manager="transactionManager" executor="taskExecutor">
    <axon:repositories>
        <axon:repository id="toDoItemRepository" aggregate-type="org.axonframework.sample.aggregate.ToDoItem"/>
    </axon:repositories>
</axon:disruptor-command-bus>

<axon:event-bus id="eventBus" />

<axon:jpa-event-store id="eventStore" data-source="dataSource" />





<axon:snapshotter id="snapshotter" event-store="eventStore" executor="taskExecutor" />

<axon:event-sourcing-repository id="toDoItemRepository"
        aggregate-type="org.axonframework.sample.aggregate.ToDoItem"
        event-bus="eventBus" event-store="eventStore">
    <axon:snapshotter-trigger event-count-threshold="5" snapshotter-ref="snapshotter" />
</axon:event-sourcing-repository>



`

ToDoItem

`

public class ToDoItem extends AbstractAnnotatedAggregateRoot {

    @AggregateIdentifier
    private String toDoItemAggregateId;

    public ToDoItem() {
    }

    public ToDoItem(String toDoItemAggregateId) {
        apply(new ToDoItemCreatedEvent(toDoItemAggregateId));
    }

    @EventSourcingHandler
    public void handleCaseActiveEvent(ToDoItemCreatedEvent toDoItemCreatedEvent) {
        this.toDoItemAggregateId = toDoItemCreatedEvent.getToDoItemAggregateId();
    }
}

`

CreateToDoItemCommand

`

public class CreateToDoItemCommand {

    public CreateToDoItemCommand() {
    }
}

`

ToDoItemCommandHandler

`

public class ToDoItemCommandHandler {

    private Repository toDoItemRepository;

    @CommandHandler
    public String handleCreateToDoItemCommand(CreateToDoItemCommand createToDoItemCommand) {

        // Create Aggregate Identifier
        String aggregateId = UUID.randomUUID().toString();

        // Create case Aggregate
        ToDoItem toDoItem = new ToDoItem(aggregateId);
        toDoItemRepository.add(toDoItem);

        return aggregateId;
    }

    public void setToDoItemRepository(Repository toDoItemRepository) {
        this.toDoItemRepository = toDoItemRepository;
    }
}

`

ToDoItemCreatedEvent

`

public class ToDoItemCreatedEvent implements Serializable {

    private String toDoItemAggregateId;

    public ToDoItemCreatedEvent(String toDoItemAggregateId) {
        this.toDoItemAggregateId = toDoItemAggregateId;
    }

    public String getToDoItemAggregateId() {
        return toDoItemAggregateId;
    }
}

`

ToDoItemCreatedEventHandler

`

public class ToDoItemCreatedEventHandler {

    @EventHandler
    public void handleToDoItemCreatedEvent(ToDoItemCreatedEvent toDoItemCreatedEvent) {
        System.out.println("In Event handler of ToDoItemCreatedEvent");
    }
}

`

Thanks & Regards,
Prachi Mujumdar

You have two repositories for ToDoItem in your context. That conflict might be the cause. You should not have an axon:event-sourcing-repository element for the ToDoItem. The repository configured inside the DisruptorCommandBus is the only repository definition you should have.

Cheers,

Allard

I have removed the axon:event-sourcing-repository element, but the bean defined for the command handler needs a reference to the repository, and in its absence I get org.springframework.beans.factory.NoSuchBeanDefinitionException. I have also tried this with an aggregate command handler, but that needs a reference to the repository too.

<bean id="toDoItemCommanHandler" class="org.axonframework.sample.command.handler.ToDoItemCommandHandler">
    <property name="toDoItemRepository" ref="toDoItemRepository" />
</bean>

Or

<axon:aggregate-command-handler id="toDoItemCommanHandler" command-bus="commandBus" aggregate-type="org.axonframework.sample.aggregate.ToDoItem" repository="toDoItemRepository" />

Thanks & Regards,
Prachi Mujumdar

What is the type of the toDoItemRepository attribute in your command handler?
Which Spring version do you use?

toDoItemRepository is of Type Repository. I am using Spring 4.0.3.RELEASE

Thanks & Regards,
Prachi Mujumdar

There was a known issue in Spring 4.0.0, but it has been resolved in 4.0.1, if I recall correctly.
The definition of the axon:repository inside the disruptor-command-bus should provide a bean of that type (but without generic parameters) under the id mentioned.

disruptor-config.xml (1.96 KB)

Thanks Allard. My code is now working with the disruptor command bus.
The repository ID declared under the repositories element of the disruptor command bus did not match the bean ref ID of the command bus.

Just a couple more questions around this:

  1. How can I configure a snapshotter trigger for the repository declared in the disruptor command bus? This was possible when defining an event-sourcing repository:

<axon:event-sourcing-repository id="toDoItemRepository"
        aggregate-type="org.axonframework.sample.aggregate.ToDoItem"
        event-bus="eventBus" event-store="eventStore">
    <axon:snapshotter-trigger event-count-threshold="5" snapshotter-ref="snapshotter" />
</axon:event-sourcing-repository>

  2. I have used a callback when dispatching a command over the disruptor command bus, but the result returned is always null, as the command handler responsible for returning the result is executed in a separate thread. This works fine with the Simple Command Bus.

Thanks & Regards,
Prachi Mujumdar

Hi Prachi,

The DisruptorCommandBus doesn’t have a shortcut configuration for the snapshotter trigger (yet). Instead, you need to configure the snapshotter trigger bean as a publisher-interceptor (for which there is a child XML element in disruptor-command-bus).

Cheers,

Allard

And about the callback, it should just provide you with the value returned by the command handler. If that method declares void, you always get null back. The fact that other threads send the result shouldn’t have anything to do with it.
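
To illustrate the point about threads, here is a minimal plain-Java sketch. It does not use the Axon API: `AsyncBus`, `ResultCallback`, and `dispatchAndWait` are hypothetical stand-ins for an asynchronous command bus and its callback. It shows that the callback receives whatever the handler returns even though the handler runs on another thread, and that the result is null only when the handler itself returns nothing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class CallbackSketch {

    /** Hypothetical stand-in for a command callback; not the Axon interface. */
    interface ResultCallback<R> {
        void onSuccess(R result);
        void onFailure(Throwable cause);
    }

    /** Hypothetical asynchronous bus: runs every handler on a separate worker thread. */
    static final class AsyncBus implements AutoCloseable {
        private final ExecutorService worker = Executors.newSingleThreadExecutor();

        <C, R> void dispatch(C command, Function<C, R> handler, ResultCallback<R> callback) {
            worker.submit(() -> {
                try {
                    // The handler's return value is handed to the callback as-is.
                    callback.onSuccess(handler.apply(command));
                } catch (RuntimeException e) {
                    callback.onFailure(e);
                }
            });
        }

        @Override
        public void close() {
            worker.shutdown();
        }
    }

    /** Dispatches a command and blocks until the callback has been invoked. */
    static <C, R> R dispatchAndWait(AsyncBus bus, C command, Function<C, R> handler) {
        CompletableFuture<R> future = new CompletableFuture<>();
        bus.dispatch(command, handler, new ResultCallback<R>() {
            @Override
            public void onSuccess(R result) {
                future.complete(result);
            }

            @Override
            public void onFailure(Throwable cause) {
                future.completeExceptionally(cause);
            }
        });
        return future.join();
    }

    public static void main(String[] args) {
        try (AsyncBus bus = new AsyncBus()) {
            // Handler that returns a value: the callback sees that value.
            String id = dispatchAndWait(bus, "create", cmd -> "id-for-" + cmd);
            // Handler that returns nothing useful: the callback sees null.
            Object nothing = dispatchAndWait(bus, "create", cmd -> null);
            System.out.println(id);      // prints "id-for-create"
            System.out.println(nothing); // prints "null"
        }
    }
}
```

If the callback in a real setup still reports null, one thing worth checking under this reading is whether the handler that actually ran declares a return value.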

Cheers,

Allard

Hello Allard,
Great job on the release of 3.0.6 and all that Boot integration; it does in fact take away a lot of the boilerplate code.
Can you provide a simple sample TODO app (in Axon 3.0.6, or whatever the current version might be when reading this email) that uses JPA, either annotated on the aggregate class or via a custom entity class that is Spring Data aware, with an in-memory database such as H2 and/or a MySQL configuration, and, most importantly, the Disruptor command bus instead of the simple one?
If you could attach a full working example as a zip, that would be highly appreciated.

Thanks for all the good work. I am keeping an eye on AxonIQ as well as any upcoming DDD conferences.

Cheers,
Joe

Hi Joe,

unfortunately, my schedule doesn’t allow me to create sample apps for all combinations of Axon components ;-)… But I can give you some pointers to get started.
The DisruptorCommandBus only works with Event Sourced aggregates, so combining it with aggregates stored as entities in an ORM will not work. You want one or the other, but not both.

To configure the DisruptorCommandBus, simply define an @Bean in your application context that returns one. Axon will automatically pick it up.
To store your aggregate using an ORM, the simplest way is to define a @Bean returning a GenericJpaRepository for your aggregate. Either name the bean (i.e. the method) <aggregateName>Repository (e.g. for MyAggregate call it “myAggregateRepository”), or otherwise specify the name of the repository in the @Aggregate annotation.
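
The bean-naming convention described above can be sketched in plain Java. This is only an illustration of the rule as stated (the aggregate's simple class name with a lower-cased first letter, followed by "Repository"); the class `RepositoryBeanNames` and the method `defaultRepositoryBeanName` are hypothetical and are not Axon's own lookup code.

```java
public class RepositoryBeanNames {

    /**
     * Derives the conventional repository bean name for an aggregate type,
     * e.g. MyAggregate becomes "myAggregateRepository".
     * Hypothetical helper for illustration; not an Axon API.
     */
    static String defaultRepositoryBeanName(Class<?> aggregateType) {
        String simpleName = aggregateType.getSimpleName();
        return Character.toLowerCase(simpleName.charAt(0)) + simpleName.substring(1) + "Repository";
    }

    /** Placeholder aggregate type used only for the demonstration. */
    static class MyAggregate {
    }

    public static void main(String[] args) {
        System.out.println(defaultRepositoryBeanName(MyAggregate.class)); // prints "myAggregateRepository"
    }
}
```

A @Bean method carrying the derived name would match the convention; any other name would have to be given explicitly in the @Aggregate annotation.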

Hope this helps.

Allard