Axon 3.4: CommandHandler in Aggregate not being triggered when command is sent from external service

I’m trying to get my head around the distributed command bus in Axon 3.4. My use case is this: when a certain command is sent, an aggregate publishes an event that starts a saga, and this saga sends 2 commands to keep the data sent to 2 different services in a consistent state.

Now here comes the tricky part: the CommandHandlers are defined in external services that do some work and then send a command back containing the result of the operation. However, when that command is sent I always end up with a timeout exception, so the CommandBus knows which aggregate has to handle it but can’t assign the correct Aggregate to the Command.

Currently commandService.createCurrency only logs a message, that’s why there’s a Thread.sleep in the event handler, to simulate a longer running process.

Below you’ll find my code:

@Configuration
public class AxonConfig {

    @Autowired
    private Registration registration;

    private RestTemplate restTemplate = new RestTemplate();

    @Bean
    public CommandBusConnector springHttpCommandBusConnector(@Qualifier("localSegment") CommandBus localSegment,
                                                             Serializer serializer) {
        return new SpringHttpCommandBusConnector(localSegment, restTemplate, serializer);
    }

    @Bean
    public CommandRouter springCloudCommandRouter(DiscoveryClient discoveryClient) {
        return new SpringCloudCommandRouter(discoveryClient, registration, new AnnotationRoutingStrategy());
    }

    @Primary // to make sure this CommandBus implementation is used for autowiring
    @Bean
    public DistributedCommandBus springCloudDistributedCommandBus(CommandRouter commandRouter,
                                                                  CommandBusConnector commandBusConnector) {
        return new DistributedCommandBus(commandRouter, commandBusConnector);
    }

}

Service 1

Aggregate:

@Aggregate
@Data
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class CreateCurrencyAggregate {

    @AggregateIdentifier
    private String id;

    @CommandHandler
    public CreateCurrencyAggregate(CreateCurrencyCommand command) {
        log.info("starting create currency");
        Assert.notNull(command.getId(), "CreateCurrencyCommand must have an id");
        Assert.hasLength(command.getId(), "CreateCurrencyCommand id cannot be an empty String");
        this.id = command.getId();
        apply(CreateCurrencyEvent.builder()
                .id(command.getId())
                .payload(command.getPayload())
                .build());
    }

    @CommandHandler
    public void on(DalCreatedCommand command) {
        log.info("Currency created on dal layer");
        apply(DalCurrencyCreatedEvent.builder()
                .dalId(command.getId())
                .build());

    }
}

Saga:

@Slf4j
@Saga
public class CreateCurrencySaga {

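    @Autowired
    private transient CommandGateway commandGateway;

    // Progress flags assigned in the start handler below; their declarations were missing from the snippet.
    private boolean dalCreated;
    private boolean as400Created;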

    @StartSaga
    @SagaEventHandler(associationProperty = "id")
    public void handle(CreateCurrencyEvent event) {
        log.info("starting saga...");
        dalCreated = false;
        as400Created = true;
        SagaLifecycle.associateWith("id", event.getId());
        SagaLifecycle.associateWith("dalId", event.getId());
        commandGateway.send(CreateDalCurrencyCommand.builder()
                .id(event.getId())
                .payload(event.getPayload())
                .build());
    }

    @SagaEventHandler(associationProperty = "dalId")
    public void handle(DalCurrencyCreatedEvent event) {
        log.info("receiving createdEvent");
        SagaLifecycle.end();
    }

}

Service 2

External CommandHandler

@Slf4j
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Component
public class CurrencyCommandHandler {

    @Autowired
    private EventBus eventBus;

    @CommandHandler
    public void on(CreateDalCurrencyCommand command) {
        eventBus.publish(asEventMessage(CreateDalCurrencyEvent.builder()
                .id(command.getId())
                .payload(command.getPayload())
                .build()));
    }
}

Eventhandler

@Slf4j
@RequiredArgsConstructor
@Component
public class CurrencyEventHandlers {

    private final CurrencyCommandService commandService;

    private final CommandGateway commandGateway;

    @EventHandler
    public void handle(CreateDalCurrencyEvent event){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        commandService.createCurrency(event.getId(), event.getPayload());
        var result = commandGateway.send(DalCreatedCommand.builder()
            .id(event.getId())
            .build());
    }
}

Hi Jeroen,

From the snippets you’ve shared, I am assuming the following:

  1. You are using Spring, very likely with Spring Boot and thus Axon’s Spring Boot Auto Configuration.
  2. The only custom Axon configuration is that described in the AxonConfig class.

Given both assumptions, I am pretty sure that the CreateCurrencyAggregate is configured through the Axon Spring Boot Auto Configuration, which suggests you are Event Sourcing your Aggregate.
Given it’s an Event Sourced Aggregate, its state is rebuilt from its events whenever a later command loads it, so you are at minimum required to provide an @EventSourcingHandler annotated method for the Aggregate creation event that sets the @AggregateIdentifier field of the Aggregate.
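To illustrate why the identifier has to be set from an event rather than in the command handler, here is a plain-Java sketch of how an event-sourced aggregate is rebuilt. It is a simplified model and not Axon's actual implementation; all class and method names in it (RehydrationSketch, SketchAggregate, CurrencyCreated and so on) are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

class RehydrationSketch {
    /** Replays stored events into a fresh instance, roughly what a repository does on load. */
    static String idAfterReload(SketchAggregate fresh, List<Object> storedEvents) {
        for (Object event : storedEvents) {
            fresh.on(event);
        }
        return fresh.id;
    }

    public static void main(String[] args) {
        IdSetInCommandHandler a = new IdSetInCommandHandler();
        a.create("EUR");
        IdSetInEventHandler b = new IdSetInEventHandler();
        b.create("EUR");

        System.out.println(idAfterReload(new IdSetInCommandHandler(), a.recorded)); // prints "null"
        System.out.println(idAfterReload(new IdSetInEventHandler(), b.recorded));   // prints "EUR"
    }
}

/** Hypothetical creation event, a stand-in for CreateCurrencyEvent. */
final class CurrencyCreated {
    final String id;
    CurrencyCreated(String id) { this.id = id; }
}

/** Minimal model of an event-sourced aggregate: state is rebuilt by replaying events. */
abstract class SketchAggregate {
    String id;
    final List<Object> recorded = new ArrayList<>();

    /** Routes a new event through the same handler used on replay, then records it. */
    void apply(Object event) {
        on(event);
        recorded.add(event);
    }

    /** Event handler: the only code that runs when the aggregate is reloaded. */
    abstract void on(Object event);
}

/** Sets the id while handling the command, so a replay never restores it. */
final class IdSetInCommandHandler extends SketchAggregate {
    void create(String commandId) {
        this.id = commandId;
        apply(new CurrencyCreated(commandId));
    }

    @Override
    void on(Object event) { /* no state change */ }
}

/** Sets the id while handling the event, so a replay restores it. */
final class IdSetInEventHandler extends SketchAggregate {
    void create(String commandId) {
        apply(new CurrencyCreated(commandId));
    }

    @Override
    void on(Object event) {
        if (event instanceof CurrencyCreated) {
            this.id = ((CurrencyCreated) event).id;
        }
    }
}
```

In this model the first variant ends up without an identifier after a reload, which is the situation a follow-up command such as DalCreatedCommand would run into.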
In your example, that’s the CreateCurrencyEvent, which would lead to something like this: