Catch exception thrown by UnitOfWork

Hi,
I have a problem with the handling of an exception thrown during a call to UnitOfWork.attachTransaction.

Context : I test the resilience of my application. I simulate a significant latency between my application and an Oracle database with a toxiproxy.
Here the error stack I get :

01-03-2022 13:38:51 WARN o.h.e.jdbc.spi.SqlExceptionHelper - SQL Error: 0, SQLState: null 01-03-2022 13:38:51 ERROR o.h.e.jdbc.spi.SqlExceptionHelper - na.na.na.fwk.HUP_SPRING_HIKARI_POOL - Connection is not available, request timed out after 1000ms. Exception in thread "pool-5-thread-18" org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:467) at org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400) at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373) at org.axonframework.spring.messaging.unitofwork.SpringTransactionManager.startTransaction(SpringTransactionManager.java:59) at org.axonframework.messaging.unitofwork.UnitOfWork.attachTransaction(UnitOfWork.java:272) at org.axonframework.commandhandling.SimpleCommandBus.handle(SimpleCommandBus.java:173) at org.axonframework.commandhandling.AsynchronousCommandBus.lambda$handle$0(AsynchronousCommandBus.java:90) at com.xxx.infrastructure.framework.ThreadingConfiguration$MultiThreadPoolTaskExecutorStrategy.lambda$new$0(ThreadingConfiguration.java:52) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection at org.hibernate.exception.internal.SQLExceptionTypeDelegate.convert(SQLExceptionTypeDelegate.java:48) at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:42) at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:113) at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:99) at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:111) at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.getPhysicalConnection(LogicalConnectionManagedImpl.java:138) at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.getConnectionForTransactionManagement(LogicalConnectionManagedImpl.java:276) at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.begin(LogicalConnectionManagedImpl.java:284) at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl$TransactionDriverControlImpl.begin(JdbcResourceLocalTransactionCoordinatorImpl.java:246) at org.hibernate.engine.transaction.internal.TransactionImpl.begin(TransactionImpl.java:83) at org.springframework.orm.jpa.vendor.HibernateJpaDialect.beginTransaction(HibernateJpaDialect.java:164) at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:421) ... 10 more Caused by: java.sql.SQLTransientConnectionException: na.na.na.fwk.HUP_SPRING_HIKARI_POOL - Connection is not available, request timed out after 1000ms. at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696) at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:197) at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:162) at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:100) at org.hibernate.engine.jdbc.connections.internal.DatasourceConnectionProviderImpl.getConnection(DatasourceConnectionProviderImpl.java:122) at org.hibernate.internal.NonContextualJdbcConnectionAccess.obtainConnection(NonContextualJdbcConnectionAccess.java:38) at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:108) ... 17 more

I can’t catch this error to deal with it properly… I have a some listeners defined on my AxonConfiguration, but none are called when this exception occurs

My custom AxonConfiguration :

import com.fasterxml.jackson.databind.ObjectMapper;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.security.AnyTypePermission;
import com.xxx.events.ImmutableTechnicalError;
import com.xxx.events.PersistedEvent;
import com.xxx.framework.axon.RedisSagaStore;
import com.xxx.framework.deadline.SupervisedDeadlineManager;
import com.xxx.framework.exceptions.HupRuntimeException;
import com.xxx.framework.json.JsonObjectSerializable;
import com.xxx.framework.logs.ColorizedLogs;
import comxxx.services.logs.LogGenerator;
import com.xxx.deckard.bean.XlLevels;
import com.xxx.logdorak.Logdorak;
import com.xxx.sumon.Sumon;
import org.axonframework.commandhandling.AsynchronousCommandBus;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.commandhandling.gateway.DefaultCommandGateway;
import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.config.ConfigurationScopeAwareProvider;
import org.axonframework.config.Configurer;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.deadline.DeadlineManager;
import org.axonframework.deadline.quartz.QuartzDeadlineManager;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.ListenerInvocationErrorHandler;
import org.axonframework.eventhandling.gateway.EventGateway;
import org.axonframework.eventsourcing.FilteringEventStorageEngine;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.MetaData;
import org.axonframework.messaging.correlation.CorrelationDataProvider;
import org.axonframework.messaging.correlation.SimpleCorrelationDataProvider;
import org.axonframework.messaging.interceptors.CorrelationDataInterceptor;
import org.axonframework.modelling.saga.repository.SagaStore;
import org.axonframework.modelling.saga.repository.jpa.JpaSagaStore;
import org.axonframework.queryhandling.*;
import org.axonframework.serialization.FixedValueRevisionResolver;
import org.axonframework.serialization.RevisionResolver;
import org.axonframework.serialization.Serializer;
import org.axonframework.spring.messaging.unitofwork.SpringTransactionManager;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.quartz.QuartzProperties;
import org.springframework.boot.autoconfigure.quartz.SchedulerFactoryBeanCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.core.RedisTemplate;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;

