Partial Replay of Events. How to?

Hello!

I am trying to figure out how to replay events starting from a particular time. I came across the following code snippet in the documentation, but I am not sure it is what I need.

public class Configuration {

    public TrackingEventProcessorConfiguration customConfiguration() {
        return TrackingEventProcessorConfiguration
                .forSingleThreadedProcessing()
                .andInitialTrackingToken(streamableMessageSource -> streamableMessageSource.createTokenAt(
                        Instant.parse("2007-12-03T10:15:30.00Z")
                ));
    }
}

But I am not sure how to run this code. Should this class be annotated with @Configuration and the method with @Bean?

@Bean
public TrackingEventProcessorConfiguration customConfiguration() {
    return TrackingEventProcessorConfiguration
            .forSingleThreadedProcessing()
            .andInitialTrackingToken(streamableMessageSource -> streamableMessageSource.createTokenAt(
                    Instant.parse("2021-05-10T20:06:10.296Z")
            ));
}

I tried running my application with this method annotated with @Bean, but the event replay still starts from the very beginning, even though I specify the exact instant from which I want events to replay.

Is this the correct way to replay events starting from a particular Instant? Or can someone recommend a tutorial or guide that explains how to replay events from a specific date?

I don’t know if there is another way, but I explicitly register my tracking event processors and pass a TrackingEventProcessorConfiguration to set the initial tracking token.

Example:

@Autowired
public void configure(EventProcessingConfigurer config) {
    config.usingTrackingEventProcessors();
    config.registerTrackingEventProcessor(
            "subscriptionSaga",
            org.axonframework.config.Configuration::eventStore,
            configuration ->
                    TrackingEventProcessorConfiguration
                            .forParallelProcessing(4)
                            .andInitialTrackingToken(StreamableMessageSource::createHeadToken)
    );
    config.registerTrackingEventProcessor(
            "subscriptionProjection",
            org.axonframework.config.Configuration::eventStore,
            configuration ->
                    TrackingEventProcessorConfiguration
                            .forParallelProcessing(4)
                            .andInitialTrackingToken(StreamableMessageSource::createHeadToken)
    );
}

Thank you, Rodrigo!
I will give it a try.

I see that one of your Tracking Event Processors is called “subscriptionSaga”. Do you somehow replay Sagas? My understanding was that Sagas are not replayable.

Sorry Serj, the first declaration is a mistake. You’re right; Sagas are not replayable. This is an old code snippet from before I understood how Sagas work.

Rodrigo, so in this case, you are registering a completely NEW Tracking Event Processor with the name “subscriptionProjection”?
…

The reason I am asking is that my application uses @ProcessingGroup("subscription-group"). So, when I do a full replay, I look up and then stop the tracking event processor with that same name, “subscription-group”. My understanding was that I need to stop this existing processor, using “subscription-group” as the name.

In your code snippet, it looks like you are registering a new one to replay events. That makes me think you somehow stop the currently running event processor manually in Axon Server and then register this new one to replay events?

Yes. In that case, this works for a new tracking event processor, for instance if you are creating a new projection and want to replay events only partially, or not replay them at all. For an existing processor you can use a workaround, which I’m not proud of: delete the corresponding tokens from the token_entry table before deploying this code.
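
To illustrate why the initial tracking token only takes effect for a new processor (or after its tokens have been deleted), here is a toy model in plain Java. It is not Axon code: InitialTokenModel and its methods are made-up names, and the map merely stands in for the token_entry table. It only sketches the rule described above, assuming the initial token is consulted only when no token is stored for the processor.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

public class InitialTokenModel {
    // Stand-in for the token_entry table: processor name -> stored position.
    private final Map<String, Long> tokenStore = new HashMap<>();

    /** Position the processor starts from; the supplier runs only if no token is stored yet. */
    public long startPosition(String processorName, LongSupplier initialToken) {
        return tokenStore.computeIfAbsent(processorName, name -> initialToken.getAsLong());
    }

    /** Simulates a processor that has already stored its progress. */
    public void storeToken(String processorName, long position) {
        tokenStore.put(processorName, position);
    }

    /** Simulates the workaround of deleting the processor's row(s) from token_entry. */
    public void deleteToken(String processorName) {
        tokenStore.remove(processorName);
    }
}
```

In this model, a processor with a stored token ignores the configured initial token, and it is only picked up again after deleteToken has been called.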

Thank you, Rodrigo!

Your code snippet gave me an idea and I think I have figured it out now.

    TrackingEventProcessor eventProcessorToReset = trackingEventProcessor.get();

    // The processor has to be shut down before its tokens can be reset.
    eventProcessorToReset.shutDown();
    // A full replay would use: eventProcessorToReset.resetTokens();
    eventProcessorToReset.resetTokens(streamableMessageSource ->
            streamableMessageSource.createTokenAt(Instant.parse("2021-05-10T23:29:46.792Z")));
    eventProcessorToReset.start();

When I run this code snippet, I see that the replay is partial. However, even though I provided an exact timestamp, the replay started from the wrong index… I will debug it further, but at least I am one step closer to getting it to work now 🙂.
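
When checking whether a replay started at the right index, it may help to pin down which index is expected for a given timestamp. The sketch below is a plain-Java toy, not Axon code: ReplayStartModel and firstIndexAtOrAfter are made-up names. It assumes events are ordered by timestamp and that a timestamp-based token should resume at the first event at or after the given Instant, which is my reading of createTokenAt and worth verifying against the Javadoc of your Axon version.

```java
import java.time.Instant;
import java.util.List;

public class ReplayStartModel {
    /**
     * Index of the first event whose timestamp is at or after 'from',
     * or the list size if every event is older than 'from'.
     * Assumes the timestamps are sorted in ascending order.
     */
    public static int firstIndexAtOrAfter(List<Instant> sortedTimestamps, Instant from) {
        for (int i = 0; i < sortedTimestamps.size(); i++) {
            if (!sortedTimestamps.get(i).isBefore(from)) {
                return i;
            }
        }
        return sortedTimestamps.size();
    }
}
```

Note that a string ending in Z passed to Instant.parse is interpreted as UTC, so a value copied from a local-time log line would shift the expected starting point by the zone offset.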

Hello! I was searching for information about replaying events and found this topic.
From the post marked as the solution, I can see how to create a token for a specific moment in time. Thank you for sharing this example!

But I have additional questions:
First question
What happens with the database? I have a PostgreSQL database that holds the view model, and

  1. when I do a full replay, I clear the whole database and wait until all replayed events have been processed.
  2. when I do a partial replay, what do I have to do? Do I need to delete the data for that specific period from the DB?
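
One way to think about the partial-replay case is to ask whether the event handlers are idempotent. The sketch below is a plain-Java toy, not Axon code and not an answer given in this thread: IdempotentProjectionModel, StatusChanged and the in-memory map are all hypothetical stand-ins for a projection and its table. It assumes every event carries the full new value for its row, so the handler can upsert by key.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IdempotentProjectionModel {
    /** Hypothetical event carrying the complete new status of one subscription. */
    public static final class StatusChanged {
        final String subscriptionId;
        final String status;

        public StatusChanged(String subscriptionId, String status) {
            this.subscriptionId = subscriptionId;
            this.status = status;
        }
    }

    // Stand-in for the view table, keyed by subscription id.
    private final Map<String, String> view = new HashMap<>();

    /** Upsert by key: applying the same event twice leaves the row unchanged. */
    public void on(StatusChanged event) {
        view.put(event.subscriptionId, event.status);
    }

    /** Applies the given events in order, as a (partial) replay would. */
    public void replay(List<StatusChanged> events) {
        events.forEach(this::on);
    }

    public Map<String, String> view() {
        return view;
    }
}
```

With handlers of this shape, replaying only the tail of the event stream over an existing view ends in the same state. Handlers that increment counters or append rows do not have this property, and in that case the view would presumably have to be brought back to its state as of the replay start first.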

Second question
I have many events, and a full replay can take up to a day. How can I get the replay status? I tried

configuration.eventProcessingConfiguration()
    .eventProcessorByProcessingGroup(processorName,
            StreamingEventProcessor.class)
    .ifPresent(streamingEventProcessor -> {
        System.out.println("status: " + streamingEventProcessor.processingStatus());
    });

But it returns trackerStatus{segment=Segment[0/0], … ), so there are probably no segments.
Are there any best practices for getting the replay status?
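
As a rough starting point for a progress figure, the helper below turns positions into a percentage. It is a plain-Java sketch with made-up names (ReplayProgress, percentDone). It assumes you can read a current position and a reset position per segment from the status objects returned by processingStatus() (recent Axon 4 versions appear to expose these as OptionalLong getters on the tracker status, but please check your version) and that you know the position the replay started from.

```java
import java.util.OptionalDouble;
import java.util.OptionalLong;

public class ReplayProgress {
    /**
     * Percentage of the replay range [start, resetPosition] already processed.
     * Returns empty when either position is unknown.
     */
    public static OptionalDouble percentDone(long start, OptionalLong current, OptionalLong resetPosition) {
        if (!current.isPresent() || !resetPosition.isPresent()) {
            return OptionalDouble.empty();
        }
        long total = resetPosition.getAsLong() - start;
        if (total <= 0) {
            // Nothing to replay in this range.
            return OptionalDouble.of(100.0);
        }
        // Clamp so positions outside the range never yield less than 0% or more than 100%.
        long done = Math.min(Math.max(current.getAsLong() - start, 0), total);
        return OptionalDouble.of(100.0 * done / total);
    }
}
```

For example, with a replay that started at position 1000, a reset position of 2000 and a current position of 1500, the helper reports 50.0.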