Implementing Event Snapshots in Axon 3

Hello,

I am new to the Axon. I tried implementing Snapshots by adding the following to my configuration file

@Bean
public SpringAggregateSnapshotter snapshotter(ParameterResolverFactory parameterResolverFactory, EventStore eventStore, TransactionManager transactionManager) {
Executor executor = Executors.newSingleThreadExecutor(); //Or any other executor of course
return new SpringAggregateSnapshotter(eventStore, parameterResolverFactory, executor, transactionManager);
}

@Bean
public SnapshotTriggerDefinition snapshotTriggerDefinition(Snapshotter snapshotter) throws Exception {
return new EventCountSnapshotTriggerDefinition(snapshotter, 3);
}

Is this all I have to do? Are there more configurations to be added? I expected to see entries in the table snapshot_event_entry after doing a couple of inserts, but still the table is empty.

I found a post with these other configurations

`

@Bean
public Repository orderBookRepository(EventStore eventStore, SnapshotTriggerDefinition snapshotTriggerDefinition, Cache cache) {
return new CachingEventSourcingRepository<>(new GenericAggregateFactory<>(Sale.class), eventStore, cache, snapshotTriggerDefinition);
}

@Bean
public Repository individualRepository(EventStore eventStore, SnapshotTriggerDefinition snapshotTriggerDefinition, Cache cache, ParameterResolverFactory parameterResolverFactory) {
return new CachingEventSourcingRepository<>(new GenericAggregateFactory<>(Sale.class), eventStore, new PessimisticLockFactory(), cache, parameterResolverFactory, snapshotTriggerDefinition);
// }

`

But when I add these, I get the error: Parameter 2 of method orderBookRepository in config.SaleConfig required a bean of type ‘org.axonframework.common.caching.Cache’ that could not be found.

What am I doing wrong please? Or what am I not doing?

Thanks in advance

Hi Diane,
you need to define a Cache implementation, like this:

@Bean public Cache cache() { return new WeakReferenceCache(); }

or you can use a EventSourcingRepository instead of a CachingEventSourcingRepository.

Hello Simone,
Thanks for replying. I added the Cache bean, but still nothing gets written to the snapshot_event_entry table

My Config file looks like this at the moment:

`

package config;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.common.caching.Cache;
import org.axonframework.eventsourcing.AggregateFactory;
import org.axonframework.eventsourcing.CachingEventSourcingRepository;
import org.axonframework.eventsourcing.EventCountSnapshotTriggerDefinition;
import org.axonframework.eventsourcing.Snapshotter;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.spring.eventsourcing.SpringPrototypeAggregateFactory;
import org.softech.flexbet.aggregates.Sale;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.axonframework.commandhandling.model.Repository;
import org.axonframework.common.Registration;
import org.axonframework.common.caching.WeakReferenceCache;
import org.axonframework.common.lock.PessimisticLockFactory;
import org.axonframework.eventsourcing.GenericAggregateFactory;
import org.axonframework.eventsourcing.SnapshotTriggerDefinition;
import org.axonframework.messaging.annotation.ParameterResolverFactory;
import org.axonframework.spring.eventsourcing.SpringAggregateSnapshotter;
import org.springframework.cache.annotation.EnableCaching;

@Configuration
public class SaleConfig {

@Autowired
public void configure(AmqpAdmin admin) {
admin.declareExchange(exchange());
admin.declareQueue(queue());
admin.declareBinding(binding());
}

@Bean
public Exchange exchange() {
return ExchangeBuilder.fanoutExchange(“SaleEvents”).build();
}

@Bean
public Queue queue() {
return QueueBuilder.durable(“SaleEvents”).build();
}

@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("*").noargs();
}

@Bean
public AggregateFactory saleAggregateFactory() {
SpringPrototypeAggregateFactory aggregateFactory = new SpringPrototypeAggregateFactory<>();
aggregateFactory.setPrototypeBeanName(“sale”);

return aggregateFactory;
}

@Bean
public Cache cache(){
return new WeakReferenceCache();
}

@Bean
public SpringAggregateSnapshotter snapshotter(ParameterResolverFactory parameterResolverFactory, EventStore eventStore, TransactionManager transactionManager) {
Executor executor = Executors.newSingleThreadExecutor(); //Or any other executor of course
return new SpringAggregateSnapshotter(eventStore, parameterResolverFactory, executor, transactionManager);
}

@Bean
public SnapshotTriggerDefinition snapshotTriggerDefinition(Snapshotter snapshotter) throws Exception {
return new EventCountSnapshotTriggerDefinition(snapshotter, 3);
}

@Bean
public Repository saleRepository(EventStore eventStore, SnapshotTriggerDefinition snapshotTriggerDefinition, Cache cache) {
return new CachingEventSourcingRepository<>(new GenericAggregateFactory<>(Sale.class), eventStore, cache, snapshotTriggerDefinition);
}

}

`