@Configuration
public class AxonConfiguration {

    private static final String MDC_MAP = "MDC_MAP";
    private static final Logger LOGGER = LoggerFactory.getLogger(AxonConfiguration.class);
    @Value("${application.version}")
    String version;

    @Value("${redis.retention.axon.saga.stores:PT80H}")
    String sagaStoresTtl;

    @Autowired
    EventGateway eventGateway;
    
    @Bean
    @Primary
    public XStream xStream() {
        var xstream = new XStream();
        xstream.addPermission(AnyTypePermission.ANY);
        return xstream;
    }

    @Bean
    public RevisionResolver revisionResolver() {
        return new FixedValueRevisionResolver(version);
    }

    @Autowired
    public void configureTrackingEventProcessors(EventProcessingConfigurer configurer) {
        // ok, ici on utilise un processeur d'event qui va être bien synchrone.
        configurer.usingSubscribingEventProcessors();
        // Configure error handler
        configurer.registerDefaultListenerInvocationErrorHandler(addErrorListener());
    }

    private Function<org.axonframework.config.Configuration, ListenerInvocationErrorHandler> addErrorListener() {
        return configuration -> (ListenerInvocationErrorHandler) (exception, eventMessage, eventHandler) -> {
            LOGGER.error(exception.getMessage(), exception);
            LogGenerator.stackTracelogGeneration(exception, exception.getMessage());
            LogGenerator.xllogGeneration("AXON_INCIDENT", XlLevels.SEVERE, "error on event %s, error (%s) ",
                    eventMessage.getPayload() == null ? eventMessage.getClass() : eventMessage.getPayload().getClass(),
                    exception.getMessage());

            if (eventMessage.getPayload() instanceof PersistedEvent) {
                eventGateway.publish(ImmutableTechnicalError.builder()
                        .id(((PersistedEvent) eventMessage.getPayload()).id())
                        .throwable(exception)
                        .payload((PersistedEvent) eventMessage.getPayload()).build());
            }
        };
    }

    @Autowired
    public void configureEventStorageEngine(Configurer configurer, EventStorageEngine eventStorageEngine) {
        configurer.configureEmbeddedEventStore(c -> new FilteringEventStorageEngine(eventStorageEngine, eventMessage -> eventMessage.getPayload() instanceof PersistedEvent));
    }

    @Autowired
    public void configureEventBus(EventBus eventBus) {
        eventBus.registerDispatchInterceptor(mdcDataInterceptor());
        if (LOGGER.isDebugEnabled()) {
            eventBus.registerDispatchInterceptor(messages -> (integer, eventMessage) -> {
                LOGGER.info(">{}<", ColorizedLogs.brightViolet("Dispatching Event " + eventMessage.getPayloadType().getSimpleName()));
                return eventMessage;
            });
        }

    }

