Hi,
I have some questions about snapshotting:
-
Does Axon Server remove old snapshots when a new snapshot is made for an aggregate? (This would be in order to save disk space.)
-
The documentation talks about handling snapshot events separately: https://docs.axoniq.io/reference-guide/configuring-infrastructure-components/command-processing/optimizing-aggregate-loading
Is this really needed or is this transparent?
I have a snapshot trigger definition as follows. Do we really need a different event handler for the snapshot, or is the snapshot deserialized automatically?
public class CountAndTimeBasedSnapshotTriggerDefinition implements SnapshotTriggerDefinition {
private final Snapshotter snapshotter;
private final int eventCountThreshold;
private final int loadTimeMilliesThreshold;
public CountAndTimeBasedSnapshotTriggerDefinition(Snapshotter snapshotter, int eventCountThreshold, int loadTimeMilliesThreshold) {
this.snapshotter = snapshotter;
this.eventCountThreshold = eventCountThreshold;
this.loadTimeMilliesThreshold = loadTimeMilliesThreshold;
}
@Override
public SnapshotTrigger prepareTrigger(Class<?> aggregateType) {
return new CountAndTimeBasedSnapshotTrigger(snapshotter, aggregateType, eventCountThreshold, loadTimeMilliesThreshold);
}
@Override
public SnapshotTrigger reconfigure(Class<?> aggregateType, SnapshotTrigger trigger) {
if (trigger instanceof CountAndTimeBasedSnapshotTrigger) {
((CountAndTimeBasedSnapshotTrigger) trigger).setSnapshotter(snapshotter);
return trigger;
}
return new CountAndTimeBasedSnapshotTrigger(snapshotter, aggregateType, eventCountThreshold, loadTimeMilliesThreshold);
}
private static class CountAndTimeBasedSnapshotTrigger implements SnapshotTrigger, Serializable {
private final Class<?> aggregateType;
private final int eventCountThreshold;
private final int loadTimeMilliesThreshold;
private transient Snapshotter snapshotter;
private int counter = 0;
private long startTime = System.currentTimeMillis();
public CountAndTimeBasedSnapshotTrigger(Snapshotter snapshotter, Class<?> aggregateType, int eventCountThreshold, int loadTimeMilliesThreshold) {
this.snapshotter = snapshotter;
this.aggregateType = aggregateType;
this.eventCountThreshold = eventCountThreshold;
this.loadTimeMilliesThreshold = loadTimeMilliesThreshold;
}
@Override
public void eventHandled(EventMessage<?> msg) {
if ((++counter >= eventCountThreshold || isLoadThresholdReached()) && msg instanceof DomainEventMessage) {
if (CurrentUnitOfWork.isStarted()) {
CurrentUnitOfWork.get().onPrepareCommit(
u -> scheduleSnapshot((DomainEventMessage) msg));
} else {
scheduleSnapshot((DomainEventMessage) msg);
}
resetCounters();
}
}
protected void scheduleSnapshot(DomainEventMessage msg) {
snapshotter.scheduleSnapshot(aggregateType, msg.getAggregateIdentifier());
resetCounters();
}
@Override
public void initializationFinished() {
}
private void resetCounters() {
counter = 0;
startTime = System.currentTimeMillis();
}
public void setSnapshotter(Snapshotter snapshotter) {
this.snapshotter = snapshotter;
}
private boolean isLoadThresholdReached() {
return (System.currentTimeMillis() - startTime) > loadTimeMilliesThreshold;
}
}
The snapshotter we are thinking of using is as follows:
/**
 * Builds the {@link Snapshotter}: a {@link SpringAggregateSnapshotter} that writes to the given
 * event store, runs inside transactions obtained from the given platform transaction manager
 * (with a default transaction definition), and executes on a fixed pool of 10 threads.
 *
 * NOTE(review): the thread pool created here is not shut down by this method; confirm that its
 * lifecycle is managed elsewhere.
 */
public Snapshotter snapshotter(PlatformTransactionManager platformTransactionManager,
                               EventStore integrationEventStore) {
    DefaultTransactionDefinition defaultDefinition = new DefaultTransactionDefinition();
    SpringTransactionManager snapshotTransactionManager =
            new SpringTransactionManager(platformTransactionManager, defaultDefinition);

    return SpringAggregateSnapshotter.builder()
            .eventStore(integrationEventStore)
            .transactionManager(snapshotTransactionManager)
            .executor(Executors.newFixedThreadPool(10))
            .build();
}
Kr
Tom