I’d like to swap the InMemoryTokenStore out for a custom-built token store based on Hazelcast while still using Mongo as the event store.
The only difference between InMemoryTokenStore & HazelcastTokenStore will be the underlying map used:
public class HazelcastTokenStore implements TokenStore{
private static final GlobalSequenceTrackingToken NULL_TOKEN = new GlobalSequenceTrackingToken(-1);
private ConcurrentMap<ProcessAndSegment, TrackingToken> tokens;
I registered the custom token store for a specific event stream, which I’d like to track using HazelcastTokenStore:
@Autowired
public void configureEventHandling(EventHandlingConfiguration eventHandlingConfiguration, HazelcastInstance hazelcastInstance) {
eventHandlingConfiguration.registerTrackingProcessor("myViewEvents");
eventHandlingConfiguration.registerTokenStore("myViewEvents", c -> new HazelcastTokenStore(hazelcastInstance));
When I start the Spring Boot application, I get the following exception:
`
java.lang.IllegalArgumentException: Token IndexTrackingToken{globalIndex=-1} is of the wrong type
at org.axonframework.common.Assert.isTrue(Assert.java:54)
at org.axonframework.mongo.eventsourcing.eventstore.AbstractMongoEventStorageStrategy.findTrackedEvents(AbstractMongoEventStorageStrategy.java:157)
at org.axonframework.mongo.eventsourcing.eventstore.MongoEventStorageEngine.fetchTrackedEvents(MongoEventStorageEngine.java:202)
at org.axonframework.eventsourcing.eventstore.BatchingEventStorageEngine.lambda$readEventData$1(BatchingEventStorageEngine.java:123)
at org.axonframework.eventsourcing.eventstore.BatchingEventStorageEngine$EventStreamSpliterator.tryAdvance(BatchingEventStorageEngine.java:161)
at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294)
at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:161)
at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300)
at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
at org.axonframework.eventsourcing.eventstore.EmbeddedEventStore$EventConsumer.peekPrivateStream(EmbeddedEventStore.java:380)
at org.axonframework.eventsourcing.eventstore.EmbeddedEventStore$EventConsumer.peek(EmbeddedEventStore.java:341)
at org.axonframework.eventsourcing.eventstore.EmbeddedEventStore$EventConsumer.hasNextAvailable(EmbeddedEventStore.java:318)
at org.axonframework.messaging.MessageStream.hasNextAvailable(MessageStream.java:38)
at org.axonframework.eventhandling.TrackingEventProcessor.checkSegmentCaughtUp(TrackingEventProcessor.java:294)
at org.axonframework.eventhandling.TrackingEventProcessor.processBatch(TrackingEventProcessor.java:246)
at org.axonframework.eventhandling.TrackingEventProcessor.processingLoop(TrackingEventProcessor.java:209)
at org.axonframework.eventhandling.TrackingEventProcessor$TrackingSegmentWorker.run(TrackingEventProcessor.java:620)
at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.run(TrackingEventProcessor.java:715)
at org.axonframework.eventhandling.TrackingEventProcessor$CountingRunnable.run(TrackingEventProcessor.java:547)
at java.lang.Thread.run(Thread.java:745)
`
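One possible lead, which I have not confirmed: since my store is a copy of the in-memory one, it uses NULL_TOKEN as a stand-in for "no token yet" and has to map it back to null when a token is fetched. If that check compares by reference and the Hazelcast map returns deserialized copies (as far as I understand, it does by default), the sentinel would leak through to the Mongo storage engine, which would match the message above. Below is a minimal, self-contained sketch of that effect. `IndexToken`, `roundTrip`, `fetchByIdentity` and `fetchByEquals` are hypothetical stand-ins written for illustration, not Axon or Hazelcast classes, and the sketch assumes the token type implements `equals`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SentinelTokenSketch {

    /** Hypothetical stand-in for a tracking token; NOT the Axon class. */
    static final class IndexToken implements Serializable {
        private static final long serialVersionUID = 1L;
        final long globalIndex;

        IndexToken(long globalIndex) {
            this.globalIndex = globalIndex;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof IndexToken && ((IndexToken) o).globalIndex == globalIndex;
        }

        @Override
        public int hashCode() {
            return Long.hashCode(globalIndex);
        }
    }

    /** Sentinel meaning "no token stored yet", as in the store above. */
    static final IndexToken NULL_TOKEN = new IndexToken(-1);

    /** Simulates a serializing map: the caller gets a deserialized copy back. */
    static IndexToken roundTrip(IndexToken token) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(token);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (IndexToken) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Reference comparison: only works while the map keeps the same instance. */
    static IndexToken fetchByIdentity(IndexToken stored) {
        return stored == NULL_TOKEN ? null : stored;
    }

    /** Value comparison: still recognises the sentinel after a round trip. */
    static IndexToken fetchByEquals(IndexToken stored) {
        return NULL_TOKEN.equals(stored) ? null : stored;
    }

    public static void main(String[] args) {
        IndexToken copy = roundTrip(NULL_TOKEN);
        System.out.println(fetchByIdentity(copy) == null); // prints "false": sentinel leaks out
        System.out.println(fetchByEquals(copy) == null);   // prints "true": sentinel mapped to null
    }
}
```

If this is what happens in my store, comparing with `equals` (or storing a dedicated marker value) instead of `==` might be enough, but I would appreciate confirmation.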
Additional info:
- Axon 3.2
- spring-boot 1.5.10
- commands & queries live within the same JVM
Perhaps my understanding of TokenStore is flawed, but I was hoping to define a TokenStore for one specific processing group while leaving the rest of the processors as subscribing processors that consume events as-is.
Any pointers will be greatly appreciated!