Hi Diane,

the problem is in your bean name. Axon isn’t currently using your defined “Repository”, but an autoconfigured one.

public Repository individualRepository()

should be:

public Repository saleRepository()

Alternatively, configure the bean name to use in the @Aggregate annotation (it defaults to [aggregate class name] + “Repository”).

Cheers,

Allard

Hello Allard,

I made the correction you pointed out. Now i get the error:

`

Are you importing

import org.axonframework.commandhandling.model.Repository; ???

Hello Frank

Yes I am doing the import: import org.axonframework.commandhandling.model.Repository;

As shown in my configuration class code above.

However, My SaleRepository class imports:

import org.springframework.stereotype.Repository;

Hi Diane,
In that case you have two beans with the same name.

If you rename the axon repository to, for instance, saleAggregateRepository, you can annotate the Sale class with:
@Aggregate(repository=“saleAggregateRepository”)

Hey Frank, thanks for the help so far.

I renamed by repository. I still have my main issue though, which is implementing event snapshots. When I execute commands, I can see the events being registered in the domain_event_entry table.

But still no entry on snapshot_event_entry table.

Which repository did you rename? If it was the Axon Repository, did you define the name of the repository in the @Aggregate annotation?

Allard

`

`
Hey Allard, yes I did.

My repository currently looks like this:

`

package org.softech.flexbet.query;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface SaleAggregrateRepository extends JpaRepository<SaleEntry, String> {

}

`

The Aggregate:

`

package org.softech.flexbet.aggregates;

import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.model.AggregateIdentifier;
import static org.axonframework.commandhandling.model.AggregateLifecycle.apply;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.spring.stereotype.Aggregate;

@Aggregate(repository=“SaleAggregateRepository”)
public class Sale {

@AggregateIdentifier
String sale_id;

public Sale() {
}

@CommandHandler
public Sale(MakeSaleCommand command) {
apply(new SaleMadeEvent(command.getId(), command.getAgent_id(), command.getProduct_id(),
command.getProduct_price(), command.getSale_date_time()));
}

@CommandHandler
public void update(UpdateSaleCommand updateSaleCommand) {
apply (new UpdateSaleEvent(sale_id, updateSaleCommand.getProduct_price()));
}

/*

  • Will be invoked by apply method or during replay
    */
    @EventSourcingHandler
    public void on(SaleMadeEvent event) {
    this.sale_id = event.getId();
    }

}

`

The SaleAPI:

`

package org.softech.flexbet.query;

import org.axonframework.commandhandling.gateway.CommandGateway;
import org.softech.flexbet.aggregates.MakeSaleCommand;
import org.springframework.web.bind.annotation.*;

import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import java.util.UUID;

@RestController
public class SaleAPI {

private final SaleAggregrateRepository repository;
private final CommandGateway commandGateway;

// @Autowired
public SaleAPI(SaleAggregrateRepository repository, CommandGateway commandGateway){
this.repository = repository;
this.commandGateway = commandGateway;
}

@PostMapping
public void makeSale(@RequestBody Map<String,String> request){
String id = UUID.randomUUID().toString();
commandGateway.send(new MakeSaleCommand(id, request.get(“agent_id”), request.get(“product_id”),
Double.parseDouble(request.get(“product_price”)), Timestamp.valueOf(request.get(“sale_date_time”))));
}

@GetMapping
public List findAll(){
return repository.findAll();
}

@GetMapping("/{id}")
public SaleEntry findOne(@PathVariable(value = “id”,required = true)String id){
return repository.findOne(id);
}

}

