Hi, I am using Axon Framework 4.5.5.
I have a TrackingEventProcessor configured for parallel processing with 2 threads and 2 segments:
configuration -> TrackingEventProcessorConfiguration.forParallelProcessing(2).andInitialTrackingToken(StreamableMessageSource::createHeadToken).andInitialSegmentsCount(2))
I want to start a replay by calling this:
public void replay() {
    String processorName = "mark.test.axonreplay";
    eventProcessingConfiguration.eventProcessor(processorName, TrackingEventProcessor.class).ifPresent(processor -> {
        processor.shutDown();
        processor.resetTokens();
        processor.start();
    });
}
But the replay does not start. The output looks like this:
2022-10-11 15:14:43.254 INFO 2100704 --- [nio-8080-exec-5] o.a.e.TrackingEventProcessor : Shutdown state set for Processor 'mark.test.axonreplay'.
2022-10-11 15:14:43.254 INFO 2100704 --- [nio-8080-exec-5] o.a.e.TrackingEventProcessor : Processor 'mark.test.axonreplay' awaiting termination...
2022-10-11 15:14:43.547 INFO 2100704 --- [t.axonreplay]-0] o.a.e.TrackingEventProcessor : Released claim
2022-10-11 15:14:43.548 INFO 2100704 --- [t.axonreplay]-0] o.a.e.TrackingEventProcessor : Worker for segment Segment[1/1] stopped.
2022-10-11 15:14:43.738 INFO 2100704 --- [t.axonreplay]-1] o.a.e.TrackingEventProcessor : Released claim
2022-10-11 15:14:43.738 INFO 2100704 --- [t.axonreplay]-1] o.a.e.TrackingEventProcessor : Worker for segment Segment[0/1] stopped.
2022-10-11 15:14:43.775 INFO 2100704 --- [t.axonreplay]-2] o.a.e.TrackingEventProcessor : Worker assigned to segment Segment[0/1] for processing
2022-10-11 15:14:43.775 INFO 2100704 --- [t.axonreplay]-2] o.a.e.TrackingEventProcessor : Dispatching new tracking segment worker: TrackingSegmentWorker{processor=mark.test.axonreplay, segment=Segment[0/1]}
2022-10-11 15:14:43.779 INFO 2100704 --- [t.axonreplay]-2] o.a.e.TrackingEventProcessor : Worker assigned to segment Segment[1/1] for processing
2022-10-11 15:14:43.779 INFO 2100704 --- [t.axonreplay]-3] o.a.e.TrackingEventProcessor : Fetched token: GapAwareTrackingToken{index=24, gaps=[]} for segment: Segment[0/1]
2022-10-11 15:14:43.780 INFO 2100704 --- [t.axonreplay]-2] o.a.e.TrackingEventProcessor : Using current Thread for last segment worker: TrackingSegmentWorker{processor=mark.test.axonreplay, segment=Segment[1/1]}
2022-10-11 15:14:43.782 INFO 2100704 --- [t.axonreplay]-2] o.a.e.TrackingEventProcessor : Fetched token: GapAwareTrackingToken{index=24, gaps=[]} for segment: Segment[1/1]
However, when I configure single-threaded event processing:
configuration -> TrackingEventProcessorConfiguration.forSingleThreadedProcessing());
it works fine, with output like this:
2022-10-11 14:46:42.986 INFO 2098340 --- [nio-8080-exec-8] o.a.e.TrackingEventProcessor : Shutdown state set for Processor 'mark.test.axonreplay'.
2022-10-11 14:46:42.987 INFO 2098340 --- [nio-8080-exec-8] o.a.e.TrackingEventProcessor : Processor 'mark.test.axonreplay' awaiting termination...
2022-10-11 14:46:43.184 INFO 2098340 --- [t.axonreplay]-0] o.a.e.TrackingEventProcessor : Released claim
2022-10-11 14:46:43.185 INFO 2098340 --- [t.axonreplay]-0] o.a.e.TrackingEventProcessor : Worker for segment Segment[0/0] stopped.
2022-10-11 14:46:43.220 INFO 2098340 --- [t.axonreplay]-1] o.a.e.TrackingEventProcessor : Worker assigned to segment Segment[0/0] for processing
2022-10-11 14:46:43.221 INFO 2098340 --- [t.axonreplay]-1] o.a.e.TrackingEventProcessor : Using current Thread for last segment worker: TrackingSegmentWorker{processor=mark.test.axonreplay, segment=Segment[0/0]}
2022-10-11 14:46:43.224 INFO 2098340 --- [t.axonreplay]-1] o.a.e.TrackingEventProcessor : Fetched token: ReplayToken{currentToken=GapAwareTrackingToken{index=0, gaps=[]}, tokenAtReset=GapAwareTrackingToken{index=27, gaps=[]}} for segment: Segment[0/0]
It looks like in the latter (single-threaded) case Axon creates a new ReplayToken (it is also visible in the TOKEN_ENTRY table), and the replay works.
But with parallel processing it does not create a ReplayToken: the fetched tokens are plain GapAwareTrackingTokens, and I do not see a ReplayToken in TOKEN_ENTRY.
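To narrow this down, here is a minimal, unverified sketch of two things that could be checked. The class ReplayDiagnostics and its method names are hypothetical. It assumes that resetTokens() without arguments may reset to the configured initial token (createHeadToken in the parallel configuration), which would leave nothing to replay; passing the start position explicitly sidesteps that question. printStatus reads the per-segment state from processingStatus() so the replaying flag can be compared with the log output.

```java
import java.util.function.Function;

import org.axonframework.config.EventProcessingConfiguration;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingEventProcessor;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.messaging.StreamableMessageSource;

// Hypothetical helper class, not part of Axon or of the application as posted.
public class ReplayDiagnostics {

    private final EventProcessingConfiguration eventProcessingConfiguration;

    public ReplayDiagnostics(EventProcessingConfiguration eventProcessingConfiguration) {
        this.eventProcessingConfiguration = eventProcessingConfiguration;
    }

    // Same shutDown/reset/start sequence as replay(), but the start position
    // is passed explicitly as the tail (oldest event) of the stream.
    public void replayFromTail(String processorName) {
        Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> fromTail =
                StreamableMessageSource::createTailToken;
        eventProcessingConfiguration
                .eventProcessor(processorName, TrackingEventProcessor.class)
                .ifPresent(processor -> {
                    processor.shutDown();
                    processor.resetTokens(fromTail);
                    processor.start();
                });
    }

    // Prints, for every segment currently claimed by this instance,
    // whether it is replaying and which token it holds.
    public void printStatus(String processorName) {
        eventProcessingConfiguration
                .eventProcessor(processorName, TrackingEventProcessor.class)
                .ifPresent(processor -> processor.processingStatus().forEach(
                        (segmentId, status) -> System.out.printf(
                                "segment=%d replaying=%b token=%s%n",
                                segmentId, status.isReplaying(), status.getTrackingToken())));
    }
}
```

Calling replayFromTail("mark.test.axonreplay") and then printStatus("mark.test.axonreplay") would show whether each segment holds a ReplayToken after the restart. The status map may be empty until the workers have claimed their segments.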
I have no idea how to proceed. What am I missing?
Thanks a lot.
BR
Mark