Replaying subscribing event processors

Hi,

I know that this topic has been talked about multipe times (Replaying Events and Subscribing Event Processors, Replay events with subscribing event processors, …), yet I would like to be a little bit more specific.

To provide a short background: For technical reasons we are forced in our application to use some subscribing event processors. This means that it is rather difficult to migrate the corresponding read models and we would like to simply reset the subscribing event processors as it is possible with tracking event processors.

I came up with the following implementation.

public final class ResetableSubscribingEventProcessor extends SubscribingEventProcessor {

	private final TransactionManager txManager;

	...

	public void reset( ) {
		final EventHandlerInvoker eventHandlerInvoker = eventHandlerInvoker( );

		txManager.executeInTransaction( ( ) -> {
			eventHandlerInvoker.performReset( );
		} );

		@SuppressWarnings( "unchecked" )
		final StreamableMessageSource<TrackedEventMessage<?>> messageSource = ( StreamableMessageSource<TrackedEventMessage<?>> ) getMessageSource( );

		final BlockingStream<TrackedEventMessage<?>> stream = messageSource.openStream( messageSource.createTailToken( ) );
		while ( stream.hasNextAvailable( ) ) {
			txManager.executeInTransaction( ( ) -> {
				try {
					final TrackedEventMessage<?> eventMessage = stream.nextAvailable( );
					if ( eventHandlerInvoker.canHandle( eventMessage, Segment.ROOT_SEGMENT ) ) {
						eventHandlerInvoker.handle( eventMessage, Segment.ROOT_SEGMENT );
					}
				} catch ( final Exception ex ) {
					// TODO Handle exception
					ex.printStackTrace( );
				}
			} );
		}
	}

}

So what it does is the following:

  • It resets the internal event handler invoker, thus triggering the @ResetHandler marked methods (this was always possible).
  • It opens the whole event stream (beginning from the tail) and feeds it into the subscribing event processor.

Regarding the API this would allow to reset it similar to the tracking event processor

final Optional<ResetableSubscribingEventProcessor> optionalEventProcessor = eventProcessingConfiguration.eventProcessorByProcessingGroup( ..., ResetableSubscribingEventProcessor.class );
final ResetableSubscribingEventProcessor eventProcessor = optionalEventProcessor.get( );
eventProcessor.shutDown( );
eventProcessor.reset( );
eventProcessor.start( );

My first impression would be that this could (and seems to) work.A flaw in this implementation is, that the event processor won’t see events that are published between the shutdown and the start. If one resets the event processor during the start of the application (as we tend to do it) we should be safe though, as there usually shouldn’t occur any new events. Also this works of course only if the message source is a streamable message source (which it usually is).

Some questions:

  • Do you see any further flaws or any reason why this is a rather bad idea?
  • (How) do we need to care about the actual segments?

Thank you and best regards,

Nils

Edit: Thinking about it, we would of course have a problem when an exception occurs during the processing. We don’t have any kind of retry mechanism in this case, so the handling is an open question.

For future readers: This has been discussed in this issue and it was decided that this will not be part of Axon.

That said, I still provide my current idea for a resettable subscribing event processor. Note that I haven’t tested it extensively. It is just an idea.

Edit: I fixed two bugs in the code below. The stream is now closed after it has been used and the Disallow-Replay-Annotation is correctly processed.

public final class ResettableSubscribingEventProcessor extends SubscribingEventProcessor {

	private final TransactionManager transactionManager;

	protected ResettableSubscribingEventProcessor( final Builder builder ) {
		super( builder );
		transactionManager = builder.transactionManager;
	}

	public static Builder builder( ) {
		return new Builder( );
	}

	public boolean supportsReset( ) {
		return eventHandlerInvoker( ).supportsReset( ) && getMessageSource( ) instanceof StreamableMessageSource;
	}

	public void reset( ) {
		if ( !supportsReset( ) ) {
			throw new UnsupportedOperationException( "This event processor cannot be reset. Either the underlying event handler invoker does not support a reset of the message source is not streamble." );
		}
		final EventHandlerInvoker eventHandlerInvoker = eventHandlerInvoker( );

		transactionManager.executeInTransaction( ( ) -> {
			eventHandlerInvoker.performReset( );
		} );

		@SuppressWarnings( "unchecked" )
		final StreamableMessageSource<TrackedEventMessage<?>> messageSource = ( StreamableMessageSource<TrackedEventMessage<?>> ) getMessageSource( );

		final TrackingToken tailToken = messageSource.createTailToken( );
		try ( final BlockingStream<TrackedEventMessage<?>> stream = messageSource.openStream( tailToken ) ) {
			while ( stream.hasNextAvailable( ) ) {
				transactionManager.executeInTransaction( ( ) -> {
					try {
						final TrackedEventMessage<?> eventMessage = stream.nextAvailable( );
						final TrackedEventMessage<?> eventMessageWithReplayToken = eventMessage.withTrackingToken( new ReplayToken( tailToken ) );
						if ( eventHandlerInvoker.canHandle( eventMessageWithReplayToken, Segment.ROOT_SEGMENT ) ) {
							eventHandlerInvoker.handle( eventMessageWithReplayToken, Segment.ROOT_SEGMENT );
						}
					} catch ( final Exception ex ) {
						throw new IllegalStateException( "The reset failed. The event processor is in an undefined state.", ex );
					}
				} );
			}
		}
	}

	public static final class Builder extends SubscribingEventProcessor.Builder {

		private TransactionManager transactionManager = NoTransactionManager.INSTANCE;

		@Override
		public Builder transactionManager( final TransactionManager transactionManager ) {
			super.transactionManager( transactionManager );
			this.transactionManager = transactionManager;
			return this;
		}

		@Override
		public ResettableSubscribingEventProcessor build( ) {
			return new ResettableSubscribingEventProcessor( this );
		}

	}

}

To use it:

c.registerEventProcessor( "MyEventProcessor", ( name, configuration, eventHandlerInvoker ) -> {
	final EventProcessingModule eventProcessingModule = configuration.findModules( EventProcessingModule.class ).get( 0 );
	
	return ResettableSubscribingEventProcessor
			.builder( )
			.name( name )
			.eventHandlerInvoker( eventHandlerInvoker )
			.rollbackConfiguration( eventProcessingModule.rollbackConfiguration( name ) )
			.errorHandler( eventProcessingModule.errorHandler( name ) )
			.messageMonitor( eventProcessingModule.messageMonitor( ResettableSubscribingEventProcessor.class, name ) )
			.messageSource( configuration.eventBus( ) )
			.processingStrategy( DirectEventProcessingStrategy.INSTANCE )
			.transactionManager( eventProcessingModule.transactionManager( name ) )
			.build( );
}

Best regards

Nils

1 Like