I'm trying to get my head around the distributed command bus in Axon 3.4. My use case: when a certain command is sent, an aggregate applies an event that starts a saga, and this saga sends two commands to keep the data sent to two different services in a consistent state.
Now here comes the tricky part: the command handlers are defined in external services, which do some work and then send a command back containing the result of the operation. However, when that command is sent I always end up with a timeout exception. It looks as if the CommandBus knows which aggregate has to handle the command but can't assign the correct aggregate instance to it.
Currently commandService.createCurrency only logs a message; that's why there is a Thread.sleep in the event handler, to simulate a longer-running process.
Below you'll find my code:
@Configuration
public class AxonConfig {

    @Autowired
    private Registration registration;

    private RestTemplate restTemplate = new RestTemplate();

    @Bean
    public CommandBusConnector springHttpCommandBusConnector(@Qualifier("localSegment") CommandBus localSegment,
                                                             Serializer serializer) {
        return new SpringHttpCommandBusConnector(localSegment, restTemplate, serializer);
    }

    @Bean
    public CommandRouter springCloudCommandRouter(DiscoveryClient discoveryClient) {
        return new SpringCloudCommandRouter(discoveryClient, registration, new AnnotationRoutingStrategy());
    }

    @Primary // to make sure this CommandBus implementation is used for autowiring
    @Bean
    public DistributedCommandBus springCloudDistributedCommandBus(CommandRouter commandRouter,
                                                                  CommandBusConnector commandBusConnector) {
        return new DistributedCommandBus(commandRouter, commandBusConnector);
    }
}
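To make the routing part of this configuration concrete: as far as I understand, AnnotationRoutingStrategy derives a routing key from the command field annotated with @TargetAggregateIdentifier, and the router uses that key to choose a node. The sketch below is a plain-JDK simulation of that idea, not Axon's actual code. The @RoutingKey annotation, the SampleCommand and UnroutableCommand classes, the node names, and the modulo hash (standing in for consistent hashing) are all assumptions made for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for Axon's @TargetAggregateIdentifier (illustration only).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface RoutingKey {}

// Hypothetical command that exposes its aggregate id as the routing key.
final class SampleCommand {
    @RoutingKey
    private final String id;

    SampleCommand(String id) {
        this.id = id;
    }
}

// Hypothetical command with no annotated field, so no routing key can be found.
final class UnroutableCommand {
    private final String id;

    UnroutableCommand(String id) {
        this.id = id;
    }
}

final class RoutingSketch {

    // Returns the value of the first field annotated with @RoutingKey, if any.
    static Optional<String> routingKeyOf(Object command) {
        for (Field field : command.getClass().getDeclaredFields()) {
            if (field.isAnnotationPresent(RoutingKey.class)) {
                try {
                    field.setAccessible(true);
                    return Optional.ofNullable(field.get(command)).map(Object::toString);
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException("Cannot read routing key", e);
                }
            }
        }
        return Optional.empty();
    }

    // Simplified node selection: the same key always maps to the same node.
    static String nodeFor(String routingKey, List<String> nodes) {
        return nodes.get(Math.floorMod(routingKey.hashCode(), nodes.size()));
    }

    public static void main(String[] args) {
        List<String> nodes = List.of("service-1", "service-2");
        Optional<String> key = routingKeyOf(new SampleCommand("currency-1"));
        System.out.println("routing key: " + key);
        key.ifPresent(k -> System.out.println("node: " + nodeFor(k, nodes)));
        System.out.println("unroutable key: " + routingKeyOf(new UnroutableCommand("currency-1")));
    }
}
```

In this model a command without an annotated field yields no routing key at all, which is one situation that might be worth ruling out for DalCreatedCommand.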
Service 1
Aggregate:
@Aggregate
@Data
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class CreateCurrencyAggregate {

    @AggregateIdentifier
    private String id;

    @CommandHandler
    public CreateCurrencyAggregate(CreateCurrencyCommand command) {
        log.info("starting create currency");
        Assert.notNull(command.getId(), "CreateCurrencyCommand must have an id");
        Assert.hasLength(command.getId(), "CreateCurrencyCommand id cannot be an empty String");
        this.id = command.getId();
        apply(CreateCurrencyEvent.builder()
                .id(command.getId())
                .payload(command.getPayload())
                .build());
    }

    @CommandHandler
    public void on(DalCreatedCommand command) {
        log.info("Currency created on dal layer");
        apply(DalCurrencyCreatedEvent.builder()
                .dalId(command.getId())
                .build());
    }
}
Saga:
@Slf4j
@Saga
public class CreateCurrencySaga {

    @Autowired
    private transient CommandGateway commandGateway;

    // Progress flags used by the start handler below (declarations were missing from the snippet)
    private boolean dalCreated;
    private boolean as400Created;

    @StartSaga
    @SagaEventHandler(associationProperty = "id")
    public void handle(CreateCurrencyEvent event) {
        log.info("starting saga...");
        dalCreated = false;
        as400Created = true;
        SagaLifecycle.associateWith("id", event.getId());
        SagaLifecycle.associateWith("dalId", event.getId());
        commandGateway.send(CreateDalCurrencyCommand.builder()
                .id(event.getId())
                .payload(event.getPayload())
                .build());
    }

    @SagaEventHandler(associationProperty = "dalId")
    public void handle(DalCurrencyCreatedEvent event) {
        log.info("receiving createdEvent");
        SagaLifecycle.end();
    }
}
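For reference, this is how I picture the saga associations working: the saga stores key/value pairs, and an event reaches a handler only if the value of its association property matches a stored pair. The sketch below is a simplified plain-JDK model of that lookup, not Axon's implementation. The AssociationSketch class and the sample ids are hypothetical.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified model of a saga's association values (illustration only).
final class AssociationSketch {

    private final Set<Map.Entry<String, String>> associations = new HashSet<>();

    // Mirrors the idea of SagaLifecycle.associateWith(key, value).
    void associateWith(String key, String value) {
        associations.add(Map.entry(key, value));
    }

    // An event is delivered only when its property name and value match a stored pair.
    boolean receives(String associationProperty, String eventValue) {
        return associations.contains(Map.entry(associationProperty, eventValue));
    }

    public static void main(String[] args) {
        AssociationSketch saga = new AssociationSketch();
        saga.associateWith("id", "currency-1");
        saga.associateWith("dalId", "currency-1");

        // A DalCurrencyCreatedEvent whose dalId is "currency-1" would match.
        System.out.println(saga.receives("dalId", "currency-1"));
        // An event for a different id would not.
        System.out.println(saga.receives("dalId", "currency-2"));
    }
}
```

Under this model the second saga handler only fires if DalCurrencyCreatedEvent carries the same id in dalId that the saga was associated with at start.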
Service 2
External CommandHandler
@Slf4j
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Component
public class CurrencyCommandHandler {

    @Autowired
    private EventBus eventBus;

    @CommandHandler
    public void on(CreateDalCurrencyCommand command) {
        eventBus.publish(asEventMessage(CreateDalCurrencyEvent.builder()
                .id(command.getId())
                .payload(command.getPayload())
                .build()));
    }
}
Event handler
@Slf4j
@RequiredArgsConstructor
@Component
public class CurrencyEventHandlers {

    private final CurrencyCommandService commandService;
    private final CommandGateway commandGateway;

    @EventHandler
    public void handle(CreateDalCurrencyEvent event) {
        try {
            // Simulate a longer-running process
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption
            Thread.currentThread().interrupt();
        }
        commandService.createCurrency(event.getId(), event.getPayload());
        // Report the outcome back to Service 1
        var result = commandGateway.send(DalCreatedCommand.builder()
                .id(event.getId())
                .build());
    }
}
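The result of commandGateway.send above is never inspected. Since send returns a CompletableFuture in Axon 3, one way to see what happens to the command is to bound the wait and log the outcome. The sketch below shows that pattern with plain JDK futures (Java 9+ for orTimeout). The sendWithReply and sendWithoutReply methods are hypothetical stand-ins for the gateway call, and the blocking join() is only there to make the demo print something; in a real handler a non-blocking callback would probably be preferable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SendResultSketch {

    // Hypothetical stand-in for a send whose reply arrives.
    static CompletableFuture<String> sendWithReply(String id) {
        return CompletableFuture.completedFuture("handled " + id);
    }

    // Hypothetical stand-in for a send whose reply never arrives.
    static CompletableFuture<String> sendWithoutReply() {
        return new CompletableFuture<>();
    }

    // Bounds the wait and turns the outcome into a message that can be logged.
    static String describeOutcome(CompletableFuture<String> result, long timeoutMillis) {
        return result
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                .handle((value, error) -> {
                    if (error == null) {
                        return "ok: " + value;
                    }
                    // Unwrap CompletionException so the real cause is visible.
                    Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                            ? error.getCause()
                            : error;
                    return (cause instanceof TimeoutException) ? "timed out" : "failed: " + cause;
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(describeOutcome(sendWithReply("currency-1"), 200));
        System.out.println(describeOutcome(sendWithoutReply(), 200));
    }
}
```

Logging the unwrapped cause this way should at least show whether the failure is a plain timeout or an exception raised while dispatching or handling the command.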