Multi-tenancy event processors

Hi everyone,

we are currently trying to implement a multi-tenant Axon application. Each tenant has its own database, or at least its own database schema.

With the TrackingEventProcessor, there is at least one dedicated thread per processing group. I wonder how this scales in a multi-tenant scenario. As far as I can see, the TrackingEventProcessor is bound to a TokenStore (JPA in our case), but each tenant has its own schema and therefore its own TokenStore.

The only solution I currently see is to have a TrackingEventProcessor per processing group and per tenant. With, say, 1,000 tenants and 5 processing groups, we quickly end up with 5,000 event processor threads, and that assumes only one segment per TrackingEventProcessor.
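The thread count above is a simple product. A small sketch of that estimate, assuming each TrackingEventProcessor needs one thread per segment for all of its segments to be processed (ThreadEstimate and its method are made-up names, not an Axon API):

```java
// Hypothetical helper for the back-of-the-envelope estimate; not part of Axon.
final class ThreadEstimate {
    private ThreadEstimate() {}

    /**
     * Threads needed when every tenant gets its own TrackingEventProcessor per processing
     * group, assuming one thread per segment (the minimum for every segment to be claimed).
     */
    static long trackingProcessorThreads(long tenants, long processingGroups, long segmentsPerProcessor) {
        return tenants * processingGroups * segmentsPerProcessor;
    }
}
```

With 1,000 tenants and 5 processing groups, `trackingProcessorThreads(1000, 5, 1)` gives 5,000; going to 2 segments per processor doubles it to 10,000.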

I took a look at the PooledStreamingEventProcessor, but each instance of this processor type needs to be backed by a TokenStore as well. So I would also need a PooledStreamingEventProcessor per tenant, which might increase the complexity instead of reducing it.

I am writing this post because I hope there is something I have missed so far. Any recommendations or hints are highly appreciated.

Oliver

Edit: I already searched this board for answers but have not succeeded so far, although there are a couple of “Multi-tenancy” threads.

Hi Oliver,

Do you have one event store for all events, regardless of tenant? If so, having a TrackingEventProcessor per tenant would cause a lot of unnecessary work, as each tenant-specific TrackingEventProcessor would have to evaluate whether an event belongs to the tenant it manages and discard the ones it does not handle. Would it be possible to have one TrackingEventProcessor, regardless of tenant, with some logic in an interceptor that sets up the connection to the appropriate database before the event handler receives the event?
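In Axon such logic would live in a MessageHandlerInterceptor registered for the processor. The sketch below models only the idea with plain JDK types, assuming the tenant id travels in the event's metadata; TenantEvent, CurrentTenant, TenantInterceptor and the "tenantId" key are all hypothetical:

```java
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical event model: metadata carries the tenant id (as Axon MetaData could).
final class TenantEvent {
    final Map<String, String> metaData;
    final String payload;

    TenantEvent(Map<String, String> metaData, String payload) {
        this.metaData = metaData;
        this.payload = payload;
    }
}

// Thread-bound tenant, e.g. read by a routing DataSource to pick the tenant's schema.
final class CurrentTenant {
    private static final ThreadLocal<String> TENANT = new ThreadLocal<>();

    static String get() { return TENANT.get(); }
    static void set(String tenant) { TENANT.set(tenant); }
    static void clear() { TENANT.remove(); }
}

// Interceptor: resolve the tenant, bind it only for the handler call, then unbind.
final class TenantInterceptor {
    static final String TENANT_KEY = "tenantId"; // hypothetical metadata key

    static Consumer<TenantEvent> intercept(Consumer<TenantEvent> handler) {
        return event -> {
            String tenant = event.metaData.get(TENANT_KEY);
            if (tenant == null) {
                throw new IllegalStateException("Event without " + TENANT_KEY + " metadata");
            }
            CurrentTenant.set(tenant);
            try {
                handler.accept(event);
            } finally {
                CurrentTenant.clear(); // never leak a tenant into the next event
            }
        };
    }
}
```

One processor thread can then serve every tenant, because the database selection happens per event rather than per processor.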

-Ben

Currently the idea is to have separate token stores and event processors per tenant, but the question is: does it scale? I assume it doesn't, as there are a lot of event processor threads floating around producing noise, which drastically reduces overall performance.

Yes, with a TrackingEventProcessor for each tenant, all of the “noise” that each TrackingEventProcessor would have to filter out would make the performance of each one rather poor.

I will investigate the PooledStreamingEventProcessor. The idea is to have one PooledStreamingEventProcessor for each processing group. This instance should be able to handle all tenants by executing each task in the context of a specific tenant.

First of all, I have to understand how exactly the PooledStreamingEventProcessor works.
Maybe @Steven_van_Beelen can give a hint whether this is a viable approach and where to look?

I think you can go a long way by using the same thread pools for all the PooledStreamingEventProcessor (PSEP) instances you set up. We have considered auto-configuring it this way but felt that would impose too much on users. Easier configuration for this might come in the future, because this isn't the first scenario I've heard of where sharing threads across all processors was beneficial.

To use the same thread pool among several PSEP instances, you can do the following:

void samplePsepConfig(EventProcessingConfigurer processingConfigurer) {
    // This thread pool defaults to 1 within a PSEP
    ScheduledExecutorService coordinatorThreadPool =
            Executors.newScheduledThreadPool(5, new AxonThreadFactory("Coordinator"));

    // This pool also defaults to one, but should typically be set higher
    ScheduledExecutorService workPackageThreadPool =
            Executors.newScheduledThreadPool(10, new AxonThreadFactory("WorkPackage"));

    EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig =
            (config, builder) -> builder.coordinatorExecutor(coordinatorThreadPool)
                                        .workerExecutorService(workPackageThreadPool);

    processingConfigurer.registerPooledStreamingEventProcessor("foo-psep", Configuration::eventStore, psepConfig)
                        .registerPooledStreamingEventProcessor("bar-psep", Configuration::eventStore, psepConfig)
                        // ...
                        .registerPooledStreamingEventProcessor("baz-psep", Configuration::eventStore, psepConfig);
}

The above shows how you could construct the ScheduledExecutorService instances used by the PSEP. To configure them, you can provide a PooledStreamingProcessorConfiguration to the EventProcessingConfigurer. To use that method, you also need to provide the message source, which I have set to the EventStore in this sample.
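The effect of sharing one pool can be seen without Axon at all. A plain-JDK sketch (SharedPoolDemo and the processor/task counts are illustrative, and the default thread factory replaces AxonThreadFactory): no matter how many "processors" submit work, only the pool's threads ever run it.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Not Axon code: several "processors" share ONE pool, so the thread count is bounded
// by the pool size rather than by the number of processors.
final class SharedPoolDemo {
    private SharedPoolDemo() {}

    /** Submits tasksPerProcessor tasks per processor; returns the names of the threads that ran them. */
    static Set<String> run(int poolSize, int processors, int tasksPerProcessor) {
        ScheduledExecutorService shared = Executors.newScheduledThreadPool(poolSize);
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(processors * tasksPerProcessor);
        try {
            for (int processor = 0; processor < processors; processor++) {
                for (int task = 0; task < tasksPerProcessor; task++) {
                    shared.execute(() -> {
                        threadNames.add(Thread.currentThread().getName());
                        done.countDown();
                    });
                }
            }
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Tasks did not finish in time");
            }
            return threadNames;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } finally {
            shared.shutdown();
        }
    }
}
```

Running 50 "processors" with 20 tasks each on a pool of 2 uses at most 2 distinct threads, which is the property that keeps the per-tenant PSEPs affordable.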

Additionally, the PSEP uses two distinct thread pools as you can see. The Coordinator thread pool is used to coordinate the event retrieval and segment operations (split/merge). The second thread pool, for the WorkPackages, is actually in charge of handling the events (thus invoking your @EventHandler annotated methods).

Internally, every WorkPackage corresponds to one segment and its TrackingToken. Since the Coordinator retrieves the events, you'll have fewer streams open than with the TEP, which opens a stream for every segment. Additionally, the TEP had a hard requirement that the number of threads be equal to or greater than the number of segments. This is no longer the case for the PSEP, which can handle any number of segments (or, more specifically, up to Short.MAX_VALUE).
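To make the coordinator/work-package split concrete, here is a heavily simplified, plain-JDK model, not Axon's implementation: one coordinator pass reads the stream once and routes each event to the work package of its segment, and all work packages run on a worker pool that may be smaller than the segment count. Routing by `(hash & mask) == segmentId` is meant to resemble how segments partition the stream; treat that detail, and every name below, as an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified model: one stream read, many segments, few worker threads.
final class CoordinatorDemo {
    private CoordinatorDemo() {}

    /** Returns, per segment id, the events handled by that segment's work package. */
    static Map<Integer, List<Integer>> process(List<Integer> eventStream, int segmentCount, int workerThreads) {
        if (segmentCount <= 0 || Integer.bitCount(segmentCount) != 1) {
            throw new IllegalArgumentException("segmentCount must be a power of two");
        }
        int mask = segmentCount - 1;
        // Coordinator: a single pass over the stream, routing each event to exactly one segment.
        Map<Integer, List<Integer>> batches = new HashMap<>();
        for (int event : eventStream) {
            int segmentId = Integer.hashCode(event) & mask;
            batches.computeIfAbsent(segmentId, id -> new ArrayList<>()).add(event);
        }
        // Work packages: one task per segment, all on the shared (smaller) worker pool.
        Map<Integer, List<Integer>> handled = new ConcurrentHashMap<>();
        ExecutorService workers = Executors.newFixedThreadPool(workerThreads);
        for (Map.Entry<Integer, List<Integer>> batch : batches.entrySet()) {
            workers.execute(() -> handled.put(batch.getKey(), List.copyOf(batch.getValue())));
        }
        workers.shutdown();
        try {
            if (!workers.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Work packages did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for work packages", e);
        }
        return handled;
    }
}
```

With 8 segments and only 2 worker threads, every event still lands in exactly one segment, which a TEP could only do with at least 8 threads.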

Hope this gives you the guidance you’re looking for @Oliver_Libutzki!

Hi @Steven_van_Beelen,

thanks a lot! I finally managed to solve this. To be honest, without the new PooledStreamingEventProcessor this would not have been possible, as my machine exploded under the sheer number of threads the TrackingEventProcessor started.

The release of Axon 4.5 came just in time for us, and I wholeheartedly want to thank you for this HUGE improvement.

Although we have made a technical breakthrough, the implementation might need some polishing. There are certain aspects that are not that “clean”.

First of all: as far as I understand, Axon assigns a single EventProcessor instance to a single ProcessingGroup. In our multi-tenant scenario, I would like to have an EventProcessor per tenant and per ProcessingGroup.

I solved this by having a TenantAwarePooledStreamingEventProcessor which iterates over all the tenants and creates a PooledStreamingEventProcessor for each tenant.

@Log4j2
class TenantAwarePooledStreamingEventProcessor extends AbstractEventProcessor {

	private final Map<String, PooledStreamingEventProcessor> eventProcessors;

	TenantAwarePooledStreamingEventProcessor( final MultiTenancyConfigurationProperties multiTenancyConfigurationProperties, final ScheduledExecutorService coordinatorexecutorService, final ScheduledExecutorService workerExecutorService, final String name, final org.axonframework.config.Configuration configuration, final EventProcessingModule eventProcessingModule, final EventHandlerInvoker eventHandlerInvoker ) {
		super( new Builder( ) {

		}
				.name( name )
				.eventHandlerInvoker( eventHandlerInvoker )
				.rollbackConfiguration( eventProcessingModule.rollbackConfiguration( name ) )
				.errorHandler( eventProcessingModule.errorHandler( name ) )
				.messageMonitor( eventProcessingModule.messageMonitor( PooledStreamingEventProcessor.class, name ) ) );

		// The EventStore is already a StreamableMessageSource, so no unchecked cast of the EventBus is needed
		final StreamableMessageSource<TrackedEventMessage<?>> eventBus = configuration.eventStore( );

		eventProcessors = multiTenancyConfigurationProperties.getTenants( )
				.stream( )
				.collect( Collectors.toMap( Function.identity( ), tenant -> {
					final PooledStreamingEventProcessor.Builder builder = PooledStreamingEventProcessor.builder( )
							.name( tenant + "_" + name )
							.eventHandlerInvoker( eventHandlerInvoker )
							.rollbackConfiguration( eventProcessingModule.rollbackConfiguration( name ) )
							.errorHandler( eventProcessingModule.errorHandler( name ) )
							.messageMonitor( eventProcessingModule.messageMonitor( PooledStreamingEventProcessor.class, name ) )
							.messageSource( eventBus )
							.tokenStore( eventProcessingModule.tokenStore( name ) )
							.transactionManager( eventProcessingModule.transactionManager( name ) );
					return noOp( )
							.andThen( new TenantAwarePooledStreamingProcessorConfiguration( tenant, coordinatorexecutorService, workerExecutorService ) )
							.apply( configuration, builder )
							.build( );

				} ) );

	}

	@Override
	@StartHandler( phase = Phase.INBOUND_EVENT_CONNECTORS )
	public void start( ) {
		for ( final Entry<String, PooledStreamingEventProcessor> trackingEventProcessorEntry : eventProcessors.entrySet( ) ) {
			log.info( "Starting EventProcessor '" + trackingEventProcessorEntry.getValue( ).getName( ) + "' for tenant " + trackingEventProcessorEntry.getKey( ) );
			TenantContext.runWithTenantId( trackingEventProcessorEntry.getKey( ), ( ) -> trackingEventProcessorEntry.getValue( ).start( ) );
		}
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * Will be shutdown on the {@link Phase#INBOUND_EVENT_CONNECTORS} phase.
	 */
	@Override
	@ShutdownHandler( phase = Phase.INBOUND_EVENT_CONNECTORS )
	public CompletableFuture<Void> shutdownAsync( ) {
		return super.shutdownAsync( );
	}

	@Override
	public void shutDown( ) {
		for ( final Entry<String, PooledStreamingEventProcessor> trackingEventProcessorEntry : eventProcessors.entrySet( ) ) {
			log.info( "Shutting down EventProcessor '" + trackingEventProcessorEntry.getValue( ).getName( ) + "' for tenant " + trackingEventProcessorEntry.getKey( ) );
			TenantContext.runWithTenantId( trackingEventProcessorEntry.getKey( ), ( ) -> trackingEventProcessorEntry.getValue( ).shutDown( ) );
		}
	}

	@Override
	public boolean isRunning( ) {
		return eventProcessors.values( ).stream( ).anyMatch( EventProcessor::isRunning );
	}

	@Override
	public boolean isError( ) {
		return eventProcessors.values( ).stream( ).anyMatch( EventProcessor::isError );
	}
}
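TenantContext is the poster's own class and its implementation isn't shown. A minimal ThreadLocal-based sketch of what `runWithTenantId(String, Runnable)` could look like, assuming the tenant id is all the context needs (`currentTenantId` is a hypothetical accessor, e.g. for a routing DataSource):

```java
import java.util.Objects;

// Hypothetical sketch, not the poster's code: binds a tenant id to the current thread
// for the duration of the action and restores the previous value afterwards, so nested
// calls and pooled threads do not leak a tenant into unrelated work.
final class TenantContext {
    private static final ThreadLocal<String> CURRENT_TENANT = new ThreadLocal<>();

    private TenantContext() {}

    static String currentTenantId() {
        return CURRENT_TENANT.get();
    }

    static void runWithTenantId(String tenantId, Runnable action) {
        Objects.requireNonNull(tenantId, "tenantId");
        String previous = CURRENT_TENANT.get();
        CURRENT_TENANT.set(tenantId);
        try {
            action.run();
        } finally {
            if (previous == null) {
                CURRENT_TENANT.remove();
            } else {
                CURRENT_TENANT.set(previous);
            }
        }
    }
}
```

Because a ThreadLocal only affects the calling thread, wrapping `start()` this way binds the tenant only while `start()` runs on the caller's thread; work executed later on pool threads needs its own binding, which is what the MultiTenantScheduledExecutorService wrapper provides.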

The TenantAwarePooledStreamingProcessorConfiguration configures the coordinatorExecutor and the workerExecutorService by wrapping the globally configured ScheduledExecutorServices (which are backed by a shared thread pool) in a MultiTenantScheduledExecutorService.

@RequiredArgsConstructor
public class TenantAwarePooledStreamingProcessorConfiguration implements PooledStreamingProcessorConfiguration {

	private final String tenant;
	private final ScheduledExecutorService coordinatorExecutorService;
	private final ScheduledExecutorService workerExecutorService;

	@Override
	public Builder apply( final Configuration config, final Builder builder ) {
		return builder
				.coordinatorExecutor( new MultiTenantScheduledExecutorService( coordinatorExecutorService, tenant ) )
				.workerExecutorService( new MultiTenantScheduledExecutorService( workerExecutorService, tenant ) );
	}

}

The MultiTenantScheduledExecutorService executes the given Runnables in the context of a tenant.

@RequiredArgsConstructor
@Log4j2
public class MultiTenantScheduledExecutorService implements ScheduledExecutorService {

	@Delegate
	private final ScheduledExecutorService delegate;
	private final String tenant;

	@Override
	public ScheduledFuture<?> schedule( final Runnable command,
			final long delay, final TimeUnit unit ) {
		log.debug( "Scheduling task for tenant " + tenant );
		return delegate.schedule( runWithTenantId( command ), delay, unit );

	}

	@Override
	public ScheduledFuture<?> scheduleAtFixedRate( final Runnable command,
			final long initialDelay,
			final long period,
			final TimeUnit unit ) {
		log.debug( "Scheduling task at fixed rate for tenant " + tenant );
		return delegate.scheduleAtFixedRate( runWithTenantId( command ), initialDelay, period, unit );

	}

	@Override
	public ScheduledFuture<?> scheduleWithFixedDelay( final Runnable command,
			final long initialDelay,
			final long delay,
			final TimeUnit unit ) {
		log.debug( "Scheduling task with fixed delay for tenant " + tenant );
		return delegate.scheduleWithFixedDelay( runWithTenantId( command ), initialDelay, delay, unit );

	}

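	@Override
	public void execute( final Runnable command ) {
		// Without this override, @Delegate generates execute(...), which would bypass the tenant context.
		// Note: the Callable-based overloads are still delegated unwrapped by @Delegate.
		log.debug( "Executing task for tenant " + tenant );
		delegate.execute( runWithTenantId( command ) );
	}

	@Override
	public Future<?> submit( final Runnable task ) {
		log.debug( "Submitting task for tenant " + tenant );
		return delegate.submit( runWithTenantId( task ) );
	}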

	private Runnable runWithTenantId( final Runnable task ) {
		return TaskUtils.decorateTaskWithErrorHandler(
				( ) -> TenantContext.runWithTenantId( tenant,
						task::run ),
				TaskUtils.LOG_AND_PROPAGATE_ERROR_HANDLER, true );
	}

}
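The same decorator idea in plain JDK terms, without Lombok: a sketch (TenantBinding and TenantAwareExecutor are hypothetical names) that wraps only `Executor.execute`. A real ScheduledExecutorService wrapper has to cover every entry point a processor may use, including `execute` and the Callable-based `submit`/`schedule` overloads; otherwise some tasks run without a tenant.

```java
import java.util.concurrent.Executor;

// Hypothetical helper: runs a task with a tenant bound on whichever thread executes it.
final class TenantBinding {
    private static final ThreadLocal<String> TENANT = new ThreadLocal<>();

    private TenantBinding() {}

    static String current() {
        return TENANT.get();
    }

    static Runnable bind(String tenant, Runnable task) {
        return () -> {
            String previous = TENANT.get();
            TENANT.set(tenant);
            try {
                task.run();
            } finally {
                // Pool threads are reused, so always restore the previous state.
                if (previous == null) {
                    TENANT.remove();
                } else {
                    TENANT.set(previous);
                }
            }
        };
    }
}

// Decorates a shared executor so every task it receives runs for one fixed tenant.
final class TenantAwareExecutor implements Executor {
    private final Executor delegate;
    private final String tenant;

    TenantAwareExecutor(Executor delegate, String tenant) {
        this.delegate = delegate;
        this.tenant = tenant;
    }

    @Override
    public void execute(Runnable command) {
        delegate.execute(TenantBinding.bind(tenant, command));
    }
}
```

Submitting through `new TenantAwareExecutor(pool, "acme")` binds `acme` only for that task; a task submitted to `pool` directly afterwards, even on the same thread, sees no tenant.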

My question: is there a more convenient way to start a PooledStreamingEventProcessor for each tenant? To use our TenantAwarePooledStreamingEventProcessor, we had to implement an EventProcessorBuilder, and I'm not sure this is the best way to achieve it.

@RequiredArgsConstructor
@Log4j2
public class MultiTenancyEventProcessorBuilder implements EventProcessorBuilder {

	private final MultiTenancyConfigurationProperties multiTenancyConfigurationProperties;
	private final ScheduledExecutorService coordinatorExecutorService;
	private final ScheduledExecutorService workerExecutorService;

	@Override
	public EventProcessor build( final String name, final org.axonframework.config.Configuration configuration, final EventHandlerInvoker eventHandlerInvoker ) {
		final List<EventProcessingModule> eventProcessingModules = configuration.findModules( EventProcessingModule.class );
		log.info( "Building TenantAwarePooledStreamingEventProcessor '" + name + "'" );
		return new TenantAwarePooledStreamingEventProcessor( multiTenancyConfigurationProperties, coordinatorExecutorService, workerExecutorService, name, configuration, eventProcessingModules.get( 0 ), eventHandlerInvoker );
	}

}

Thanks!