    @Bean
    @Primary
    public SimpleQueryBus queryBus(org.axonframework.spring.config.AxonConfiguration axonConfiguration, TransactionManager transactionManager, QueryUpdateEmitter queryUpdateEmitter) {
        SimpleQueryBus bus = SimpleQueryBus.builder()
                .messageMonitor(axonConfiguration.messageMonitor(QueryBus.class, "queryBus"))
                .transactionManager(transactionManager)
                .errorHandler(axonConfiguration.getComponent(
                        QueryInvocationErrorHandler.class,
                        () -> LoggingQueryInvocationErrorHandler.builder().build()
                ))
                .queryUpdateEmitter(queryUpdateEmitter)
                .build();
        bus.registerHandlerInterceptor(correlationIdThreadLocalInterceptor());
        return bus;
    }

    @Bean
    @Primary
    QueryUpdateEmitter queryUpdateEmitter() {
        ManagedQueryUpdateEmitter updateEmitter = new ManagedQueryUpdateEmitter();
        updateEmitter.registerDispatchInterceptor(mdcDataInterceptor());
        return updateEmitter;
    }

    private MessageHandlerInterceptor<Message<?>> correlationIdThreadLocalInterceptor() {
        return (unitOfWork, interceptorChain) -> {
            setMdc(
                    unitOfWork::getCorrelationData,
                    unitOfWork::getMessage,
                    mdcMap -> unitOfWork.transformMessage(message -> message.andMetaData(Map.of(MDC_MAP, mdcMap)))
            );

            Object result = interceptorChain.proceed();

            LOGGER.debug("> message handling done.");
            return result;
        };
    }

    private <T> T setMdc(Supplier<MetaData> metaDataSupplier, Supplier<Message<?>> messageSupplier, Function<Map<String, String>, T> mdcFunction) {
        Map<String, String> mdcMap = (Map<String, String>) metaDataSupplier.get().get(MDC_MAP);
        if (mdcMap != null) {
            MDC.setContextMap(mdcMap);

        } else {
            mdcMap = MDC.getCopyOfContextMap();
            if (mdcMap != null) {
                LOGGER.debug("=> setting correlation ID {} on message of type {}", Logdorak.getContextCorrelationId(), messageSupplier.get().getPayloadType().getSimpleName());
                return mdcFunction.apply(mdcMap);

            } else {
                LOGGER.warn("MDC context map has been lost somewhere in the process");
            }
        }

        return null;
    }

    private String getCorrelationIdFromMdc(Map<String, String> mdcMap) {
        return mdcMap.get("AVRO_LOG_HELPER_MDC_KEY_CORRELATION_ID");
    }

    @Bean
    public AsynchronousCommandBus commandBus(Executor mainExecutor, TransactionManager txManager, org.axonframework.spring.config.AxonConfiguration axonConfiguration) {
        AsynchronousCommandBus commandBus =
                AsynchronousCommandBus.builder()
                        .transactionManager(txManager)
                        .executor(mainExecutor)
                        .messageMonitor(axonConfiguration.messageMonitor(AsynchronousCommandBus.class, "commandBus"))
                        .build();
        commandBus.registerHandlerInterceptor(new CorrelationDataInterceptor<>(axonConfiguration.correlationDataProviders()));
        commandBus.registerHandlerInterceptor(correlationIdThreadLocalInterceptor());
        if (LOGGER.isDebugEnabled()) {
            commandBus.registerDispatchInterceptor(messages -> (integer, commandMessage) -> {
                LOGGER.info(">{}<", ColorizedLogs.brightBlack("Dispatching Command " + commandMessage.getPayloadType().getSimpleName()));
                return commandMessage;
            });
        }

        return commandBus;
    }

    @Bean
    public CommandGateway commandGateway(CommandBus commandBus) {
        return DefaultCommandGateway.builder().commandBus(commandBus)
                .dispatchInterceptors(Collections.singletonList(mdcDataInterceptor()))
                .build();
    }

    /**
     * customize le scheduler quartz pour ajouter le nom. Il me semble que c'est un bug spring boot qui devrait être réglé dans
     * des futurs versions. à voir si on a toujours besoin de ça suite aux migrations springboot.
     *
     * @param properties
     * @return
     */
    @Bean
    public SchedulerFactoryBeanCustomizer quartzCustomizer(QuartzProperties properties) {
        return schedulerFactoryBean -> {
            LOGGER.debug("Customization quartz, naming cluster like this: {}", properties.getProperties().get("org.quartz.scheduler.instanceName"));
            schedulerFactoryBean.setSchedulerName(properties.getProperties().get("org.quartz.scheduler.instanceName"));
        };
    }

