Hi @Steven_van_Beelen,
thanks a lot! I finally managed to solve this. To be honest, without the new PooledStreamingEventProcessor
this would not have been possible, as my machine exploded because of the tons of threads the TrackingEventProcessor needed.
The release of Axon 4.5 came just in time for us, and I would wholeheartedly like to thank you for this HUGE improvement.
Although we have a technical breakthrough, the implementation might need some polishing. There are certain aspects which are not that “clean”.
First of all: As far as I understand, Axon wants to assign a single EventProcessor instance to a single ProcessingGroup. In our multi-tenant scenario I would like to have one EventProcessor per tenant and per ProcessingGroup.
I solved this by having a TenantAwarePooledStreamingEventProcessor which iterates over all the tenants and creates a PooledStreamingEventProcessor for each tenant.
class TenantAwarePooledStreamingEventProcessor extends AbstractEventProcessor {

    // One PooledStreamingEventProcessor per tenant, keyed by tenant id
    private final Map<String, PooledStreamingEventProcessor> eventProcessors;

    TenantAwarePooledStreamingEventProcessor( final MultiTenancyConfigurationProperties multiTenancyConfigurationProperties,
                                              final ScheduledExecutorService coordinatorExecutorService,
                                              final ScheduledExecutorService workerExecutorService,
                                              final String name,
                                              final org.axonframework.config.Configuration configuration,
                                              final EventProcessingModule eventProcessingModule,
                                              final EventHandlerInvoker eventHandlerInvoker ) {
        super( new Builder( ) { }
                .name( name )
                .eventHandlerInvoker( eventHandlerInvoker )
                .rollbackConfiguration( eventProcessingModule.rollbackConfiguration( name ) )
                .errorHandler( eventProcessingModule.errorHandler( name ) )
                .messageMonitor( eventProcessingModule.messageMonitor( TrackingEventProcessor.class, name ) ) );

        @SuppressWarnings( "unchecked" )
        final StreamableMessageSource<TrackedEventMessage<?>> eventBus = ( StreamableMessageSource<TrackedEventMessage<?>> ) configuration.eventBus( );

        // Build one delegate processor per tenant, named "<tenant>_<name>"
        eventProcessors = multiTenancyConfigurationProperties.getTenants( )
                .stream( )
                .collect( Collectors.toMap( Function.identity( ), tenant -> {
                    final PooledStreamingEventProcessor.Builder builder = PooledStreamingEventProcessor.builder( )
                            .name( tenant + "_" + name )
                            .eventHandlerInvoker( eventHandlerInvoker )
                            .rollbackConfiguration( eventProcessingModule.rollbackConfiguration( name ) )
                            .errorHandler( eventProcessingModule.errorHandler( name ) )
                            .messageMonitor( eventProcessingModule.messageMonitor( PooledStreamingEventProcessor.class, name ) )
                            .messageSource( eventBus )
                            .tokenStore( eventProcessingModule.tokenStore( name ) )
                            .transactionManager( eventProcessingModule.transactionManager( name ) );
                    return noOp( )
                            .andThen( new TenantAwarePooledStreamingProcessorConfiguration( tenant, coordinatorExecutorService, workerExecutorService ) )
                            .apply( configuration, builder )
                            .build( );
                } ) );
    }

    @Override
    @StartHandler( phase = Phase.INBOUND_EVENT_CONNECTORS )
    public void start( ) {
        for ( final Entry<String, PooledStreamingEventProcessor> trackingEventProcessorEntry : eventProcessors.entrySet( ) ) {
            log.info( "Starting EventProcessor '" + trackingEventProcessorEntry.getValue( ).getName( ) + "' for tenant " + trackingEventProcessorEntry.getKey( ) );
            TenantContext.runWithTenantId( trackingEventProcessorEntry.getKey( ), ( ) -> trackingEventProcessorEntry.getValue( ).start( ) );
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Will be shutdown on the {@link Phase#INBOUND_EVENT_CONNECTORS} phase.
     */
    @Override
    @ShutdownHandler( phase = Phase.INBOUND_EVENT_CONNECTORS )
    public CompletableFuture<Void> shutdownAsync( ) {
        return super.shutdownAsync( );
    }

    @Override
    public void shutDown( ) {
        for ( final Entry<String, PooledStreamingEventProcessor> trackingEventProcessorEntry : eventProcessors.entrySet( ) ) {
            log.info( "Shutting down EventProcessor '" + trackingEventProcessorEntry.getValue( ).getName( ) + "' for tenant " + trackingEventProcessorEntry.getKey( ) );
            TenantContext.runWithTenantId( trackingEventProcessorEntry.getKey( ), ( ) -> trackingEventProcessorEntry.getValue( ).shutDown( ) );
        }
    }

    // Reports "running" as soon as at least one tenant's processor is running
    @Override
    public boolean isRunning( ) {
        return eventProcessors.values( ).stream( ).anyMatch( EventProcessor::isRunning );
    }

    // Reports "error" as soon as at least one tenant's processor is in error
    @Override
    public boolean isError( ) {
        return eventProcessors.values( ).stream( ).anyMatch( EventProcessor::isError );
    }
}
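To show the fan-out idea on its own, here is a minimal sketch without any Axon types. Everything in it is hypothetical and only for illustration: `SketchProcessor` stands in for Axon's EventProcessor, `SimpleSketchProcessor` for a real per-tenant processor, and `PerTenantProcessor` for our tenant-aware wrapper. It is not how Axon itself wires processors.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical lifecycle contract, standing in for Axon's EventProcessor.
interface SketchProcessor {
    String name();
    void start();
    void shutDown();
    boolean isRunning();
}

// Trivial delegate that only tracks its own running flag.
final class SimpleSketchProcessor implements SketchProcessor {
    private final String name;
    private volatile boolean running;

    SimpleSketchProcessor(String name) {
        this.name = name;
    }

    @Override public String name() { return name; }
    @Override public void start() { running = true; }
    @Override public void shutDown() { running = false; }
    @Override public boolean isRunning() { return running; }
}

// Composite that owns one delegate per tenant and fans lifecycle calls out to all of them.
final class PerTenantProcessor implements SketchProcessor {
    private final String name;
    private final Map<String, SketchProcessor> delegates = new LinkedHashMap<>();

    PerTenantProcessor(String name, List<String> tenants, Function<String, SketchProcessor> factory) {
        this.name = name;
        for (String tenant : tenants) {
            // Same naming scheme as in the post: "<tenant>_<name>"
            delegates.put(tenant, factory.apply(tenant + "_" + name));
        }
    }

    SketchProcessor delegateFor(String tenant) {
        return delegates.get(tenant);
    }

    @Override public String name() { return name; }
    @Override public void start() { delegates.values().forEach(SketchProcessor::start); }
    @Override public void shutDown() { delegates.values().forEach(SketchProcessor::shutDown); }

    // Running as soon as at least one tenant's delegate is running.
    @Override public boolean isRunning() {
        return delegates.values().stream().anyMatch(SketchProcessor::isRunning);
    }
}
```

One consequence of the `anyMatch` choice: the wrapper still reports itself as running when only a single tenant's delegate is alive, so a per-tenant status view would be needed to spot a partially stopped group.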
The TenantAwarePooledStreamingProcessorConfiguration configures the coordinatorExecutor and the workerExecutorService by wrapping the globally configured ScheduledExecutorServices (which are backed by a thread pool) in a MultiTenantScheduledExecutorService.
public class TenantAwarePooledStreamingProcessorConfiguration implements PooledStreamingProcessorConfiguration {

    private final String tenant;
    private final ScheduledExecutorService coordinatorExecutorService;
    private final ScheduledExecutorService workerExecutorService;

    // Wraps both shared executors so that every task runs in this tenant's context
    @Override
    public Builder apply( final Configuration config, final Builder builder ) {
        return builder
                .coordinatorExecutor( new MultiTenantScheduledExecutorService( coordinatorExecutorService, tenant ) )
                .workerExecutorService( new MultiTenantScheduledExecutorService( workerExecutorService, tenant ) );
    }
}
The MultiTenantScheduledExecutorService executes the given Runnables in the context of a tenant.
public class MultiTenantScheduledExecutorService implements ScheduledExecutorService {

    private final ScheduledExecutorService delegate;
    private final String tenant;

    @Override
    public ScheduledFuture<?> schedule( final Runnable command,
                                        final long delay, final TimeUnit unit ) {
        log.debug( "Scheduling task for tenant " + tenant );
        return delegate.schedule( runWithTenantId( command ), delay, unit );
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate( final Runnable command,
                                                   final long initialDelay,
                                                   final long period,
                                                   final TimeUnit unit ) {
        log.debug( "Scheduling task at fixed rate for tenant " + tenant );
        return delegate.scheduleAtFixedRate( runWithTenantId( command ), initialDelay, period, unit );
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay( final Runnable command,
                                                      final long initialDelay,
                                                      final long delay,
                                                      final TimeUnit unit ) {
        log.debug( "Scheduling task with fixed delay for tenant " + tenant );
        return delegate.scheduleWithFixedDelay( runWithTenantId( command ), initialDelay, delay, unit );
    }

    @Override
    public Future<?> submit( final Runnable task ) {
        log.debug( "Submitting task for tenant " + tenant );
        return delegate.submit( runWithTenantId( task ) );
    }

    // Decorates the task so that it runs with this tenant set in the TenantContext
    private Runnable runWithTenantId( final Runnable task ) {
        return TaskUtils.decorateTaskWithErrorHandler(
                ( ) -> TenantContext.runWithTenantId( tenant, task::run ),
                // remaining arguments are not shown
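For readers who do not know our TenantContext: the following is a minimal sketch of how such a context could work, assuming it is backed by a ThreadLocal. `SketchTenantContext`, `currentTenant`, and `bind` are hypothetical names made up for this illustration; our real TenantContext may differ in detail.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the TenantContext used in the post (assumption: ThreadLocal based).
final class SketchTenantContext {
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    private SketchTenantContext() {
    }

    static String currentTenant() {
        return CURRENT.get();
    }

    // Runs the task with the given tenant set, then restores whatever was set before.
    static void runWithTenantId(String tenant, Runnable task) {
        String previous = CURRENT.get();
        CURRENT.set(tenant);
        try {
            task.run();
        } finally {
            if (previous == null) {
                CURRENT.remove();
            } else {
                CURRENT.set(previous);
            }
        }
    }

    // Wraps a task so the tenant travels with it onto whichever pool thread executes it.
    static Runnable bind(String tenant, Runnable task) {
        return () -> runWithTenantId(tenant, task);
    }
}

final class TenantContextSketchDemo {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        try {
            // Prints "bound task sees: tenantA"
            pool.schedule(
                    SketchTenantContext.bind("tenantA",
                            () -> System.out.println("bound task sees: " + SketchTenantContext.currentTenant())),
                    0, TimeUnit.MILLISECONDS).get();
            // Prints "plain task sees: null" because the context was cleared after the bound task
            pool.submit(() -> System.out.println("plain task sees: " + SketchTenantContext.currentTenant())).get();
        } finally {
            pool.shutdown();
        }
    }
}
```

Restoring the previous value in the `finally` block matters because the pool threads are shared across tenants; without it, a tenant id could leak into the next task that happens to run on the same thread.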
My question: Is there a more convenient way to start a PooledStreamingEventProcessor for each tenant? In order to use our TenantAwarePooledStreamingEventProcessor we had to implement an EventProcessorBuilder, and I’m not sure if this is the best way to achieve this.
public class MultiTenancyEventProcessorBuilder implements EventProcessorBuilder {

    private final MultiTenancyConfigurationProperties multiTenancyConfigurationProperties;
    private final ScheduledExecutorService coordinatorExecutorService;
    private final ScheduledExecutorService workerExecutorService;

    @Override
    public EventProcessor build( final String name, final org.axonframework.config.Configuration configuration, final EventHandlerInvoker eventHandlerInvoker ) {
        // Takes the first registered EventProcessingModule
        final List<EventProcessingModule> eventProcessingModules = configuration.findModules( EventProcessingModule.class );
        log.info( "Building TenantAwarePooledStreamingEventProcessor '" + name + "'" );
        return new TenantAwarePooledStreamingEventProcessor( multiTenancyConfigurationProperties, coordinatorExecutorService, workerExecutorService, name, configuration, eventProcessingModules.get( 0 ), eventHandlerInvoker );
    }
}