TrackingSagaManager Configuration

Hi!

I’m having trouble with the tracking saga manager.
I have a reproducible problem: executing a saga takes about 10 seconds when I configure the tracking saga manager, versus less than half a second without it.

Somehow the problematic Saga is getting started twice, but only when I use it with a tracking saga manager :confused:
At some point Axon gives me the following warning, as well:

Expected to be able to update a Saga instance, but no rows were found. Inserting instead.

I’ve already tried to use a SimpleCommandBus instead of an AsynchronousCommandBus, but this doesn’t help either – the same event is handled twice as well and the execution is still very slow.

I’m on Axon 3.0.7 with Spring Boot v1.4.7.RELEASE, Spring v4.3.9.RELEASE, and Postgres 9.4 with the default isolation level (READ_COMMITTED).
Here is my configuration class:

`

package com.my.sample.configuration;

import com.my.sample.lib.CustomCommandGateway;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.axonframework.commandhandling.AsynchronousCommandBus;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.gateway.CommandGatewayFactory;
import org.axonframework.common.caching.WeakReferenceCache;
import org.axonframework.common.jdbc.ConnectionProvider;
import org.axonframework.common.jdbc.UnitOfWorkAwareConnectionProviderWrapper;
import org.axonframework.config.EventHandlingConfiguration;
import org.axonframework.config.SagaConfiguration;
import org.axonframework.eventhandling.saga.ResourceInjector;
import org.axonframework.eventhandling.saga.repository.CachingSagaStore;
import org.axonframework.eventhandling.saga.repository.SagaStore;
import org.axonframework.eventhandling.saga.repository.jdbc.GenericSagaSqlSchema;
import org.axonframework.eventhandling.saga.repository.jdbc.JdbcSagaStore;
import org.axonframework.eventhandling.tokenstore.jdbc.JdbcTokenStore;
import org.axonframework.eventsourcing.AggregateSnapshotter;
import org.axonframework.eventsourcing.EventCountSnapshotTriggerDefinition;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.jdbc.JdbcEventStorageEngine;
import org.axonframework.eventsourcing.eventstore.jdbc.JdbcSQLErrorCodesResolver;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.interceptors.TransactionManagingInterceptor;
import org.axonframework.serialization.json.JacksonSerializer;
import org.axonframework.serialization.upcasting.event.NoOpEventUpcaster;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.axonframework.spring.config.EnableAxon;
import org.axonframework.spring.eventsourcing.SpringAggregateSnapshotterFactoryBean;
import org.axonframework.spring.jdbc.SpringDataSourceConnectionProvider;
import org.axonframework.spring.messaging.unitofwork.SpringTransactionManager;
import org.axonframework.spring.saga.SpringResourceInjector;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.env.Environment;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;

import javax.sql.DataSource;
import java.util.Arrays;
import java.util.concurrent.Executor;

@Configuration
@EnableAxon
public class AxonConfiguration {

    @Autowired
    private ObjectMapper objectMapper;
    @Autowired
    private DataSourceTransactionManager platformTransactionManager;
    @Autowired
    private DataSource dataSource;

    @Autowired
    public void configure(EventHandlingConfiguration config, Environment env) {
        if (!Arrays.asList(env.getActiveProfiles()).contains("test")
                && !Arrays.asList(env.getActiveProfiles()).contains("localtest")) {
            config.usingTrackingProcessors(); // default all processors to tracking mode.
        }
    }

    @Bean
    public SpringTransactionManager axonSpringTransactionManager() {
        return new SpringTransactionManager(platformTransactionManager);
    }

    @Bean
    public CommandBus commandBus(Executor commandBusExecutor, SpringTransactionManager axonSpringTransactionManager) {
        AsynchronousCommandBus commandBus = new AsynchronousCommandBus(commandBusExecutor);
        TransactionManagingInterceptor<Message<?>> transactionManagingInterceptor = new TransactionManagingInterceptor<>(axonSpringTransactionManager);
        commandBus.registerHandlerInterceptor(transactionManagingInterceptor);

        return commandBus;
    }

    @Bean
    public CustomCommandGateway customCommandGateway(CommandBus commandBus) {
        return new CommandGatewayFactory(commandBus).createGateway(CustomCommandGateway.class);
    }

    @Bean
    @Lazy
    public ConnectionProvider springDataSourceConnectionProvider(DataSource dataSource) {
        return new UnitOfWorkAwareConnectionProviderWrapper(new SpringDataSourceConnectionProvider(dataSource));
    }

    @Bean
    @Lazy
    public JdbcEventStorageEngine jdbcEventStorageEngine(ConnectionProvider springDataSourceConnectionProvider) {
        return new JdbcEventStorageEngine(new JacksonSerializer(objectMapper),
                NoOpEventUpcaster.INSTANCE, new JdbcSQLErrorCodesResolver(),
                springDataSourceConnectionProvider, axonSpringTransactionManager());
    }

    @Bean
    @Lazy
    public JdbcTokenStore jdbcTokenStore(ConnectionProvider springDataSourceConnectionProvider) {
        return new JdbcTokenStore(springDataSourceConnectionProvider, new XStreamSerializer());
    }

    @Bean
    @Lazy
    public EmbeddedEventStore eventBus(JdbcEventStorageEngine eventStorageEngine) {
        return new EmbeddedEventStore(eventStorageEngine);
    }

    @Bean
    public SpringAggregateSnapshotterFactoryBean springAggregateSnapshotterFactoryBean() {
        return new SpringAggregateSnapshotterFactoryBean();
    }

    @Bean
    public EventCountSnapshotTriggerDefinition eventCountSnapshotterTrigger(AggregateSnapshotter snapshotter) {
        return new EventCountSnapshotTriggerDefinition(snapshotter, 100);
    }

    @Bean
    public SagaStore sagaRepository(ConnectionProvider springDataSourceConnectionProvider) {
        JdbcSagaStore jdbcStore = new JdbcSagaStore(springDataSourceConnectionProvider, new GenericSagaSqlSchema());
        return new CachingSagaStore<>(jdbcStore, new WeakReferenceCache(), new WeakReferenceCache());
    }

    @Bean
    public ResourceInjector resourceInjector() {
        return new SpringResourceInjector();
    }

    //====================================
    //====================================
    // Individual Saga Configuration
    //====================================
    //====================================
    @Bean
    public SagaConfiguration UpdateCalendarSagaConfiguration() {
        return SagaConfiguration.trackingSagaManager(UpdateCalendarSaga.class);
    }

    @Bean
    public SagaConfiguration PrepareRemovalSagaConfiguration() {
        return SagaConfiguration.trackingSagaManager(PrepareRemovalSaga.class);
    }

}

`

When I remove the individual saga configurations, everything works fast and the event is handled by the saga only once.

Thanks in advance for your help,

Andreas

Hi Andreas,

Did you also annotate your Saga with ‘@Saga’?
If that’s the case, then in your current setup Axon will automatically create the SagaStore and SagaManager for you.

Now I see you’re also providing your own SagaStore for Object, so maybe that bean is interfering with the automatic ones which are created through SagaConfiguration.trackingSagaManager().
Have you tried what happens if you remove the sagaRepository() bean you’re creating?
Additionally, I see that the warning you’re sharing comes from the JpaSagaStore and JdbcSagaStore implementations, which may support my hunch.

Lastly, I see you’re using the @EnableAxon annotation.
This annotation is meant for non-Spring Boot applications, where it wires the required Axon components. Since you’re in a Spring Boot environment, I guess you’re also using the axon-spring-boot-starter module. If that’s the case, you should be able to drop the @EnableAxon annotation from your config.

Hope this helps!

Cheers,

Steven

Thanks Steven, but unfortunately the events are still handled twice if I remove the sagaRepository and/or remove the EnableAxon annotation :confused:

I found out that the same thread that dispatches the command first handles the saga (when DefaultUnitOfWork.executeWithResult invokes commit()), and right after that another thread (SagaProcessor-0) handles it again.

If the same thread handles the saga, shouldn’t it update the token store right away to prevent the other saga processor from handling it again?

Cheers,
Andreas

Hi Andreas,

I think the problem is with the SagaConfiguration beans that you’re creating. Their bean names start with a capital letter. Axon currently has quite strict naming requirements for a customized SagaConfiguration if you use @Saga on the saga: the bean name must be the Saga’s class name followed by “Configuration”, with a lower-case first character.
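
To make the naming rule concrete, here is a minimal sketch in plain Java that derives the bean name the rule above describes from a saga's simple class name. The class SagaBeanNames and the method expectedBeanName are hypothetical helpers written for illustration only; they are not part of Axon, and the sketch assumes the rule is exactly "class name + Configuration, first character lower-cased" as stated in this thread.

```java
// Hypothetical helper, not part of Axon: illustrates the naming rule
// "saga class name + 'Configuration', with a lower-case first character".
final class SagaBeanNames {

    private SagaBeanNames() {
        // utility class, no instances
    }

    /**
     * Derives the bean name expected for a customized SagaConfiguration,
     * given the simple class name of the saga (e.g. "UpdateCalendarSaga").
     */
    static String expectedBeanName(String sagaSimpleClassName) {
        if (sagaSimpleClassName == null || sagaSimpleClassName.isEmpty()) {
            throw new IllegalArgumentException("saga class name must not be empty");
        }
        // Lower-case only the first character; the rest of the name is kept as-is.
        char first = Character.toLowerCase(sagaSimpleClassName.charAt(0));
        return first + sagaSimpleClassName.substring(1) + "Configuration";
    }
}
```

Applied to the configuration posted above, the two @Bean methods would be named updateCalendarSagaConfiguration and prepareRemovalSagaConfiguration instead of UpdateCalendarSagaConfiguration and PrepareRemovalSagaConfiguration, since Spring uses the method name as the bean name by default.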

If you use Spring Boot, I strongly recommend removing the @EnableAxon annotation as well as any beans that just define the defaults. Looking at your configuration, these are: “resourceInjector”, “eventBus” and “axonSpringTransactionManager”.

Hope this helps.

Allard

Hi Allard!

Thank you so much, changing the names of the beans solved my problem!

Cheers,
Andreas