Hi Koen,
Perhaps I can help. I recently wrote a very rudimentary TrackingProcessorMonitor that publishes an event as soon as a tracking processor has caught up.
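    import java.util.Collection;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import lombok.extern.slf4j.Slf4j;
    import org.axonframework.config.EventHandlingConfiguration;
    import org.axonframework.eventhandling.EventBus;
    import org.axonframework.eventhandling.EventTrackerStatus;
    import org.axonframework.eventhandling.GenericEventMessage;
    import org.axonframework.eventhandling.TrackingEventProcessor;
    import org.springframework.boot.context.event.ApplicationReadyEvent;
    import org.springframework.context.event.EventListener;

    public class TrackingProcessorMonitor {

        private final EventHandlingConfiguration eventHandlingConfiguration;
        private final EventBus eventBus;

        public TrackingProcessorMonitor(EventHandlingConfiguration eventHandlingConfiguration, EventBus eventBus) {
            this.eventHandlingConfiguration = eventHandlingConfiguration;
            this.eventBus = eventBus;
        }

        // Once the application is ready, start one polling worker per tracking processor.
        @EventListener
        public void init(ApplicationReadyEvent e) {
            final ExecutorService executorService = Executors.newFixedThreadPool(10);
            eventHandlingConfiguration.getProcessors().stream()
                    .filter(p -> p instanceof TrackingEventProcessor)
                    .forEach(p -> executorService.submit(new Worker((TrackingEventProcessor) p, this.eventBus)));
            // Already-submitted workers still run to completion; afterwards the pool threads exit.
            executorService.shutdown();
        }

        @Slf4j
        private static class Worker implements Runnable {

            private final TrackingEventProcessor processor;
            private final EventBus eventBus;

            Worker(TrackingEventProcessor processor, EventBus eventBus) {
                this.processor = processor;
                this.eventBus = eventBus;
            }

            @Override
            public void run() {
                log.debug("Monitoring tracking processor '{}' initialization...", processor.getName());
                while (!isCaughtUp()) {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop monitoring instead of swallowing it.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                log.debug("Tracking processor '{}' caught-up", processor.getName());
                this.eventBus.publish(GenericEventMessage.asEventMessage(new ProcessorCaughtUpEvent(this.processor.getName())));
            }

            private boolean isCaughtUp() {
                Collection<EventTrackerStatus> statuses = this.processor.processingStatus().values();
                // allMatch is true for an empty stream, so require at least one reported segment.
                return !statuses.isEmpty() && statuses.stream().allMatch(EventTrackerStatus::isCaughtUp);
            }
        }
    }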
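ProcessorCaughtUpEvent is not shown above. Assuming it is a plain application-defined event that only carries the processor name, a minimal sketch could look like the following (the field and accessor names are hypothetical):

```java
import java.util.Objects;

// Hypothetical event payload: only the constructor taking the processor name
// is implied by the monitor code; everything else here is an assumption.
public final class ProcessorCaughtUpEvent {

    private final String processorName;

    public ProcessorCaughtUpEvent(String processorName) {
        this.processorName = Objects.requireNonNull(processorName, "processorName");
    }

    public String getProcessorName() {
        return processorName;
    }

    @Override
    public String toString() {
        return "ProcessorCaughtUpEvent{processorName='" + processorName + "'}";
    }
}
```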
The line of interest for your use-case:
    eventHandlingConfiguration.getProcessors()
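If you would rather not poll indefinitely, the same "all segments caught up" check can be wrapped in a bounded wait. The sketch below is framework-free on purpose: CaughtUpPoller and awaitCaughtUp are hypothetical names, and the supplier stands in for whatever returns the per-segment caught-up flags.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.function.Supplier;

// Hypothetical helper: polls a status supplier until every segment reports
// caught-up, or gives up once the timeout has elapsed.
public final class CaughtUpPoller {

    private CaughtUpPoller() {
    }

    /**
     * @return true if a non-empty set of segments all reported caught-up,
     *         false on timeout or if the calling thread was interrupted
     */
    public static boolean awaitCaughtUp(Supplier<? extends Collection<Boolean>> statusSupplier,
                                        Duration pollInterval,
                                        Duration timeout) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            Collection<Boolean> segments = statusSupplier.get();
            // An empty collection means nothing has been reported yet, not "caught up".
            if (!segments.isEmpty() && segments.stream().allMatch(Boolean::booleanValue)) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In the monitor, the supplier would map processor.processingStatus().values() to EventTrackerStatus::isCaughtUp, and a false result could be logged rather than published as an event.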
Sounds like a status we’d also be interested in using, so please keep us posted.
Regards,
Dylan