    @Bean
    public DeadlineManager deadlineManager(Scheduler scheduler,
            org.axonframework.spring.config.AxonConfiguration configuration,
            SpringTransactionManager transactionManager,
            Serializer serializer,
            Sumon sumon,
            ObjectMapper mapper,
            @Value("${spring.profiles.active:std}") String springProfile) {
        if ("blackboxtests".equals(springProfile) || "perf".equals(springProfile)) {
            LOGGER.info("Clean schedule tasks (quartz");
            try {
                scheduler.clear();
            } catch (SchedulerException e) {
                throw new HupRuntimeException(e);
            }
        }
        QuartzDeadlineManager deadlineManager = QuartzDeadlineManager.builder()
                .scheduler(scheduler)
                .scopeAwareProvider(new ConfigurationScopeAwareProvider(configuration))
                .serializer(serializer)
                .transactionManager(transactionManager)
                .refireImmediatelyPolicy(throwable -> false) // never retry a failed deadline
                .build();
        return new SupervisedDeadlineManager(deadlineManager, sumon, mapper);
    }

    @Bean
    public ObjectMapper defaultAxonObjectMapper() {
        return JsonObjectSerializable.mapperForStore;
    }

    @Bean
    CorrelationDataProvider simpleCorrelationDataProvider() {
        return new SimpleCorrelationDataProvider(MDC_MAP);
    }

    private MessageDispatchInterceptor<Message<?>> mdcDataInterceptor() {
        return messages -> (integer, message) -> {
            var modifiedMessage = setMdc(
                    message::getMetaData,
                    () -> message,
                    mdcMap -> message.andMetaData(Map.of(MDC_MAP, mdcMap))
            );

            return modifiedMessage != null ? modifiedMessage : message;
        };
    }

    @Qualifier("redisSagaStore")
    @Bean
    @ConditionalOnProperty(prefix = "redis", value = "enabled", havingValue = "true")
    public SagaStore redisSagaStore(RedisTemplate<String, String> redisTemplate, Sumon sumon) {
        return new RedisSagaStore(redisTemplate, sumon, Duration.parse(sagaStoresTtl));
    }

    @Qualifier("oracleSagaStore")
    @Primary
    @Bean
    public JpaSagaStore oracleSagaStore(Serializer serializer, EntityManagerProvider entityManagerProvider) {
        return JpaSagaStore.builder()
                .entityManagerProvider(entityManagerProvider)
                .serializer(serializer)
                .build();
    }
}

I’ve also a custom Executor :

import com.codahale.metrics.Timer;
import com.xxxx.logdorak.mdc.PreserveMDCTask;
import com.xxx.sumon.Sumon;
import io.reactivex.plugins.RxJavaPlugins;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;