`

ConfigurationClass:

`

`

package config;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.common.caching.Cache;
import org.axonframework.eventsourcing.AggregateFactory;
import org.axonframework.eventsourcing.CachingEventSourcingRepository;
import org.axonframework.eventsourcing.EventCountSnapshotTriggerDefinition;
import org.axonframework.eventsourcing.Snapshotter;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.spring.eventsourcing.SpringPrototypeAggregateFactory;
import org.softech.flexbet.aggregates.Sale;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.axonframework.commandhandling.model.Repository;
import org.axonframework.common.caching.WeakReferenceCache;
import org.axonframework.common.lock.PessimisticLockFactory;
import org.axonframework.eventsourcing.GenericAggregateFactory;
import org.axonframework.eventsourcing.SnapshotTriggerDefinition;
import org.axonframework.messaging.annotation.ParameterResolverFactory;
import org.axonframework.spring.eventsourcing.SpringAggregateSnapshotter;
import org.springframework.cache.annotation.EnableCaching;

@Configuration
@EnableCaching
public class SaleConfig {

@Autowired
public void configure(AmqpAdmin admin) {
admin.declareExchange(exchange());
admin.declareQueue(queue());
admin.declareBinding(binding());
}

@Bean
public Exchange exchange() {
return ExchangeBuilder.fanoutExchange(“SaleEvents”).build();
}

@Bean
public Queue queue() {
return QueueBuilder.durable(“SaleEvents”).build();
}

@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("*").noargs();
}

@Bean
public AggregateFactory saleAggregateFactory() {
SpringPrototypeAggregateFactory aggregateFactory = new SpringPrototypeAggregateFactory<>();
aggregateFactory.setPrototypeBeanName(“sale”);

return aggregateFactory;
}

@Bean
public Cache cache(){
return new WeakReferenceCache();
}

@Bean
public SpringAggregateSnapshotter snapshotter(ParameterResolverFactory parameterResolverFactory, EventStore eventStore, TransactionManager transactionManager) {
Executor executor = Executors.newSingleThreadExecutor(); //Or any other executor of course
return new SpringAggregateSnapshotter(eventStore, parameterResolverFactory, executor, transactionManager);
}

@Bean
public SnapshotTriggerDefinition snapshotTriggerDefinition(Snapshotter snapshotter) throws Exception {
return new EventCountSnapshotTriggerDefinition(snapshotter, 3);
}

@Bean
public Repository SaleAggregateRepository(EventStore eventStore, SnapshotTriggerDefinition snapshotTriggerDefinition, Cache cache, ParameterResolverFactory parameterResolverFactory) {
return new CachingEventSourcingRepository<>(new GenericAggregateFactory<>(Sale.class), eventStore, new PessimisticLockFactory(), cache, parameterResolverFactory, snapshotTriggerDefinition);
}

}

`

