The following configuration works; the problem was that the remote event store wasn’t properly configured.
@Configuration
@Profile("saga-owner")
public class SagaOwnerConfiguration {

    public static final String LOCAL_SAGA = "local-saga";

    private final AxonServerConnectionManager remoteAxonServerConnectionManager = AxonServerConnectionManager
            .builder().axonServerConfiguration(
                    remoteAxonServerConfiguration()).build();

    @Bean
    @Primary
    public CommandGateway localCommandGateway(CommandBus commandBus) {
        return DefaultCommandGateway.builder().commandBus(commandBus).build();
    }

    @Bean(name = "remoteCommandGateway")
    public CommandGateway remoteCommandGateway(
            @Qualifier("localSegment") CommandBus localSegment,
            @Qualifier("messageSerializer") Serializer messageSerializer,
            RoutingStrategy routingStrategy,
            CommandPriorityCalculator priorityCalculator,
            CommandLoadFactorProvider loadFactorProvider,
            TargetContextResolver<? super CommandMessage<?>> targetContextResolver) {
        return DefaultCommandGateway.builder()
                .commandBus(AxonServerCommandBus.builder()
                        .axonServerConnectionManager(
                                remoteAxonServerConnectionManager())
                        .configuration(remoteAxonServerConfiguration())
                        .localSegment(localSegment)
                        .serializer(messageSerializer)
                        .routingStrategy(routingStrategy)
                        .priorityCalculator(priorityCalculator)
                        .loadFactorProvider(loadFactorProvider)
                        .targetContextResolver(targetContextResolver)
                        .build()).build();
    }

    private AxonServerConnectionManager remoteAxonServerConnectionManager() {
        return remoteAxonServerConnectionManager;
    }

    private AxonServerConfiguration remoteAxonServerConfiguration() {
        return AxonServerConfiguration.builder().servers("localhost:8125").componentName("saga-owner")
                .build();
    }

    @Bean
    public MultiStreamableMessageSource multiStreamableMessageSource(EventStore localEventStore,
            AxonConfiguration configuration,
            AxonServerConnectionManager axonServerConnectionManager,
            Serializer snapshotSerializer,
            @Qualifier("eventSerializer") Serializer eventSerializer) {
        return MultiStreamableMessageSource.builder()
                .addMessageSource("localEventStore", localEventStore)
                .addMessageSource("remoteEventStore",
                        AxonServerEventStore.builder()
                                .messageMonitor(configuration
                                        .messageMonitor(AxonServerEventStore.class, "eventStore"))
                                .configuration(remoteAxonServerConfiguration())
                                .platformConnectionManager(remoteAxonServerConnectionManager())
                                .snapshotSerializer(snapshotSerializer)
                                .eventSerializer(eventSerializer)
                                .snapshotFilter(configuration.snapshotFilter())
                                .upcasterChain(configuration.upcasterChain())
                                .build())
                .longPollingSource("localEventStore")
                .build();
    }

    @Autowired
    public void configure(EventProcessingConfigurer eventProcessingConfigurer,
            MultiStreamableMessageSource multiStreamableMessageSource) {
        eventProcessingConfigurer
                .registerTrackingEventProcessor(LOCAL_SAGA, c -> multiStreamableMessageSource,
                        c -> TrackingEventProcessorConfiguration
                                .forSingleThreadedProcessing().andInitialTrackingToken(
                                        StreamableMessageSource::createHeadToken));
    }

    @Bean
    CommandLineRunner commandLineRunner(CommandGateway commandGateway) {
        return args -> commandGateway.send(new CreateAggregateA(UUID.randomUUID().toString()));
    }
}
This approach has a lot of downsides though:
- If the remote Axon Server is down when the saga is about to send it a command, the command is not retried and the system ends up inconsistent. I’m looking into how to configure a suitable RetryScheduler for this, possibly backed by persistent storage that queues the commands to retry.
- The infrastructure configuration (upcasters, serializers) is strongly coupled, which could lead to inconsistencies if the configurations of module A and module B diverge. But I guess I would face the same issue when sharing a single Axon Server instance.
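To make the retry idea from the first point concrete, here is a minimal sketch in plain Java of retrying an asynchronous send at a fixed interval. It is not Axon's RetryScheduler API: the class FixedIntervalRetry and its parameters are hypothetical names used only for illustration, and the retry state lives in memory, so pending retries are lost on restart (the persistent queue mentioned above is not covered).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper (not part of Axon): re-dispatches a failed asynchronous
// send at a fixed interval, up to a maximum number of retries.
class FixedIntervalRetry {

    private final ScheduledExecutorService executor;
    private final int maxRetries;
    private final long intervalMillis;

    FixedIntervalRetry(ScheduledExecutorService executor, int maxRetries, long intervalMillis) {
        this.executor = executor;
        this.maxRetries = maxRetries;
        this.intervalMillis = intervalMillis;
    }

    // Returns a future that completes with the first successful result, or
    // exceptionally with the last failure once the retries are exhausted.
    <R> CompletableFuture<R> send(Supplier<CompletableFuture<R>> dispatch) {
        CompletableFuture<R> result = new CompletableFuture<>();
        attempt(dispatch, 0, result);
        return result;
    }

    private <R> void attempt(Supplier<CompletableFuture<R>> dispatch,
                             int retriesSoFar,
                             CompletableFuture<R> result) {
        CompletableFuture<R> future;
        try {
            future = dispatch.get();
        } catch (RuntimeException e) {
            // Treat a synchronous failure the same way as a failed future.
            future = new CompletableFuture<>();
            future.completeExceptionally(e);
        }
        future.whenComplete((value, failure) -> {
            if (failure == null) {
                result.complete(value);
            } else if (retriesSoFar >= maxRetries) {
                result.completeExceptionally(failure);
            } else {
                // Schedule the next attempt after the fixed interval.
                executor.schedule(
                        () -> attempt(dispatch, retriesSoFar + 1, result),
                        intervalMillis, TimeUnit.MILLISECONDS);
            }
        });
    }
}
```

In the saga this could wrap the remote send, for example retry.send(() -> remoteCommandGateway.send(command)), assuming send returns a CompletableFuture. Whether a given failure is worth retrying at all (connection loss versus a business rejection) is a decision this sketch leaves out.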
All in all, a proper Axon Server EE cluster is definitely more reliable and less cumbersome to configure on the client side!