Axon 4.4.3 application interacting with multiple AxonServer SE

Hello all,

Here is the scenario I am trying to cover:

  • I have a module A connected to Axon Server A
  • I have a module B connected to Axon Server B
  • I have a Saga in module A that sends commands to module B and listens to events coming from both Axon Server A and Axon Server B

Module B configuration

I don’t see anything special to do here, except of course making sure that it points to the correct Axon Server instance.

Module A configuration

This is where I’m still struggling to identify the proper configuration.
My assumption is that I have to configure these additional components beyond the default configuration:

  1. A (remote) Command Gateway that dispatches commands to Axon Server B.
  2. A remote event store pointing to Axon Server B.
  3. A tracking processor with multiple streaming sources, with Axon Server A as the long-polling source and the event store declared in step 2 as the second source.
  4. A Saga backed by the tracking processor configured in step 3, into which we inject the two command gateways (the local one, and the remote one configured in step 1).
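
To make step 3 more concrete, here is a minimal plain-Java sketch of the merging idea behind a multi-source stream: peek at the head of every source and hand out the oldest event first. As far as I understand, this is how MultiStreamableMessageSource orders events by default (by message timestamp), but the sketch does not use any Axon class. `MultiSourceSketch`, `Event` and the event names are hypothetical, and the long-polling behaviour is not modelled.

```java
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Illustration only: none of these types are Axon classes. */
public class MultiSourceSketch {

    /** Hypothetical stand-in for an event that remembers its origin and timestamp. */
    public static final class Event {
        final String source;
        final String payload;
        final Instant timestamp;

        public Event(String source, String payload, Instant timestamp) {
            this.source = source;
            this.payload = payload;
            this.timestamp = timestamp;
        }
    }

    // One queue per named source, kept in registration order.
    private final Map<String, Deque<Event>> sources = new LinkedHashMap<>();

    public void addSource(String name, List<Event> events) {
        sources.put(name, new ArrayDeque<>(events));
    }

    /** Peeks at the head of every source and consumes the oldest event, if any. */
    public Optional<Event> next() {
        Deque<Event> oldest = null;
        for (Deque<Event> queue : sources.values()) {
            Event head = queue.peek();
            if (head != null
                    && (oldest == null || head.timestamp.isBefore(oldest.peek().timestamp))) {
                oldest = queue;
            }
        }
        return oldest == null ? Optional.empty() : Optional.of(oldest.poll());
    }

    /** Drains all sources and returns the order in which events were handed out. */
    public static List<String> drain(MultiSourceSketch merged) {
        List<String> order = new ArrayList<>();
        Optional<Event> event;
        while ((event = merged.next()).isPresent()) {
            order.add(event.get().source + ":" + event.get().payload);
        }
        return order;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2020-01-01T00:00:00Z");
        MultiSourceSketch merged = new MultiSourceSketch();
        merged.addSource("localEventStore", Arrays.asList(
                new Event("A", "AggregateACreated", t0),
                new Event("A", "AggregateAUpdated", t0.plusSeconds(2))));
        merged.addSource("remoteEventStore", Arrays.asList(
                new Event("B", "AggregateBCreated", t0.plusSeconds(1))));
        // prints [A:AggregateACreated, B:AggregateBCreated, A:AggregateAUpdated]
        System.out.println(drain(merged));
    }
}
```

The point of the sketch is only that a Saga fed by such a stream sees events from both servers interleaved by timestamp, which is why clocks and serialization settings on both sides matter.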

From what I can see so far, it comes down to correctly configuring the AxonServerConnectionManager in the respective builders of the DefaultCommandGateway and the AxonServerEventStore.

Am I on the right track? Is this a configuration that is normally supported? I’m well aware that I could simplify this by sharing the same Axon Server (and relying on EE multi-context support to isolate the events of domain A from those of domain B), but sticking with the current topology is a hard requirement on my side :slight_smile:

Thank you,
Have a nice day,
Jerome

Some progress, but the result seems hard to maintain.

@Configuration
@Profile("saga-owner")
public class SagaOwnerConfiguration {

    public static final String LOCAL_SAGA = "local-saga";

    private final AxonServerConnectionManager remoteAxonServerConnectionManager =
            AxonServerConnectionManager.builder()
                                       .axonServerConfiguration(remoteAxonServerConfiguration())
                                       .build();

    @Bean
    @Primary
    public CommandGateway localCommandGateway(CommandBus commandBus) {
        return DefaultCommandGateway.builder().commandBus(commandBus).build();
    }

    @Bean(name = "remoteCommandGateway")
    public CommandGateway remoteCommandGateway(
            @Qualifier("localSegment") CommandBus localSegment,
            @Qualifier("messageSerializer") Serializer messageSerializer,
            RoutingStrategy routingStrategy,
            CommandPriorityCalculator priorityCalculator,
            CommandLoadFactorProvider loadFactorProvider,
            TargetContextResolver<? super CommandMessage<?>> targetContextResolver) {

        return DefaultCommandGateway.builder()
            .commandBus(AxonServerCommandBus.builder()
                .axonServerConnectionManager(remoteAxonServerConnectionManager())
                .configuration(remoteAxonServerConfiguration())
                .localSegment(localSegment)
                .serializer(messageSerializer)
                .routingStrategy(routingStrategy)
                .priorityCalculator(priorityCalculator)
                .loadFactorProvider(loadFactorProvider)
                .targetContextResolver(targetContextResolver)
                .build())
            .build();
    }

    public AxonServerConnectionManager remoteAxonServerConnectionManager() {
        return remoteAxonServerConnectionManager;
    }

    public AxonServerConfiguration remoteAxonServerConfiguration() {
        return AxonServerConfiguration.builder()
            .servers("localhost:8125")
            .componentName("saga-owner")
            .build();
    }

    public AxonServerEventStore remoteEventStore() {
        return AxonServerEventStore.builder()
            .configuration(remoteAxonServerConfiguration())
            .platformConnectionManager(remoteAxonServerConnectionManager())
            .build();
    }

    @Bean
    public MultiStreamableMessageSource multiStreamableMessageSource(EventStore localEventStore) {
        return MultiStreamableMessageSource.builder()
            .addMessageSource("localEventStore", localEventStore)
            .addMessageSource("remoteEventStore", remoteEventStore())
            .longPollingSource("localEventStore")
            .build();
    }

    @Autowired
    public void configure(EventProcessingConfigurer eventProcessingConfigurer,
                          MultiStreamableMessageSource multiStreamableMessageSource) {

        eventProcessingConfigurer
            .registerTrackingEventProcessor(LOCAL_SAGA, c -> multiStreamableMessageSource,
                c -> TrackingEventProcessorConfiguration
                    .forSingleThreadedProcessing()
                    .andInitialTrackingToken(StreamableMessageSource::createHeadToken));
    }

    @Bean
    CommandLineRunner commandLineRunner(CommandGateway commandGateway) {
        return args -> commandGateway.send(new CreateAggregateA(UUID.randomUUID().toString()));
    }

    @Bean
    SagaStore inMemorySagaStore() {
        return new InMemorySagaStore();
    }
}

This is not completely functional, as the multi-source Saga doesn’t process the events from Axon Server B. On the other hand, the command processing part seems to work well. I’ll keep digging.

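The following configuration works. The problem was that the remote event store wasn’t properly configured: it now gets its serializers, snapshot filter, upcaster chain and message monitor from the local Axon configuration.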

@Configuration
@Profile("saga-owner")
public class SagaOwnerConfiguration {

    public static final String LOCAL_SAGA = "local-saga";

    private final AxonServerConnectionManager remoteAxonServerConnectionManager =
            AxonServerConnectionManager.builder()
                                       .axonServerConfiguration(remoteAxonServerConfiguration())
                                       .build();

    @Bean
    @Primary
    public CommandGateway localCommandGateway(CommandBus commandBus) {
        return DefaultCommandGateway.builder().commandBus(commandBus).build();
    }

    @Bean(name = "remoteCommandGateway")
    public CommandGateway remoteCommandGateway(
            @Qualifier("localSegment") CommandBus localSegment,
            @Qualifier("messageSerializer") Serializer messageSerializer,
            RoutingStrategy routingStrategy,
            CommandPriorityCalculator priorityCalculator,
            CommandLoadFactorProvider loadFactorProvider,
            TargetContextResolver<? super CommandMessage<?>> targetContextResolver) {

        return DefaultCommandGateway.builder()
            .commandBus(AxonServerCommandBus.builder()
                .axonServerConnectionManager(remoteAxonServerConnectionManager())
                .configuration(remoteAxonServerConfiguration())
                .localSegment(localSegment)
                .serializer(messageSerializer)
                .routingStrategy(routingStrategy)
                .priorityCalculator(priorityCalculator)
                .loadFactorProvider(loadFactorProvider)
                .targetContextResolver(targetContextResolver)
                .build())
            .build();
    }

    private AxonServerConnectionManager remoteAxonServerConnectionManager() {
        return remoteAxonServerConnectionManager;
    }

    private AxonServerConfiguration remoteAxonServerConfiguration() {
        return AxonServerConfiguration.builder()
            .servers("localhost:8125")
            .componentName("saga-owner")
            .build();
    }

    @Bean
    public MultiStreamableMessageSource multiStreamableMessageSource(
            EventStore localEventStore,
            AxonConfiguration configuration,
            AxonServerConnectionManager axonServerConnectionManager,
            Serializer snapshotSerializer,
            @Qualifier("eventSerializer") Serializer eventSerializer) {
        return MultiStreamableMessageSource.builder()
            .addMessageSource("localEventStore", localEventStore)
            .addMessageSource("remoteEventStore",
                AxonServerEventStore.builder()
                    .messageMonitor(configuration.messageMonitor(AxonServerEventStore.class, "eventStore"))
                    .configuration(remoteAxonServerConfiguration())
                    .platformConnectionManager(remoteAxonServerConnectionManager())
                    .snapshotSerializer(snapshotSerializer)
                    .eventSerializer(eventSerializer)
                    .snapshotFilter(configuration.snapshotFilter())
                    .upcasterChain(configuration.upcasterChain())
                    .build())
            .longPollingSource("localEventStore")
            .build();
    }

    @Autowired
    public void configure(EventProcessingConfigurer eventProcessingConfigurer,
                          MultiStreamableMessageSource multiStreamableMessageSource) {

        eventProcessingConfigurer
            .registerTrackingEventProcessor(LOCAL_SAGA, c -> multiStreamableMessageSource,
                c -> TrackingEventProcessorConfiguration
                    .forSingleThreadedProcessing()
                    .andInitialTrackingToken(StreamableMessageSource::createHeadToken));
    }

    @Bean
    CommandLineRunner commandLineRunner(CommandGateway commandGateway) {
        return args -> commandGateway.send(new CreateAggregateA(UUID.randomUUID().toString()));
    }
}

This approach has significant downsides, though:

  • If the remote Axon Server is down at the moment the Saga is about to send a command to it, the command is not retried and the system ends up inconsistent. I’m looking into how to configure a proper RetryScheduler for this, possibly combined with persistent storage to queue the commands that need to be retried.
  • There is strong coupling in terms of infrastructure configuration (upcasters, serializers), which could lead to inconsistencies if the configurations of module A and module B diverge. But I guess I would face the same issue when sharing a single Axon Server instance.
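
As a rough illustration of the retry idea in the first point, here is a plain-Java sketch of a bounded retry with a fixed interval. It is not Axon’s RetryScheduler API: `RetrySketch` and `sendWithRetry` are hypothetical names, and the "remote server" is simulated with a counter. In Axon itself the equivalent would presumably be a RetryScheduler implementation such as IntervalRetryScheduler, set on the DefaultCommandGateway builder.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Illustration only: a fixed-interval, bounded retry around a dispatch call. */
public class RetrySketch {

    /**
     * Calls dispatch until it succeeds or maxRetries retries have been used.
     * Rethrows the last failure once the retries are exhausted.
     */
    public static <T> T sendWithRetry(Supplier<T> dispatch, int maxRetries, long intervalMillis) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return dispatch.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxRetries) {
                    sleep(intervalMillis);
                }
            }
        }
        throw last;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop retrying.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to retry", e);
        }
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Simulated remote server that is unreachable for the first two attempts.
        String result = sendWithRetry(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("remote server unavailable");
            }
            return "command accepted";
        }, 5, 10L);
        // prints command accepted after 3 attempt(s)
        System.out.println(result + " after " + attempts.get() + " attempt(s)");
    }
}
```

Note that this sketch keeps the pending command in memory only, so a restart of module A would still lose it. That is exactly the gap the persistent queue mentioned above would have to cover.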

All in all, a proper Axon Server EE cluster is definitely more reliable and less cumbersome in terms of client-side configuration :wink: