Event Replays general questions

Hi,
you can configure your application to use tracking event processor, that’s an approach. Just mind that you will have downtime (the need time for the processors to catchup and update projections). But since you are asking also about events replay on others processors, I think your idea is to replay a lot of events in general.

Regarding replay of events you can read my suggestion in this thread Re-create aggregate instance from events - #4 by Corrado_Musumeci and you can use code snippet below
Note that from AF 4.5 PooledStreamingEventProcessor was added to the available processors: this is based on tracking token, so it allow replay. We should consider this while replaying events.

@Component
@Endpoint(id="scheduletokenresetall")
public class TriggerResetAllEventEndpoint {

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    private final Configuration configuration;

    public TriggerResetAllEventEndpoint(Configuration configuration) {
        this.configuration = configuration;
    }

    @DeleteOperation()
    public void triggerResetAllEventEndpoints() {
        List<TrackingEventProcessor> trackingEventProcessors = configuration.eventProcessingConfiguration()
                                                                            .eventProcessors().values().stream()
                                                                            .filter(TrackingEventProcessor.class::isInstance)
                                                                            .map(TrackingEventProcessor.class::cast)
                                                                            .collect(Collectors.toList());
        trackingEventProcessors.stream().forEach(trackingEventProcessor -> {
            logger.info("Trigger ResetTriggeredEvent for trackingEventProcessor {}", trackingEventProcessor);
            trackingEventProcessor.shutDown();
            trackingEventProcessor.resetTokens();
            trackingEventProcessor.start();
        });

        List<StreamingEventProcessor> streamingEventProcessors = configuration.eventProcessingConfiguration()
                                                                            .eventProcessors().values().stream()
                                                                            .filter(StreamingEventProcessor.class::isInstance)
                                                                            .map(StreamingEventProcessor.class::cast)
                                                                            .collect(Collectors.toList());
        streamingEventProcessors.stream().forEach(streamingEventProcessor -> {
            if(streamingEventProcessor.supportsReset()) {
                logger.info("Trigger ResetTriggeredEvent for streamingEventProcessor {}", streamingEventProcessor);
                streamingEventProcessor.shutDown();
                streamingEventProcessor.resetTokens();
                streamingEventProcessor.start();
            }
        });

    }

}

Regarding Sagas I want to redirect you to this reply by Steven van Beelen on this question Can I replay a Saga in Axon Framework without rerunning old transactions? - Stack Overflow

And since you are asking a bit on event replay I can suggest you this podcast Podcast Exploring Axon: Season 2 Episode 4 - Event Replay & Event Processing with Steven van Beelen - YouTube

1 Like