Custom TrackingStore (HazelCastInMemory) against mongo event store

I’d like to swap the InMemoryTokenStore for a custom-built token store based on Hazelcast, while still using Mongo as the event store.

The only difference between InMemoryTokenStore and HazelcastTokenStore will be the underlying map used:

public class HazelcastTokenStore implements TokenStore{

   private static final GlobalSequenceTrackingToken NULL_TOKEN = new GlobalSequenceTrackingToken(-1);

   private ConcurrentMap<ProcessAndSegment, TrackingToken> tokens;

I registered the custom token store for the processor whose event stream I’d like to track with the HazelcastTokenStore:

@Autowired
public void configureEventHandling(EventHandlingConfiguration eventHandlingConfiguration, HazelcastInstance hazelcastInstance) {
   eventHandlingConfiguration.registerTrackingProcessor("myViewEvents");
   eventHandlingConfiguration.registerTokenStore("myViewEvents", c -> new HazelcastTokenStore(hazelcastInstance));
}

When I start the Spring Boot application, I get the following exception:

java.lang.IllegalArgumentException: Token IndexTrackingToken{globalIndex=-1} is of the wrong type
    at org.axonframework.common.Assert.isTrue(Assert.java:54)
    at org.axonframework.mongo.eventsourcing.eventstore.AbstractMongoEventStorageStrategy.findTrackedEvents(AbstractMongoEventStorageStrategy.java:157)
    at org.axonframework.mongo.eventsourcing.eventstore.MongoEventStorageEngine.fetchTrackedEvents(MongoEventStorageEngine.java:202)
    at org.axonframework.eventsourcing.eventstore.BatchingEventStorageEngine.lambda$readEventData$1(BatchingEventStorageEngine.java:123)
    at org.axonframework.eventsourcing.eventstore.BatchingEventStorageEngine$EventStreamSpliterator.tryAdvance(BatchingEventStorageEngine.java:161)
    at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294)
    at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
    at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:161)
    at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300)
    at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
    at org.axonframework.eventsourcing.eventstore.EmbeddedEventStore$EventConsumer.peekPrivateStream(EmbeddedEventStore.java:380)
    at org.axonframework.eventsourcing.eventstore.EmbeddedEventStore$EventConsumer.peek(EmbeddedEventStore.java:341)
    at org.axonframework.eventsourcing.eventstore.EmbeddedEventStore$EventConsumer.hasNextAvailable(EmbeddedEventStore.java:318)
    at org.axonframework.messaging.MessageStream.hasNextAvailable(MessageStream.java:38)
    at org.axonframework.eventhandling.TrackingEventProcessor.checkSegmentCaughtUp(TrackingEventProcessor.java:294)
    at org.axonframework.eventhandling.TrackingEventProcessor.processBatch(TrackingEventProcessor.java:246)
    at org.axonframework.eventhandling.TrackingEventProcessor.processingLoop(TrackingEventProcessor.java:209)
    at org.axonframework.eventhandling.TrackingEventProcessor$TrackingSegmentWorker.run(TrackingEventProcessor.java:620)
    at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.run(TrackingEventProcessor.java:715)
    at org.axonframework.eventhandling.TrackingEventProcessor$CountingRunnable.run(TrackingEventProcessor.java:547)
    at java.lang.Thread.run(Thread.java:745)

Additional info:

  • Axon 3.2
  • spring-boot 1.5.10
  • commands and queries live within the same JVM

Perhaps my understanding of TokenStore is flawed, but I was hoping to define a TokenStore for one processing group while leaving the rest of the processors as subscribing processors that consume events as-is.

Any pointers will be greatly appreciated!

Hi Dylan,

The token store keeps the state of tracking event processors. The type of token stored in the token store therefore depends on the type of event store. In your case, you can keep the token in a Hazelcast token store without any problem, but the token itself must be a MongoTrackingToken.

Check that the fetchToken operation in your HazelcastTokenStore returns null when no token is available.
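
To make the null check concrete, here is a minimal sketch using only the JDK. Everything in it is a hypothetical stand-in (Token, PlaceholderToken, MapBackedTokenStore), not Axon's API. It assumes the store keeps a placeholder for "no token yet" because concurrent maps reject null values, as the NULL_TOKEN constant in the question suggests. A distributed map may hand back a deserialized copy of that placeholder, so the sketch compares by value with equals() rather than by reference; whether that is what goes wrong in the original store is an assumption.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class TokenStoreSketch {

    /** Hypothetical stand-in for a tracking token type. */
    interface Token { }

    /** Hypothetical placeholder token with value-based equality. */
    static final class PlaceholderToken implements Token {
        final long index;

        PlaceholderToken(long index) {
            this.index = index;
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof PlaceholderToken
                    && ((PlaceholderToken) other).index == index;
        }

        @Override
        public int hashCode() {
            return Long.hashCode(index);
        }
    }

    /** Map-backed store that never leaks the placeholder to its caller. */
    static final class MapBackedTokenStore {
        private static final Token NULL_TOKEN = new PlaceholderToken(-1);

        private final ConcurrentMap<String, Token> tokens;

        MapBackedTokenStore(ConcurrentMap<String, Token> tokens) {
            this.tokens = tokens;
        }

        void storeToken(Token token, String processorName, int segment) {
            // Concurrent maps reject null values, so "no token" is stored as a placeholder.
            tokens.put(key(processorName, segment), token == null ? NULL_TOKEN : token);
        }

        Token fetchToken(String processorName, int segment) {
            Token stored = tokens.get(key(processorName, segment));
            // equals() instead of ==: the map may return a copy of the placeholder.
            if (stored == null || NULL_TOKEN.equals(stored)) {
                return null;
            }
            return stored;
        }

        static String key(String processorName, int segment) {
            return processorName + "#" + segment;
        }
    }
}
```

With a store shaped like this, the placeholder never reaches the event store, and only tokens the event store issued itself are handed back to it.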

Marc

Thank you, Marc, for your quick, concise response. It now makes sense that a MongoTrackingToken should be stored in the token store.

A follow-up question: I need to serialize/deserialize the MongoTrackingToken as binary (a Hazelcast requirement). As it stands, Java serialization won’t suffice, because MongoTrackingToken does not implement Serializable. I went down the path of trying XStream/Jackson, but it seems MongoTrackingToken does not have a no-args constructor, which is a prerequisite for most serialization libraries out there. I’m also not sure whether the factory method MongoTrackingToken.of(timestamp,eventIdentifier) is actually able to rebuild the token on deserialization. What would you suggest as the way to achieve serialization/deserialization so that both the Hazelcast and Axon requirements are met?

Hi Dylan,

XStream does a pretty good job of avoiding the need for default constructors. You should be able to serialize the token with it.
That said, I think the token should actually implement Serializable. That’s something we should fix in the framework.
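
Until the token is Serializable, and if XStream is not an option, a different technique is a Serializable surrogate: a small holder that carries the token's values to and from a byte array. The sketch below uses only the JDK and a hypothetical stand-in class, OpaqueToken, with a factory method shaped like the of(timestamp, eventIdentifier) mentioned above. Whether the real MongoTrackingToken can be rebuilt completely from those two values is not confirmed here, so treat this as an illustration of the pattern rather than a drop-in solution.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class TokenBytesSketch {

    /** Hypothetical token: not Serializable, no no-args constructor, built via a factory. */
    static final class OpaqueToken {
        final long timestamp;
        final String eventIdentifier;

        private OpaqueToken(long timestamp, String eventIdentifier) {
            this.timestamp = timestamp;
            this.eventIdentifier = eventIdentifier;
        }

        static OpaqueToken of(long timestamp, String eventIdentifier) {
            return new OpaqueToken(timestamp, eventIdentifier);
        }
    }

    /** Serializable surrogate holding only the values needed to rebuild the token. */
    static final class TokenSurrogate implements Serializable {
        private static final long serialVersionUID = 1L;

        private final long timestamp;
        private final String eventIdentifier;

        TokenSurrogate(OpaqueToken token) {
            this.timestamp = token.timestamp;
            this.eventIdentifier = token.eventIdentifier;
        }

        OpaqueToken toToken() {
            return OpaqueToken.of(timestamp, eventIdentifier);
        }
    }

    /** Writes the surrogate, not the token itself, using plain Java serialization. */
    static byte[] toBytes(OpaqueToken token) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(new TokenSurrogate(token));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Reads the surrogate back and rebuilds the token through its factory method. */
    static OpaqueToken fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ((TokenSurrogate) in.readObject()).toToken();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The map would then hold byte arrays (or the surrogate itself) as values, and the token store would convert on the way in and out.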

Cheers,

Allard