import javax.annotation.PreDestroy;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class ThreadingConfiguration {

    /**
     * setup rxjava pour garder les infos du mdc.
     */
    static {
        RxJavaPlugins.setScheduleHandler(PreserveMDCTask::new);
    }

    @Bean
    public MultiThreadPoolTaskExecutorStrategy multiThreadPoolTaskExecutorStrategy(Sumon sumon, @Value("${taskExecutor.poolSize:30}")int nThreads) {
        return new MultiThreadPoolTaskExecutorStrategy(sumon, nThreads);
    }

    @Bean
    public Executor mainExecutor(MultiThreadPoolTaskExecutorStrategy taskExecutorStrategy) {
        return taskExecutorStrategy.getTaskExecutor();
    }

    @ManagedResource(objectName = "com.xxxx.infrastructure.framework:name=multiThreadPool,type=MultiThreadPool")
    public class MultiThreadPoolTaskExecutorStrategy {

        private final ThreadPoolExecutor executor;
        private final ConcurrentTaskExecutor taskExecutor;

        public MultiThreadPoolTaskExecutorStrategy(Sumon sumon, int nThreads) {
            this.executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
            this.taskExecutor = new ConcurrentTaskExecutor(executor);
            this.taskExecutor.setTaskDecorator(runnable -> () -> {
                try (Timer.Context ignored = sumon.getMetricRegistry().timer("na.na.na.fwk.MultiThreadPool.tasks").time()) {
                    runnable.run();
                }finally {
                    // Nettoyage du MDC pour les threads du pool utilisés par Axon
                    // En effet, il n'existe pas d'interceptors pour nettoyer le MDC après fin d'un message /unit of work
                    // Voir #AxonConfiguration.mdcDataInterceptor pour l'alimentation du MDC
                    MDC.clear();
                }
            });
        }

        @ManagedAttribute
        public int getQueueSize() {
            return executor.getQueue().size();
        }

        @ManagedAttribute
        public int getQueueRemainingCapacity() {
            return executor.getQueue().remainingCapacity();
        }

        @ManagedAttribute
        public int getActiveCount() {
            return executor.getActiveCount();
        }

        @ManagedAttribute
        public int getPoolSize() {
            return executor.getPoolSize();
        }

        @ManagedAttribute
        public long getCompletedTaskCount() {
            return executor.getCompletedTaskCount();
        }

        @ManagedAttribute
        public long getTaskCount() {
            return executor.getTaskCount();
        }

        public TaskExecutor getTaskExecutor() {
            return taskExecutor;
        }

        @PreDestroy
        public void done() {
            executor.shutdown();
        }
    }
}

what am i doing wrong ?
Thanks :slight_smile:

I think the problem is relatively simple, @orwel!
The error handler you configure is the ListenerInvocationErrorHandler.
This error handling level deals with exceptions thrown in the Event Handling function.

However, transactional failures like those you’re imposing on your system are not Event Handling failures but occur deeper down. They’re most likely captured by the ErrorHandler configured on the Event Processor. The default ErrorHandler is the PropagatingErrorHandler that’ll rethrow the caught exception. This implementation causes the Event Processor to fail, thus imposing a different result depending on whether you’ve chosen a SubscribingEventProcessor, TrackingEventProcessor, or a PooledStreamingEventProcessor.

I refer to this page in our documentation if you want to read more on the subject.


By the way, I spot something odd in your configuration.
The configureTrackingEventProcessors method starts with usingSubscribingEventProcessors. This implementation means you make the entire command handling and event handling process synchronous. Although an option, this is very likely not what you want for an end system. It, in essence, breaks the CQRS concept on a transactional level. So please be careful with such a configuration.

Secondly, I see you configure a RedisSagaStore. Is this open-source, perchance? If so, would you mind pointing me to the repository containing this?

Hello @Steven_van_Beelen,
thank you very much for your feedback (and sorry for my delay in answering…). I tried to set up a handler at the level of the ErrorHandler, but without success (processingConfigurer.registerDefaultErrorHandler(conf → /* create error handler */)

This handler is not called during errors on the initialization of the UnitOfWork transaction.

Thank you for your remark on the configuration of usingSubscribingEventProcessors!
This is indeed not the behavior that was imagined for our application, I will dig to understand why this was put in place…

Regarding the RedisSagaStore, it’s a very simple implementation that we have made ourselves (based on Spring’s RedisTemplate and which implements the SagaStore interface)

Regards

That’s problematic, and not what I’d expect, honestly.
Have you debugged the application plus framework perhaps and seen where it falls between the cracks?
Maybe the ErrorHandler you set up didn’t end up on the Event Processor as intended.
Spotting this should be relatively easy in a debug session, I’d assume.

In short, I would expect Axon’s error handling logic on the Event Processor’s side to also catch these cases of failing to build the transaction. And if it doesn’t there’s something to fix.

Based on your reply here, I am assuming it’s not something you and the team are going to open source? :slight_smile: