Hi @Steven_van_Beelen,
thanks a lot! I finally managed to solve this. To be honest, without the new PooledStreamingEventProcessor this would not have been possible, as my machine exploded because of the tons of threads the TrackingEventProcessor started.
The release of Axon 4.5 came just in time for us, and I would wholeheartedly like to thank you for this HUGE improvement.
Although we have a technical breakthrough, the implementation might need some polishing. There are certain aspects which are not that “clean”.
First of all: As far as I understand, Axon wants to assign a single EventProcessor instance to a single ProcessingGroup. In our multi-tenant scenario I would like to have an EventProcessor per tenant and per ProcessingGroup.
I solved this by having a TenantAwarePooledStreamingEventProcessor which iterates over all the tenants and creates a PooledStreamingEventProcessor for each tenant.
    @Log4j2
    class TenantAwarePooledStreamingEventProcessor extends AbstractEventProcessor {

        // One PooledStreamingEventProcessor per tenant, keyed by tenant id.
        private final Map<String, PooledStreamingEventProcessor> eventProcessors;

        TenantAwarePooledStreamingEventProcessor( final MultiTenancyConfigurationProperties multiTenancyConfigurationProperties,
                                                  final ScheduledExecutorService coordinatorExecutorService,
                                                  final ScheduledExecutorService workerExecutorService,
                                                  final String name,
                                                  final org.axonframework.config.Configuration configuration,
                                                  final EventProcessingModule eventProcessingModule,
                                                  final EventHandlerInvoker eventHandlerInvoker ) {
            // The outer processor only exists to satisfy Axon's "one processor per processing group" model.
            super( new Builder( ) {
            }
                    .name( name )
                    .eventHandlerInvoker( eventHandlerInvoker )
                    .rollbackConfiguration( eventProcessingModule.rollbackConfiguration( name ) )
                    .errorHandler( eventProcessingModule.errorHandler( name ) )
                    .messageMonitor( eventProcessingModule.messageMonitor( TrackingEventProcessor.class, name ) ) );

            @SuppressWarnings( "unchecked" )
            final StreamableMessageSource<TrackedEventMessage<?>> eventBus = ( StreamableMessageSource<TrackedEventMessage<?>> ) configuration.eventBus( );

            // Build the actual per-tenant processors, named "<tenant>_<processing group>".
            eventProcessors = multiTenancyConfigurationProperties.getTenants( )
                    .stream( )
                    .collect( Collectors.toMap( Function.identity( ), tenant -> {
                        final PooledStreamingEventProcessor.Builder builder = PooledStreamingEventProcessor.builder( )
                                .name( tenant + "_" + name )
                                .eventHandlerInvoker( eventHandlerInvoker )
                                .rollbackConfiguration( eventProcessingModule.rollbackConfiguration( name ) )
                                .errorHandler( eventProcessingModule.errorHandler( name ) )
                                .messageMonitor( eventProcessingModule.messageMonitor( PooledStreamingEventProcessor.class, name ) )
                                .messageSource( eventBus )
                                .tokenStore( eventProcessingModule.tokenStore( name ) )
                                .transactionManager( eventProcessingModule.transactionManager( name ) );
                        return noOp( )
                                .andThen( new TenantAwarePooledStreamingProcessorConfiguration( tenant, coordinatorExecutorService, workerExecutorService ) )
                                .apply( configuration, builder )
                                .build( );
                    } ) );
        }

        @Override
        @StartHandler( phase = Phase.INBOUND_EVENT_CONNECTORS )
        public void start( ) {
            for ( final Entry<String, PooledStreamingEventProcessor> trackingEventProcessorEntry : eventProcessors.entrySet( ) ) {
                log.info( "Starting EventProcessor '" + trackingEventProcessorEntry.getValue( ).getName( ) + "' for tenant " + trackingEventProcessorEntry.getKey( ) );
                TenantContext.runWithTenantId( trackingEventProcessorEntry.getKey( ), ( ) -> trackingEventProcessorEntry.getValue( ).start( ) );
            }
        }

        /**
         * {@inheritDoc}
         * <p>
         * Will be shutdown on the {@link Phase#INBOUND_EVENT_CONNECTORS} phase.
         */
        @Override
        @ShutdownHandler( phase = Phase.INBOUND_EVENT_CONNECTORS )
        public CompletableFuture<Void> shutdownAsync( ) {
            return super.shutdownAsync( );
        }

        @Override
        public void shutDown( ) {
            for ( final Entry<String, PooledStreamingEventProcessor> trackingEventProcessorEntry : eventProcessors.entrySet( ) ) {
                log.info( "Shutting down EventProcessor '" + trackingEventProcessorEntry.getValue( ).getName( ) + "' for tenant " + trackingEventProcessorEntry.getKey( ) );
                TenantContext.runWithTenantId( trackingEventProcessorEntry.getKey( ), ( ) -> trackingEventProcessorEntry.getValue( ).shutDown( ) );
            }
        }

        // Reports "running" as soon as any tenant's processor is running.
        @Override
        public boolean isRunning( ) {
            return eventProcessors.values( ).stream( ).anyMatch( EventProcessor::isRunning );
        }

        // Reports "error" as soon as any tenant's processor is in an error state.
        @Override
        public boolean isError( ) {
            return eventProcessors.values( ).stream( ).anyMatch( EventProcessor::isError );
        }
    }
The TenantAwarePooledStreamingProcessorConfiguration configures the coordinatorExecutor and the workerExecutorService by wrapping the globally configured ScheduledExecutorServices (which are backed by a thread pool) into a MultiTenantScheduledExecutorService.
    @RequiredArgsConstructor
    public class TenantAwarePooledStreamingProcessorConfiguration implements PooledStreamingProcessorConfiguration {

        private final String tenant;
        private final ScheduledExecutorService coordinatorExecutorService;
        private final ScheduledExecutorService workerExecutorService;

        @Override
        public Builder apply( final Configuration config, final Builder builder ) {
            return builder
                    .coordinatorExecutor( new MultiTenantScheduledExecutorService( coordinatorExecutorService, tenant ) )
                    .workerExecutorService( new MultiTenantScheduledExecutorService( workerExecutorService, tenant ) );
        }
    }
The MultiTenantScheduledExecutorService executes the given Runnables in the context of a tenant.
    @RequiredArgsConstructor
    @Log4j2
    public class MultiTenantScheduledExecutorService implements ScheduledExecutorService {

        // Lombok delegates every method not overridden below straight to the shared executor,
        // i.e. without any tenant context.
        @Delegate
        private final ScheduledExecutorService delegate;
        private final String tenant;

        @Override
        public ScheduledFuture<?> schedule( final Runnable command,
                                            final long delay, final TimeUnit unit ) {
            log.debug( "Scheduling task for tenant " + tenant );
            return delegate.schedule( runWithTenantId( command ), delay, unit );
        }

        @Override
        public ScheduledFuture<?> scheduleAtFixedRate( final Runnable command,
                                                       final long initialDelay,
                                                       final long period,
                                                       final TimeUnit unit ) {
            log.debug( "Scheduling task at fixed rate for tenant " + tenant );
            return delegate.scheduleAtFixedRate( runWithTenantId( command ), initialDelay, period, unit );
        }

        @Override
        public ScheduledFuture<?> scheduleWithFixedDelay( final Runnable command,
                                                          final long initialDelay,
                                                          final long delay,
                                                          final TimeUnit unit ) {
            log.debug( "Scheduling task with fixed delay for tenant " + tenant );
            return delegate.scheduleWithFixedDelay( runWithTenantId( command ), initialDelay, delay, unit );
        }

        @Override
        public Future<?> submit( final Runnable task ) {
            log.debug( "Submitting task for tenant " + tenant );
            return delegate.submit( runWithTenantId( task ) );
        }

        // Wraps the task so that it runs with this executor's tenant set in the TenantContext.
        private Runnable runWithTenantId( final Runnable task ) {
            return TaskUtils.decorateTaskWithErrorHandler(
                    ( ) -> TenantContext.runWithTenantId( tenant,
                                                          task::run ),
                    TaskUtils.LOG_AND_PROPAGATE_ERROR_HANDLER, true );
        }
    }
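The TenantContext used above is not shown in this post. For readers following along, here is a minimal sketch of the idea, assuming TenantContext is a ThreadLocal-backed holder. The names TenantContextSketch, currentTenant, forTenant, observe and demo are made up for illustration, and the real TenantContext may well work differently. The sketch shows why wrapping each Runnable is enough to let tenants share one thread pool: the tenant is set just before the task runs and cleared afterwards, so a pooled thread does not carry it into the next task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class TenantContextSketch {

    // Hypothetical stand-in for the TenantContext referenced in the post (an assumption).
    static final class TenantContext {
        private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

        static String currentTenant() {
            return CURRENT.get();
        }

        static void runWithTenantId(String tenant, Runnable task) {
            String previous = CURRENT.get();
            CURRENT.set(tenant);
            try {
                task.run();
            } finally {
                // Restore the previous state so a pooled thread does not leak the tenant.
                if (previous == null) {
                    CURRENT.remove();
                } else {
                    CURRENT.set(previous);
                }
            }
        }
    }

    // Decorates a task so that it runs in the context of the given tenant.
    static Runnable forTenant(String tenant, Runnable task) {
        return () -> TenantContext.runWithTenantId(tenant, task);
    }

    // Runs a probe on the shared pool and reports which tenant it saw ("none" if unset).
    // A null tenant means the probe is scheduled without any decoration.
    static String observe(ScheduledExecutorService pool, String tenantOrNull) {
        AtomicReference<String> seen = new AtomicReference<>("none");
        Runnable probe = () -> {
            String tenant = TenantContext.currentTenant();
            if (tenant != null) {
                seen.set(tenant);
            }
        };
        Runnable task = tenantOrNull == null ? probe : forTenant(tenantOrNull, probe);
        try {
            pool.schedule(task, 0, TimeUnit.MILLISECONDS).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    // Two tenants share a single worker thread; the third task is not decorated at all.
    static List<String> demo() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        try {
            List<String> seen = new ArrayList<>();
            seen.add(observe(pool, "tenantA"));
            seen.add(observe(pool, "tenantB"));
            seen.add(observe(pool, null));
            return seen;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [tenantA, tenantB, none]
    }
}
```

The last observation is "none" because the third task bypasses the decoration, which mirrors what happens with any executor method that is delegated without being overridden.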
My question: Is there a more convenient way to start a PooledStreamingEventProcessor for each tenant? In order to use our TenantAwarePooledStreamingEventProcessor we had to implement an EventProcessorBuilder, and I’m not sure if this is the best way to achieve this.
    @RequiredArgsConstructor
    @Log4j2
    public class MultiTenancyEventProcessorBuilder implements EventProcessorBuilder {

        private final MultiTenancyConfigurationProperties multiTenancyConfigurationProperties;
        private final ScheduledExecutorService coordinatorExecutorService;
        private final ScheduledExecutorService workerExecutorService;

        @Override
        public EventProcessor build( final String name,
                                     final org.axonframework.config.Configuration configuration,
                                     final EventHandlerInvoker eventHandlerInvoker ) {
            final List<EventProcessingModule> eventProcessingModules = configuration.findModules( EventProcessingModule.class );
            log.info( "Building TenantAwarePooledStreamingEventProcessor '" + name + "'" );
            return new TenantAwarePooledStreamingEventProcessor( multiTenancyConfigurationProperties,
                                                                 coordinatorExecutorService,
                                                                 workerExecutorService,
                                                                 name,
                                                                 configuration,
                                                                 eventProcessingModules.get( 0 ),
                                                                 eventHandlerInvoker );
        }
    }
Thanks!