Replaying subscribing event processors

Hi,

I know that this topic has been discussed multiple times (Replaying Events and Subscribing Event Processors, Replay events with subscribing event processors, …), but I would like to be a little more specific.

To provide a short background: For technical reasons, our application has to use some subscribing event processors. This makes it rather difficult to migrate the corresponding read models, so we would like to simply reset the subscribing event processors, as is possible with tracking event processors.

I came up with the following implementation.

public final class ResetableSubscribingEventProcessor extends SubscribingEventProcessor {

	private final TransactionManager txManager;

	...

	public void reset( ) {
		final EventHandlerInvoker eventHandlerInvoker = eventHandlerInvoker( );

		txManager.executeInTransaction( ( ) -> {
			eventHandlerInvoker.performReset( );
		} );

		@SuppressWarnings( "unchecked" )
		final StreamableMessageSource<TrackedEventMessage<?>> messageSource = ( StreamableMessageSource<TrackedEventMessage<?>> ) getMessageSource( );

		// Open the stream at the tail token (the oldest event) and close it once the replay is done.
		try ( final BlockingStream<TrackedEventMessage<?>> stream = messageSource.openStream( messageSource.createTailToken( ) ) ) {
			while ( stream.hasNextAvailable( ) ) {
				// Each event is handled in its own transaction.
				txManager.executeInTransaction( ( ) -> {
					try {
						final TrackedEventMessage<?> eventMessage = stream.nextAvailable( );
						if ( eventHandlerInvoker.canHandle( eventMessage, Segment.ROOT_SEGMENT ) ) {
							eventHandlerInvoker.handle( eventMessage, Segment.ROOT_SEGMENT );
						}
					} catch ( final Exception ex ) {
						// TODO Handle exception; swallowing it here lets the transaction commit and the replay continue
						ex.printStackTrace( );
					}
				} );
			}
		}
	}

}

So what it does is the following:

  • It resets the internal event handler invoker, thus triggering the @ResetHandler marked methods (this was always possible).
  • It opens the whole event stream (starting at the tail token, i.e. the oldest event) and feeds every event to the event handlers of the subscribing event processor.

Regarding the API, this would allow us to reset it in much the same way as a tracking event processor:

final Optional<ResetableSubscribingEventProcessor> optionalEventProcessor = eventProcessingConfiguration.eventProcessorByProcessingGroup( ..., ResetableSubscribingEventProcessor.class );
final ResetableSubscribingEventProcessor eventProcessor = optionalEventProcessor.get( );
eventProcessor.shutDown( );
eventProcessor.reset( );
eventProcessor.start( );

My first impression is that this could (and seems to) work. A flaw in this implementation is that the event processor won’t see events that are published between the shutdown and the start. If one resets the event processor during application startup (as we tend to do), we should be safe though, as usually no new events occur at that point. Also, this of course only works if the message source is a streamable message source (which it usually is).
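
One idea for closing that gap would be to subscribe first, buffer live events while the replay runs, and drain the buffer afterwards. The following is only a plain-Java sketch of that idea. InMemoryEventLog and GapFreeReplayer are hypothetical stand-ins that I made up for illustration; they are not Axon types, and I have not checked how this would map onto the real message source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for an event store; not an Axon type. */
final class InMemoryEventLog {
    private final List<String> events = new ArrayList<>();
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    /** Stores the event and pushes it to all current subscribers. */
    synchronized void publish(String event) {
        events.add(event);
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }

    /** Registers the subscriber and returns how many events were stored at that moment. */
    synchronized int subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
        return events.size();
    }

    synchronized String read(int index) {
        return events.get(index);
    }
}

/** Hypothetical replayer: replays the stored events, then hands over to live events. */
final class GapFreeReplayer {
    private final InMemoryEventLog log;
    private final Consumer<String> handler;
    private final List<String> buffered = new ArrayList<>();
    private boolean replaying = true;

    GapFreeReplayer(InMemoryEventLog log, Consumer<String> handler) {
        this.log = log;
        this.handler = handler;
    }

    void resetAndReplay() {
        // Subscribe before replaying, so events published from now on are buffered, not lost.
        int boundary = log.subscribe(this::onLiveEvent);
        // Replay everything that was stored before the subscription.
        for (int i = 0; i < boundary; i++) {
            handler.accept(log.read(i));
        }
        // Drain the events that arrived during the replay, then switch to live handling.
        synchronized (this) {
            for (String event : buffered) {
                handler.accept(event);
            }
            buffered.clear();
            replaying = false;
        }
    }

    private synchronized void onLiveEvent(String event) {
        if (replaying) {
            buffered.add(event);
        } else {
            handler.accept(event);
        }
    }
}
```

In this sketch the boundary between "replayed" and "live" is fixed atomically when subscribing, so every event is handled exactly once and in publication order. Whether something equivalent can be done with a real subscribable message source plus a streamable one is exactly the part I am unsure about.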

Some questions:

  • Do you see any further flaws or any reason why this is a rather bad idea?
  • (How) do we need to care about the actual segments?

Thank you and best regards,

Nils

Edit: Thinking about it, we would of course have a problem when an exception occurs during the processing. We don’t have any kind of retry mechanism in this case, so the handling is an open question.
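
To make the retry question a bit more concrete, here is a minimal plain-Java sketch of what I have in mind: retry each event a bounded number of times and abort the whole replay once the attempts are exhausted, instead of swallowing the exception. RetryingReplay and its parameters are hypothetical names of my own and not part of Axon. It also assumes that a failed attempt leaves no partial writes behind, e.g. because each attempt runs in its own transaction that is rolled back.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper, not an Axon type: replays events with a bounded retry per event. */
final class RetryingReplay {

    private RetryingReplay() {
    }

    /**
     * Feeds every event to the handler, retrying a failing event up to maxAttempts times.
     * Returns the number of handled events, or throws if one event keeps failing.
     */
    static <E> int replay(List<E> events, Consumer<? super E> handler, int maxAttempts) {
        for (int index = 0; index < events.size(); index++) {
            E event = events.get(index);
            int attempt = 0;
            while (true) {
                attempt++;
                try {
                    handler.accept(event);
                    break; // handled successfully, continue with the next event
                } catch (RuntimeException ex) {
                    if (attempt >= maxAttempts) {
                        // Abort instead of skipping, so the read model is not left with silent holes.
                        throw new IllegalStateException(
                                "Replay aborted at event index " + index + " after " + attempt + " attempts", ex);
                    }
                }
            }
        }
        return events.size();
    }
}
```

Aborting seems safer to me than skipping the event, because a skipped event would leave the read model inconsistent without anyone noticing. The open point remains what to do with the half-rebuilt read model after such an abort.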