Snapshotting

Hi,

I have some questions about snapshotting:

  1. Does Axon Server remove old snapshots when a new snapshot is made for an aggregate? (This would save disk space.)

  2. The documentation talks about handling snapshot events separately: https://docs.axoniq.io/reference-guide/configuring-infrastructure-components/command-processing/optimizing-aggregate-loading
    Is this really needed, or is it transparent?

I have a snapshot trigger definition as follows. Do we really need a separate event handler for the snapshot, or is the snapshot deserialized automatically?

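// Imports assume Axon Framework 4.x package names.
import java.io.Serializable;

import org.axonframework.eventhandling.DomainEventMessage;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventsourcing.SnapshotTrigger;
import org.axonframework.eventsourcing.SnapshotTriggerDefinition;
import org.axonframework.eventsourcing.Snapshotter;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;

public class CountAndTimeBasedSnapshotTriggerDefinition implements SnapshotTriggerDefinition {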

    private final Snapshotter snapshotter;
    private final int eventCountThreshold;
    private final int loadTimeMilliesThreshold;

    public CountAndTimeBasedSnapshotTriggerDefinition(Snapshotter snapshotter, int eventCountThreshold, int loadTimeMilliesThreshold) {
        this.snapshotter = snapshotter;
        this.eventCountThreshold = eventCountThreshold;
        this.loadTimeMilliesThreshold = loadTimeMilliesThreshold;
    }

    @Override
    public SnapshotTrigger prepareTrigger(Class<?> aggregateType) {
        return new CountAndTimeBasedSnapshotTrigger(snapshotter, aggregateType, eventCountThreshold, loadTimeMilliesThreshold);
    }

    @Override
    public SnapshotTrigger reconfigure(Class<?> aggregateType, SnapshotTrigger trigger) {
        if (trigger instanceof CountAndTimeBasedSnapshotTrigger) {
            ((CountAndTimeBasedSnapshotTrigger) trigger).setSnapshotter(snapshotter);
            return trigger;
        }
        return new CountAndTimeBasedSnapshotTrigger(snapshotter, aggregateType, eventCountThreshold, loadTimeMilliesThreshold);
    }

    private static class CountAndTimeBasedSnapshotTrigger implements SnapshotTrigger, Serializable {

        private final Class<?> aggregateType;
        private final int eventCountThreshold;
        private final int loadTimeMilliesThreshold;

        private transient Snapshotter snapshotter;
        private int counter = 0;
        private long startTime = System.currentTimeMillis();

        public CountAndTimeBasedSnapshotTrigger(Snapshotter snapshotter, Class<?> aggregateType, int eventCountThreshold, int loadTimeMilliesThreshold) {
            this.snapshotter = snapshotter;
            this.aggregateType = aggregateType;
            this.eventCountThreshold = eventCountThreshold;
            this.loadTimeMilliesThreshold = loadTimeMilliesThreshold;
        }

        @Override
        public void eventHandled(EventMessage<?> msg) {
            if ((++counter >= eventCountThreshold || isLoadThresholdReached()) && msg instanceof DomainEventMessage) {
                if (CurrentUnitOfWork.isStarted()) {
                    CurrentUnitOfWork.get().onPrepareCommit(
                        u -> scheduleSnapshot((DomainEventMessage) msg));
                } else {
                    scheduleSnapshot((DomainEventMessage) msg);
                }
                resetCounters();
            }
        }

        protected void scheduleSnapshot(DomainEventMessage msg) {
            snapshotter.scheduleSnapshot(aggregateType, msg.getAggregateIdentifier());
            resetCounters();
        }

        @Override
        public void initializationFinished() {
        }

        private void resetCounters() {
            counter = 0;
            startTime = System.currentTimeMillis();
        }

        public void setSnapshotter(Snapshotter snapshotter) {
            this.snapshotter = snapshotter;
        }

        private boolean isLoadThresholdReached() {
            return (System.currentTimeMillis() - startTime) > loadTimeMilliesThreshold;
        }
    }
}

The snapshotter we are thinking of using is as follows:

public Snapshotter snapshotter(PlatformTransactionManager platformTransactionManager,
                               EventStore integrationEventStore) {
    return SpringAggregateSnapshotter.builder()
        .eventStore(integrationEventStore)
        .transactionManager(new SpringTransactionManager(platformTransactionManager, new DefaultTransactionDefinition()))
        .executor(Executors.newFixedThreadPool(10))
        .build();
}

Kr

Tom

Another question. In order to create a snapshot when loading is too slow, I currently do the check inline in eventHandled.
Is this the proper way, or is it better to use the initializationFinished method for this?

Hi Tom,

  1. Does Axon Server remove old snapshots when a new snapshot is made for an aggregate? (This would save disk space.)

No, not at the moment, although this is being looked into.
As a (not too fancy, I know) workaround, you can always drop old snapshot files yourself.

  2. The documentation talks about handling snapshot events separately

This would only be necessary if you are using a different Snapshotter implementation than the AggregateSnapshotter.

From your sample, I see you are using the SpringAggregateSnapshotter.
This does essentially the same as the AggregateSnapshotter: the entire aggregate is "snapshotted" and serialized, and later deserialized to recreate the aggregate, so no separate handler is needed.

Another question. In order to create a snapshot when loading is too slow, I currently do the check inline in eventHandled.
Is this the proper way, or is it better to use the initializationFinished method for this?

I would use SnapshotTrigger#initializationFinished() to create snapshots if loading takes too long.
It best reflects when loading is finished, although the window between the end of loading and the first SnapshotTrigger#eventHandled(EventMessage<?>) call after it is rather slim.
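
To illustrate the idea, here is a minimal, self-contained sketch of a trigger that measures load time and decides in initializationFinished. It does not use the Axon classes themselves: the Snapshotter interface below is a hypothetical stand-in reduced to the single scheduleSnapshot call, eventHandled is simplified to take only the aggregate identifier, and the injectable clock and the class names are assumptions made so the example can run on its own. In a real trigger you would implement SnapshotTrigger and read the identifier from the DomainEventMessage, as in Tom's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

public class LoadTimeTriggerSketch {

    /** Hypothetical stand-in for Axon's Snapshotter, reduced to the one call used here. */
    interface Snapshotter {
        void scheduleSnapshot(Class<?> aggregateType, String aggregateIdentifier);
    }

    /** Times aggregate loading and requests a snapshot once, when loading has finished. */
    static class LoadTimeSnapshotTrigger {
        private final Snapshotter snapshotter;
        private final Class<?> aggregateType;
        private final long loadTimeMillisThreshold;
        private final LongSupplier clock;     // injectable clock (assumption, for testability)
        private final long startTime;
        private String aggregateIdentifier;   // learned from events replayed during loading
        private boolean initialized;

        LoadTimeSnapshotTrigger(Snapshotter snapshotter, Class<?> aggregateType,
                                long loadTimeMillisThreshold, LongSupplier clock) {
            this.snapshotter = snapshotter;
            this.aggregateType = aggregateType;
            this.loadTimeMillisThreshold = loadTimeMillisThreshold;
            this.clock = clock;
            this.startTime = clock.getAsLong();
        }

        // Simplified counterpart of SnapshotTrigger#eventHandled: only remembers the identifier
        // while the aggregate is still being loaded.
        void eventHandled(String aggregateIdentifier) {
            if (!initialized) {
                this.aggregateIdentifier = aggregateIdentifier;
            }
        }

        // Counterpart of SnapshotTrigger#initializationFinished: all stored events were replayed,
        // so the elapsed time is the load time.
        void initializationFinished() {
            if (initialized) {
                return;
            }
            initialized = true;
            long loadTime = clock.getAsLong() - startTime;
            if (aggregateIdentifier != null && loadTime > loadTimeMillisThreshold) {
                snapshotter.scheduleSnapshot(aggregateType, aggregateIdentifier);
            }
        }
    }

    public static void main(String[] args) {
        List<String> scheduled = new ArrayList<>();
        long[] now = {0L};
        LoadTimeSnapshotTrigger trigger = new LoadTimeSnapshotTrigger(
                (type, id) -> scheduled.add(type.getSimpleName() + ":" + id),
                Object.class, 100L, () -> now[0]);
        trigger.eventHandled("order-1");   // replayed event during loading
        now[0] = 250L;                     // loading took 250 ms, above the 100 ms threshold
        trigger.initializationFinished();
        System.out.println(scheduled);
    }
}
```

The event count check can stay in eventHandled; only the load time check moves, since the time between creating the trigger and initializationFinished is what actually corresponds to loading the aggregate.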

Hope this helps you further, Tom!

Cheers,
Steven

PS. It is important to note that this mailing list will be discontinued as specified in this thread.
The thread also specifies where to look further for help when it comes to Axon.