`

You’ve renamed the wrong repository. The one that extends JpaRepository is a Spring Data Jpa Repository, which you probably use for the query model.

Also note that the default bean name for a class named “MyRepository” is “myRepository”, with a small “m”. In JavaConfig, the bean name corresponds to the method name that defines the bean.

Oh, ok.

But I’m sorry, I really don’t know which repository I’m to rename. I’m new to Axon. Please where do I locate the repository?

Originally, you had:

@Bean
public Repository orderBookRepository(EventStore eventStore, SnapshotTriggerDefinition snapshotTriggerDefinition, Cache cache) {
return new CachingEventSourcingRepository<>(new GenericAggregateFactory<>(Sale.class), eventStore, cache, snapshotTriggerDefinition);
}

@Bean
public Repository individualRepository(EventStore eventStore, SnapshotTriggerDefinition snapshotTriggerDefinition, Cache cache, ParameterResolverFactory parameterResolverFactory) {
return new CachingEventSourcingRepository<>(new GenericAggregateFactory<>(Sale.class), eventStore, new PessimisticLockFactory(), cache, parameterResolverFactory, snapshotTriggerDefinition);
// }

You want to change that to:

@Bean
public Repository saleAggregateRepository(EventStore eventStore, SnapshotTriggerDefinition snapshotTriggerDefinition, Cache cache, ParameterResolverFactory parameterResolverFactory) {
return new CachingEventSourcingRepository<>(new GenericAggregateFactory<>(Sale.class), eventStore, new PessimisticLockFactory(), cache, parameterResolverFactory, snapshotTriggerDefinition);
// }

or, if you don’t need/use caching, to:
@Bean
public Repository saleAggregateRepository(EventStore eventStore, SnapshotTriggerDefinition snapshotTriggerDefinition, ParameterResolverFactory parameterResolverFactory) {
return new EventSourcingRepository<>(new GenericAggregateFactory<>(Sale.class), eventStore, parameterResolverFactory, snapshotTriggerDefinition);
// }

On your aggregate, put the @Aggregate(repository=“saleAggregateRepository”) annotation to tell Axon to use one of your existing repositories instead of one itself.

Cheers,

Allard

Thanks for the help, I finally got Snapshots working

Best Regards

Hi Diane,

What is your event store(SQL, NoSQL), because I am using MongoDB but snapshot are not stored in the table.

Thanks,
Brahma

Hello Brahma,

I use a MySQL database for my event store.

But I however ran the axon-trader application which uses MongoDB and it worked fine.

Maybe you could post your configuration here so we try to identify if there may be something you’re not doing.

Cheers!

Thanks Diane for quick reply,

I just want to understand that what is the use of yours below code, Is it relevant for snapshot creation.

Problem is snapshot is not created in snapshot table, but events are gets stored in the table. You can post your working code.

@Autowired
public void configure(AmqpAdmin admin) {
admin.declareExchange(exchange());
admin.declareQueue(queue());
admin.declareBinding(binding());
}

@Bean
public Exchange exchange() {
return ExchangeBuilder.fanoutExchange(“SaleEvents”).build();
}

@Bean
public Queue queue() {
return QueueBuilder.durable(“SaleEvents”).build();
}

@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("*").noargs();
}

@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnProperty(prefix = “spring.rabbitmq”, name = “dynamic”, matchIfMissing = true)
@ConditionalOnMissingBean(AmqpAdmin.class)
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}

Below is my configuration file,

Thanks Diane for quick reply,

I just want to understand that what is the use of yours below code, Is it relevant for snapshot creation.

Problem is snapshot is not created in snapshot table, but events are gets stored in the table. You can post your working code.

@Autowired
public void configure(AmqpAdmin admin) {
admin.declareExchange(exchange());
admin.declareQueue(queue());
admin.declareBinding(binding());
}

@Bean
public Exchange exchange() {
return ExchangeBuilder.fanoutExchange(“SaleEvents”).build();
}

@Bean
public Queue queue() {
return QueueBuilder.durable(“SaleEvents”).build();
}

@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with(“*”).noargs();
}

@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnProperty(prefix = “spring.rabbitmq”, name = “dynamic”, matchIfMissing = true)
@ConditionalOnMissingBean(AmqpAdmin.class)
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}

The above configuration is to configure event distribution with RabbitMQ so events can be sent to external applications. It has nothing to do with snapshot creation.

The latest Axon release has significantly reduced the amount of configurations to do to enable snapshotting:

@Bean
public SpringAggregateSnapshotterFactoryBean snapshotterFactoryBean() {
    return new SpringAggregateSnapshotterFactoryBean();
}

@Bean("saleRepository")
public EventSourcingRepository<Sale> saleRepository(EventStore eventStore, Snapshotter snapshotter) {
    return new EventSourcingRepository<>(
            Sale.class,
            eventStore,
            new EventCountSnapshotTriggerDefinition(snapshotter, 100));
}

100 is the number of events that triggers the snapshotter in this case.
This is all you need to configure a snapshot.

Hi Harvey,

Thanks for reply,

The problem is every event that stored having same sequence no. (zero for each event) its not gets incremented.
For that the following if condition gets failed and snapshot storing is not executed.

AbstractSnapshotter