Multi tenancy event processors

Hi everyone,

we are currently trying to implement a multi-tenant Axon application. Each tenant has its own database, or at least its own database schema.

Using the TrackingEventProcessor, there is at least one dedicated thread for each processing group. I wonder how this scales in a multi-tenant scenario. As far as I can see, the TrackingEventProcessor is bound to a TokenStore (JPA in our case), but each tenant has its own schema and therefore its own TokenStore.

The only solution I currently see is to have a TrackingEventProcessor per processing group and per tenant. With, let’s say, 1,000 tenants and 5 processing groups, we quickly have 5,000 event processor threads… and that assumes there is only one segment for each tracking event processor.

I took a look at the PooledStreamingEventProcessor, but each instance of this processor type needs to be backed by a TokenStore as well. I would therefore need a PooledStreamingEventProcessor for each tenant too, which might increase the complexity instead of reducing it.

I am writing this post because I hope that there is something I have missed so far. Any recommendations or hints are highly appreciated.

Oliver

Edit: I already searched this board for answers, but did not succeed so far, although there are a couple of “Multi-tenancy” threads.

Hi Oliver,

Do you have one event-store for all events regardless of tenant? Without that, having a TrackingEventProcessor per tenant would cause a lot of unnecessary work, as each tenant-specific TrackingEventProcessor will have to evaluate whether the event is for the tenant it manages, throwing out the ones it does not handle. Would it be possible to have one TrackingEventProcessor, regardless of tenant, that has some logic in an interceptor that would set up the connection to the appropriate database before the event handler receives the event?
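
To make the interceptor idea a bit more concrete, here is a minimal, framework-free sketch. Everything in it is an assumption for illustration: the `Event` type, the `tenantId` metadata key and the thread-local holder are hypothetical stand-ins and not Axon classes. It only shows the handler-side switching; it does not address where the processor's own tokens would be stored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-ins for illustration; none of these types come from Axon.
final class TenantRoutingSketch {

    // Holds the tenant the current thread is working for.
    static final ThreadLocal<String> CURRENT_TENANT = new ThreadLocal<>();

    // Minimal event: a payload plus metadata that is assumed to carry the tenant id.
    static final class Event {
        final String payload;
        final Map<String, String> metaData;

        Event(String payload, Map<String, String> metaData) {
            this.payload = payload;
            this.metaData = metaData;
        }
    }

    // Plays the role of the interceptor: bind the tenant, call the handler, always unbind.
    static void intercept(Event event, Consumer<Event> handler) {
        String tenant = event.metaData.get("tenantId");
        if (tenant == null) {
            throw new IllegalArgumentException("Event carries no tenantId");
        }
        CURRENT_TENANT.set(tenant);
        try {
            handler.accept(event);
        } finally {
            CURRENT_TENANT.remove();
        }
    }

    // Sends events of two tenants through the same handler and records what the handler saw.
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        Consumer<Event> handler = event -> seen.add(CURRENT_TENANT.get() + ":" + event.payload);
        intercept(new Event("OrderPlaced", Map.of("tenantId", "acme")), handler);
        intercept(new Event("OrderPlaced", Map.of("tenantId", "globex")), handler);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In a real application the handler would use the bound tenant to pick the right database connection or schema.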

-Ben

Currently the idea is to have separate token stores and event processors per tenant, but the question is: Does it scale? I assume that it doesn’t, as there are a lot of event processor threads floating around and producing noise, which reduces the overall performance drastically.

Yes, with a TrackingEventProcessor for each tenant, all of the “noise” that each TrackingEventProcessor would have to filter out would cause your performance on each to be rather poor.

I will investigate the PooledStreamingEventProcessor… the idea is to have one PooledStreamingEventProcessor for each processing group. This instance should be able to handle all the tenants by executing each task in the context of a certain tenant.

First of all I have to understand how the PooledStreamingEventProcessor exactly works.
Maybe @Steven_van_Beelen can give a hint whether this is a possible way and where to look?

I think you can go a long way by using the same thread pool for all the PooledStreamingEventProcessor (PSEP) instances you set up. We have been thinking of auto-configuring it as such but felt that would be imposing too much on the users. Ease of configuration for this might come in the future because this isn’t the first scenario I’ve heard where using the threads for all processors at once was beneficial.

To use the same thread pool among several PSEP instances, you can do the following:

void samplePsepConfig(EventProcessingConfigurer processingConfigurer) {
    // This thread pool defaults to 1 within a PSEP
    ScheduledExecutorService coordinatorThreadPool =
            Executors.newScheduledThreadPool(5, new AxonThreadFactory("Coordinator"));

    // This pool also defaults to one, but should typically be set higher
    ScheduledExecutorService workPackageThreadPool =
            Executors.newScheduledThreadPool(10, new AxonThreadFactory("WorkPackage"));

    EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig =
            (config, builder) -> builder.coordinatorExecutor(coordinatorThreadPool)
                                        .workerExecutorService(workPackageThreadPool);

    processingConfigurer.registerPooledStreamingEventProcessor("foo-psep", Configuration::eventStore, psepConfig)
                        .registerPooledStreamingEventProcessor("bar-psep", Configuration::eventStore, psepConfig)
                        // ...
                        .registerPooledStreamingEventProcessor("baz-psep", Configuration::eventStore, psepConfig);
}

The above shows how you could construct the required ScheduledExecutorService instances used by the PSEP. To configure them, you can provide a PooledStreamingProcessorConfiguration to the EventProcessingConfigurer. To reach that method, you also need to provide the message source, which I have defaulted to the EventStore in this sample.

Additionally, the PSEP uses two distinct thread pools as you can see. The Coordinator thread pool is used to coordinate the event retrieval and segment operations (split/merge). The second thread pool, for the WorkPackages, is actually in charge of handling the events (thus invoking your @EventHandler annotated methods).

Internally, every WorkPackage resembles a TrackingToken segment. As the Coordinator is retrieving the events, you’ll thus have fewer streams open than with the TEP, which opened a stream for every segment. Additionally, the TEP had a hard requirement that the number of threads be equal to or greater than the number of segments. This is no longer the case for the PSEP, which can handle any number of segments (or more specifically, up to Short.MAX_VALUE).
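
As a rough illustration of why the segment count no longer dictates the thread count, the sketch below handles many "segments" on a small fixed pool using only the JDK. It is not how the PSEP is implemented; the class name and the task-per-segment model are assumptions made purely for the example.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative only: models one unit of work per segment on a shared pool.
final class SegmentsOnSharedPoolSketch {

    // Returns {number of segments handled, number of distinct threads used}.
    static int[] run(int segments, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        Set<Integer> handled = ConcurrentHashMap.newKeySet();
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(segments);
        try {
            for (int segment = 0; segment < segments; segment++) {
                final int id = segment;
                Runnable workPackage = () -> {
                    handled.add(id);
                    threadNames.add(Thread.currentThread().getName());
                    done.countDown();
                };
                pool.submit(workPackage);
            }
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Work did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new int[] {handled.size(), threadNames.size()};
    }

    public static void main(String[] args) {
        int[] result = run(64, 4);
        System.out.println(result[0] + " segments handled on " + result[1] + " thread(s)");
    }
}
```

With a thread-per-segment model, the same 64 segments would need 64 threads.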

Hope this gives you the guidance you’re looking for @Oliver_Libutzki!

Hi @Steven_van_Beelen,

thanks a lot! I finally managed to solve this. To be honest, without the new PooledStreamingEventProcessor this would not have been possible as my machine exploded because of the tons of threads the TrackingEventProcessor started.

The release of Axon 4.5 came just in time for us and I would wholeheartedly like to thank you for this HUGE improvement.

Although we have a technical breakthrough the implementation might need some polishing. There are certain aspects which are not that “clean”.

First of all: As far as I understand Axon wants to assign a single EventProcessor instance to a single ProcessingGroup. In our multi-tenant scenario I would like to have an EventProcessor per tenant and per ProcessingGroup.

I solved this by having a TenantAwarePooledStreamingEventProcessor which iterates over all the tenants and creates a PooledStreamingEventProcessor for each tenant.

@Log4j2
class TenantAwarePooledStreamingEventProcessor extends AbstractEventProcessor {

	private final Map<String, PooledStreamingEventProcessor> eventProcessors;

	TenantAwarePooledStreamingEventProcessor( final MultiTenancyConfigurationProperties multiTenancyConfigurationProperties, final ScheduledExecutorService coordinatorexecutorService, final ScheduledExecutorService workerExecutorService, final String name, final org.axonframework.config.Configuration configuration, final EventProcessingModule eventProcessingModule, final EventHandlerInvoker eventHandlerInvoker ) {
		super( new Builder( ) {

		}
				.name( name )
				.eventHandlerInvoker( eventHandlerInvoker )
				.rollbackConfiguration( eventProcessingModule.rollbackConfiguration( name ) )
				.errorHandler( eventProcessingModule.errorHandler( name ) )
				.messageMonitor( eventProcessingModule.messageMonitor( TrackingEventProcessor.class, name ) ) );

		@SuppressWarnings( "unchecked" )
		final StreamableMessageSource<TrackedEventMessage<?>> eventBus = ( StreamableMessageSource<TrackedEventMessage<?>> ) configuration.eventBus( );

		eventProcessors = multiTenancyConfigurationProperties.getTenants( )
				.stream( )
				.collect( Collectors.toMap( Function.identity( ), tenant -> {
					final PooledStreamingEventProcessor.Builder builder = PooledStreamingEventProcessor.builder( )
							.name( tenant + "_" + name )
							.eventHandlerInvoker( eventHandlerInvoker )
							.rollbackConfiguration( eventProcessingModule.rollbackConfiguration( name ) )
							.errorHandler( eventProcessingModule.errorHandler( name ) )
							.messageMonitor( eventProcessingModule.messageMonitor( PooledStreamingEventProcessor.class, name ) )
							.messageSource( eventBus )
							.tokenStore( eventProcessingModule.tokenStore( name ) )
							.transactionManager( eventProcessingModule.transactionManager( name ) );
					return noOp( )
							.andThen( new TenantAwarePooledStreamingProcessorConfiguration( tenant, coordinatorexecutorService, workerExecutorService ) )
							.apply( configuration, builder )
							.build( );

				} ) );

	}

	@Override
	@StartHandler( phase = Phase.INBOUND_EVENT_CONNECTORS )
	public void start( ) {
		for ( final Entry<String, PooledStreamingEventProcessor> trackingEventProcessorEntry : eventProcessors.entrySet( ) ) {
			log.info( "Starting EventProcessor '" + trackingEventProcessorEntry.getValue( ).getName( ) + "' for tenant " + trackingEventProcessorEntry.getKey( ) );
			TenantContext.runWithTenantId( trackingEventProcessorEntry.getKey( ), ( ) -> trackingEventProcessorEntry.getValue( ).start( ) );
		}
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * Will be shutdown on the {@link Phase#INBOUND_EVENT_CONNECTORS} phase.
	 */
	@Override
	@ShutdownHandler( phase = Phase.INBOUND_EVENT_CONNECTORS )
	public CompletableFuture<Void> shutdownAsync( ) {
		return super.shutdownAsync( );
	}

	@Override
	public void shutDown( ) {
		for ( final Entry<String, PooledStreamingEventProcessor> trackingEventProcessorEntry : eventProcessors.entrySet( ) ) {
			log.info( "Shutting down EventProcessor '" + trackingEventProcessorEntry.getValue( ).getName( ) + "' for tenant " + trackingEventProcessorEntry.getKey( ) );
			TenantContext.runWithTenantId( trackingEventProcessorEntry.getKey( ), ( ) -> trackingEventProcessorEntry.getValue( ).shutDown( ) );
		}
	}

	@Override
	public boolean isRunning( ) {
		return eventProcessors.values( ).stream( ).anyMatch( EventProcessor::isRunning );
	}

	@Override
	public boolean isError( ) {
		return eventProcessors.values( ).stream( ).anyMatch( EventProcessor::isError );
	}
}

The TenantAwarePooledStreamingProcessorConfiguration configures the coordinatorExecutor and the workerExecutorService wrapping the globally configured ScheduledExecutorServices (which are backed by a ThreadPool) into a MultiTenantScheduledExecutorService.

@RequiredArgsConstructor
public class TenantAwarePooledStreamingProcessorConfiguration implements PooledStreamingProcessorConfiguration {

	private final String tenant;
	private final ScheduledExecutorService coordinatorExecutorService;
	private final ScheduledExecutorService workerExecutorService;

	@Override
	public Builder apply( final Configuration config, final Builder builder ) {
		return builder
				.coordinatorExecutor( new MultiTenantScheduledExecutorService( coordinatorExecutorService, tenant ) )
				.workerExecutorService( new MultiTenantScheduledExecutorService( workerExecutorService, tenant ) );
	}

}

The MultiTenantScheduledExecutorService executes the given Runnables in the context of a tenant.

@RequiredArgsConstructor
@Log4j2
public class MultiTenantScheduledExecutorService implements ScheduledExecutorService {

	@Delegate
	private final ScheduledExecutorService delegate;
	private final String tenant;

	@Override
	public ScheduledFuture<?> schedule( final Runnable command,
			final long delay, final TimeUnit unit ) {
		log.debug( "Scheduling task for tenant " + tenant );
		return delegate.schedule( runWithTenantId( command ), delay, unit );

	}

	@Override
	public ScheduledFuture<?> scheduleAtFixedRate( final Runnable command,
			final long initialDelay,
			final long period,
			final TimeUnit unit ) {
		log.debug( "Scheduling task at fixed rate for tenant " + tenant );
		return delegate.scheduleAtFixedRate( runWithTenantId( command ), initialDelay, period, unit );

	}

	@Override
	public ScheduledFuture<?> scheduleWithFixedDelay( final Runnable command,
			final long initialDelay,
			final long delay,
			final TimeUnit unit ) {
		log.debug( "Scheduling task with fixed delay for tenant " + tenant );
		return delegate.scheduleWithFixedDelay( runWithTenantId( command ), initialDelay, delay, unit );

	}

	@Override
	public Future<?> submit( final Runnable task ) {
		log.debug( "Submitting task for tenant " + tenant );
		return delegate.submit( runWithTenantId( task ) );
	}

	private Runnable runWithTenantId( final Runnable task ) {
		return TaskUtils.decorateTaskWithErrorHandler(
				( ) -> TenantContext.runWithTenantId( tenant,
						task::run ),
				TaskUtils.LOG_AND_PROPAGATE_ERROR_HANDLER, true );
	}

}
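
The TenantContext used above is not shown in this thread. For readers who want to try the pattern, here is a minimal sketch of what such a class could look like, assuming a plain ThreadLocal holder. The class name, the `inTenant` helper and the tenant ids are hypothetical; the point is only that tasks from several tenants can share one pool as long as each task binds and unbinds its tenant.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the TenantContext referenced in the post.
final class TenantContextSketch {

    private static final ThreadLocal<String> TENANT = new ThreadLocal<>();

    static String currentTenant() {
        return TENANT.get();
    }

    // Binds the tenant while the task runs and restores the previous value afterwards.
    static void runWithTenantId(String tenant, Runnable task) {
        String previous = TENANT.get();
        TENANT.set(tenant);
        try {
            task.run();
        } finally {
            if (previous == null) {
                TENANT.remove();
            } else {
                TENANT.set(previous);
            }
        }
    }

    // Decorates a task so that it always runs in the context of the given tenant.
    static Runnable inTenant(String tenant, Runnable task) {
        return () -> runWithTenantId(tenant, task);
    }

    // Schedules one task per tenant on a single shared thread and records the tenant each task observed.
    static Map<String, String> demo() {
        String[] tenants = {"acme", "globex"};
        ScheduledExecutorService shared = Executors.newScheduledThreadPool(1);
        Map<String, String> observed = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(tenants.length);
        try {
            for (String tenant : tenants) {
                Runnable task = () -> {
                    observed.put(tenant, String.valueOf(currentTenant()));
                    done.countDown();
                };
                shared.schedule(inTenant(tenant, task), 0, TimeUnit.MILLISECONDS);
            }
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            shared.shutdown();
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the tenant is removed in a finally block, a pooled thread does not carry one tenant's context into the next task.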

My question: Is there a more convenient way to start a PooledStreamingEventProcessor for each tenant? In order to use our TenantAwarePooledStreamingEventProcessor we had to implement an EventProcessorBuilder and I’m not sure if this is the best way to achieve this.

@RequiredArgsConstructor
@Log4j2
public class MultiTenancyEventProcessorBuilder implements EventProcessorBuilder {

	private final MultiTenancyConfigurationProperties multiTenancyConfigurationProperties;
	private final ScheduledExecutorService coordinatorExecutorService;
	private final ScheduledExecutorService workerExecutorService;

	@Override
	public EventProcessor build( final String name, final org.axonframework.config.Configuration configuration, final EventHandlerInvoker eventHandlerInvoker ) {
		final List<EventProcessingModule> eventProcessingModules = configuration.findModules( EventProcessingModule.class );
		log.info( "Building TenantAwarePooledStreamingEventProcessor '" + name + "'" );
		return new TenantAwarePooledStreamingEventProcessor( multiTenancyConfigurationProperties, coordinatorExecutorService, workerExecutorService, name, configuration, eventProcessingModules.get( 0 ), eventHandlerInvoker );
	}

}

Thanks!

Happy to hear 4.5 came right on time for you and your team Oliver! Hoping the PooledStreamingEventProcessor gives you the components you require to proceed. And as always, if you find anything amiss, please let us know. We’ll try to find a solution ASAP in that case.

To move back a bit from your question, let me reiterate your point:

As far as I understand Axon wants to assign a single EventProcessor instance to a single ProcessingGroup.

This is not the case actually. The EventProcessor in Axon can be in charge of N Processing Groups. By default, and actually in almost all scenarios, there’s just one Processing Group per EventProcessor. However, when this setup pushes the number of threads beyond what an application can manage, as in your scenario, it would be wiser to let an Event Processor be in charge of several Processing Groups.

As it stands, the Reference Guide is not extremely clear on this, although it can be seen in the diagram on this page. What it doesn’t specify, is how to achieve this different ordering.

The key is the assignment rules you can set on the EventProcessingConfigurer. More specifically, the assignProcessingGroup(String, String) and assignProcessingGroup(Function<String, String>) methods can be used to move Processing Groups to different Processors instead of the one-to-one mapping the framework defaults to.
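
As a small illustration, the function-based variant boils down to a mapping from processing group name to processor name. The sketch below shows only that mapping with plain JDK types; the group names and the processor name are made up for the example.

```java
import java.util.Set;
import java.util.function.Function;

// Illustrative mapping only; group and processor names are hypothetical.
final class ProcessingGroupAssignmentSketch {

    // Processing groups that should be handled by one shared processor.
    static final Set<String> SHARED_GROUPS = Set.of("orders", "invoices", "shipping");

    // Name of the processor that takes over all shared groups.
    static final String SHARED_PROCESSOR = "shared-psep";

    // Maps a processing group name to the processor name that should handle it.
    // Groups that are not listed keep a processor named after the group itself.
    static final Function<String, String> ASSIGNMENT =
            group -> SHARED_GROUPS.contains(group) ? SHARED_PROCESSOR : group;

    public static void main(String[] args) {
        System.out.println(ASSIGNMENT.apply("orders"));
        System.out.println(ASSIGNMENT.apply("audit"));
    }
}
```

A function of this shape is what assignProcessingGroup(Function<String, String>) takes. Wiring it into an actual EventProcessingConfigurer is left out here to keep the sketch free of framework dependencies.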

Doing this, you should be able to define a single PooledStreamingEventProcessor for a bunch of Processing Groups per tenant, each containing (I assume) the same set of Event Handling Components. Thus, you’d no longer have to provide your own EventProcessorBuilder, but only have to define the custom ScheduledExecutorService to the PSEP.

Hope this gives you some guidance @Oliver_Libutzki! If I caught your problem incorrectly, please let me know! Then I’ll adjust my answer.

Hi @Steven_van_Beelen,

thanks for the clarification.

Nevertheless, I need an EventProcessor instance per tenant in order to configure the ScheduledExecutorService which executes a task in the context of a tenant, right?

I wonder how to achieve that without providing a custom EventProcessorBuilder.

Btw. your assumption is correct that each tenant has the same event handling components.

With the current MultiTenantScheduledExecutorService it looks like that’s indeed a requirement. At that point, there’s no means to use Axon’s Message (to retrieve the tenant id from the MetaData) or the UnitOfWork (to retrieve it from the UoW’s resources), as both are out of scope when the executor service is invoked… What you’d need to know, is the processing group at that point in time. That way the processing group could reflect the tenant id too. Ideally, it would be an object carrying a name and tenant id maybe.
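
To sketch what "an object carrying a name and tenant id" could look like: the class below is purely hypothetical and not something the framework offers. It assumes the `tenant + "_" + name` convention from the code earlier in this thread, and that tenant ids never contain an underscore.

```java
import java.util.Objects;

// Hypothetical value object; assumes tenant ids never contain '_'.
final class TenantProcessorName {

    final String tenantId;
    final String processorName;

    TenantProcessorName(String tenantId, String processorName) {
        Objects.requireNonNull(tenantId, "tenantId");
        Objects.requireNonNull(processorName, "processorName");
        if (tenantId.isEmpty() || tenantId.indexOf('_') >= 0) {
            throw new IllegalArgumentException("Tenant id must be non-empty and must not contain '_'");
        }
        this.tenantId = tenantId;
        this.processorName = processorName;
    }

    // Builds the combined name, e.g. "acme_order-projections".
    String qualifiedName() {
        return tenantId + "_" + processorName;
    }

    // Splits a combined name at the first underscore into tenant id and processor name.
    static TenantProcessorName parse(String qualifiedName) {
        int separator = qualifiedName.indexOf('_');
        if (separator <= 0) {
            throw new IllegalArgumentException("Expected '<tenant>_<name>' but got: " + qualifiedName);
        }
        return new TenantProcessorName(
                qualifiedName.substring(0, separator),
                qualifiedName.substring(separator + 1));
    }

    public static void main(String[] args) {
        TenantProcessorName parsed = parse("acme_order_projections");
        System.out.println(parsed.tenantId + " / " + parsed.processorName);
    }
}
```

With something like this, code that only sees a processor name could still recover the tenant it belongs to.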

Having said that, I think you have two options:

  1. Stick with what you have right now
  2. Not use the MultiTenantScheduledExecutorService

A third option would be a generic solution providing something like this, although I don’t know of one off the top of my head. Hence, I’ve left it out of the list.

In the meantime we decided to solve this differently by deploying one instance of our application per tenant… So we do not have to deal with the tenancy stuff at application level.

Nevertheless, we learned a lot about the PooledStreamingEventProcessor which - from now on - we use instead of the TrackingEventProcessor.

That being said, I share your excitement about the new event processor.