Hi,
I have a problem with the handling of an exception thrown during a call to UnitOfWork.attachTransaction.
Context: I am testing the resilience of my application by simulating significant latency between the application and an Oracle database with Toxiproxy.
Here is the stack trace I get:
01-03-2022 13:38:51 WARN o.h.e.jdbc.spi.SqlExceptionHelper - SQL Error: 0, SQLState: null
01-03-2022 13:38:51 ERROR o.h.e.jdbc.spi.SqlExceptionHelper - na.na.na.fwk.HUP_SPRING_HIKARI_POOL - Connection is not available, request timed out after 1000ms.
Exception in thread "pool-5-thread-18" org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection
    at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:467)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373)
    at org.axonframework.spring.messaging.unitofwork.SpringTransactionManager.startTransaction(SpringTransactionManager.java:59)
    at org.axonframework.messaging.unitofwork.UnitOfWork.attachTransaction(UnitOfWork.java:272)
    at org.axonframework.commandhandling.SimpleCommandBus.handle(SimpleCommandBus.java:173)
    at org.axonframework.commandhandling.AsynchronousCommandBus.lambda$handle$0(AsynchronousCommandBus.java:90)
    at com.xxx.infrastructure.framework.ThreadingConfiguration$MultiThreadPoolTaskExecutorStrategy.lambda$new$0(ThreadingConfiguration.java:52)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection
    at org.hibernate.exception.internal.SQLExceptionTypeDelegate.convert(SQLExceptionTypeDelegate.java:48)
    at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:42)
    at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:113)
    at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:99)
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:111)
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.getPhysicalConnection(LogicalConnectionManagedImpl.java:138)
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.getConnectionForTransactionManagement(LogicalConnectionManagedImpl.java:276)
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.begin(LogicalConnectionManagedImpl.java:284)
    at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl$TransactionDriverControlImpl.begin(JdbcResourceLocalTransactionCoordinatorImpl.java:246)
    at org.hibernate.engine.transaction.internal.TransactionImpl.begin(TransactionImpl.java:83)
    at org.springframework.orm.jpa.vendor.HibernateJpaDialect.beginTransaction(HibernateJpaDialect.java:164)
    at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:421)
    ... 10 more
Caused by: java.sql.SQLTransientConnectionException: na.na.na.fwk.HUP_SPRING_HIKARI_POOL - Connection is not available, request timed out after 1000ms.
    at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696)
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:197)
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:162)
    at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:100)
    at org.hibernate.engine.jdbc.connections.internal.DatasourceConnectionProviderImpl.getConnection(DatasourceConnectionProviderImpl.java:122)
    at org.hibernate.internal.NonContextualJdbcConnectionAccess.obtainConnection(NonContextualJdbcConnectionAccess.java:38)
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:108)
    ... 17 more
I can't catch this error to handle it properly. I have some listeners defined in my AxonConfiguration, but none of them is called when this exception occurs.
My custom AxonConfiguration:
import com.fasterxml.jackson.databind.ObjectMapper;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.security.AnyTypePermission;
import com.xxx.events.ImmutableTechnicalError;
import com.xxx.events.PersistedEvent;
import com.xxx.framework.axon.RedisSagaStore;
import com.xxx.framework.deadline.SupervisedDeadlineManager;
import com.xxx.framework.exceptions.HupRuntimeException;
import com.xxx.framework.json.JsonObjectSerializable;
import com.xxx.framework.logs.ColorizedLogs;
import com.xxx.services.logs.LogGenerator;
import com.xxx.deckard.bean.XlLevels;
import com.xxx.logdorak.Logdorak;
import com.xxx.sumon.Sumon;
import org.axonframework.commandhandling.AsynchronousCommandBus;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.commandhandling.gateway.DefaultCommandGateway;
import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.config.ConfigurationScopeAwareProvider;
import org.axonframework.config.Configurer;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.deadline.DeadlineManager;
import org.axonframework.deadline.quartz.QuartzDeadlineManager;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.ListenerInvocationErrorHandler;
import org.axonframework.eventhandling.gateway.EventGateway;
import org.axonframework.eventsourcing.FilteringEventStorageEngine;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.MetaData;
import org.axonframework.messaging.correlation.CorrelationDataProvider;
import org.axonframework.messaging.correlation.SimpleCorrelationDataProvider;
import org.axonframework.messaging.interceptors.CorrelationDataInterceptor;
import org.axonframework.modelling.saga.repository.SagaStore;
import org.axonframework.modelling.saga.repository.jpa.JpaSagaStore;
import org.axonframework.queryhandling.*;
import org.axonframework.serialization.FixedValueRevisionResolver;
import org.axonframework.serialization.RevisionResolver;
import org.axonframework.serialization.Serializer;
import org.axonframework.spring.messaging.unitofwork.SpringTransactionManager;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.quartz.QuartzProperties;
import org.springframework.boot.autoconfigure.quartz.SchedulerFactoryBeanCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.core.RedisTemplate;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;
@Configuration
public class AxonConfiguration {
private static final String MDC_MAP = "MDC_MAP";
private static final Logger LOGGER = LoggerFactory.getLogger(AxonConfiguration.class);
@Value("${application.version}")
String version;
@Value("${redis.retention.axon.saga.stores:PT80H}")
String sagaStoresTtl;
@Autowired
EventGateway eventGateway;
@Bean
@Primary
public XStream xStream() {
var xstream = new XStream();
xstream.addPermission(AnyTypePermission.ANY);
return xstream;
}
@Bean
public RevisionResolver revisionResolver() {
return new FixedValueRevisionResolver(version);
}
@Autowired
public void configureTrackingEventProcessors(EventProcessingConfigurer configurer) {
// OK, here we use an event processor that is properly synchronous.
configurer.usingSubscribingEventProcessors();
// Configure error handler
configurer.registerDefaultListenerInvocationErrorHandler(addErrorListener());
}
private Function<org.axonframework.config.Configuration, ListenerInvocationErrorHandler> addErrorListener() {
return configuration -> (ListenerInvocationErrorHandler) (exception, eventMessage, eventHandler) -> {
LOGGER.error(exception.getMessage(), exception);
LogGenerator.stackTracelogGeneration(exception, exception.getMessage());
LogGenerator.xllogGeneration("AXON_INCIDENT", XlLevels.SEVERE, "error on event %s, error (%s) ",
eventMessage.getPayload() == null ? eventMessage.getClass() : eventMessage.getPayload().getClass(),
exception.getMessage());
if (eventMessage.getPayload() instanceof PersistedEvent) {
eventGateway.publish(ImmutableTechnicalError.builder()
.id(((PersistedEvent) eventMessage.getPayload()).id())
.throwable(exception)
.payload((PersistedEvent) eventMessage.getPayload()).build());
}
};
}
@Autowired
public void configureEventStorageEngine(Configurer configurer, EventStorageEngine eventStorageEngine) {
configurer.configureEmbeddedEventStore(c -> new FilteringEventStorageEngine(eventStorageEngine, eventMessage -> eventMessage.getPayload() instanceof PersistedEvent));
}
@Autowired
public void configureEventBus(EventBus eventBus) {
eventBus.registerDispatchInterceptor(mdcDataInterceptor());
if (LOGGER.isDebugEnabled()) {
eventBus.registerDispatchInterceptor(messages -> (integer, eventMessage) -> {
LOGGER.info(">{}<", ColorizedLogs.brightViolet("Dispatching Event " + eventMessage.getPayloadType().getSimpleName()));
return eventMessage;
});
}
}
@Bean
@Primary
public SimpleQueryBus queryBus(org.axonframework.spring.config.AxonConfiguration axonConfiguration, TransactionManager transactionManager, QueryUpdateEmitter queryUpdateEmitter) {
SimpleQueryBus bus = SimpleQueryBus.builder()
.messageMonitor(axonConfiguration.messageMonitor(QueryBus.class, "queryBus"))
.transactionManager(transactionManager)
.errorHandler(axonConfiguration.getComponent(
QueryInvocationErrorHandler.class,
() -> LoggingQueryInvocationErrorHandler.builder().build()
))
.queryUpdateEmitter(queryUpdateEmitter)
.build();
bus.registerHandlerInterceptor(correlationIdThreadLocalInterceptor());
return bus;
}
@Bean
@Primary
QueryUpdateEmitter queryUpdateEmitter() {
ManagedQueryUpdateEmitter updateEmitter = new ManagedQueryUpdateEmitter();
updateEmitter.registerDispatchInterceptor(mdcDataInterceptor());
return updateEmitter;
}
private MessageHandlerInterceptor<Message<?>> correlationIdThreadLocalInterceptor() {
return (unitOfWork, interceptorChain) -> {
setMdc(
unitOfWork::getCorrelationData,
unitOfWork::getMessage,
mdcMap -> unitOfWork.transformMessage(message -> message.andMetaData(Map.of(MDC_MAP, mdcMap)))
);
Object result = interceptorChain.proceed();
LOGGER.debug("> message handling done.");
return result;
};
}
private <T> T setMdc(Supplier<MetaData> metaDataSupplier, Supplier<Message<?>> messageSupplier, Function<Map<String, String>, T> mdcFunction) {
Map<String, String> mdcMap = (Map<String, String>) metaDataSupplier.get().get(MDC_MAP);
if (mdcMap != null) {
MDC.setContextMap(mdcMap);
} else {
mdcMap = MDC.getCopyOfContextMap();
if (mdcMap != null) {
LOGGER.debug("=> setting correlation ID {} on message of type {}", Logdorak.getContextCorrelationId(), messageSupplier.get().getPayloadType().getSimpleName());
return mdcFunction.apply(mdcMap);
} else {
LOGGER.warn("MDC context map has been lost somewhere in the process");
}
}
return null;
}
private String getCorrelationIdFromMdc(Map<String, String> mdcMap) {
return mdcMap.get("AVRO_LOG_HELPER_MDC_KEY_CORRELATION_ID");
}
@Bean
public AsynchronousCommandBus commandBus(Executor mainExecutor, TransactionManager txManager, org.axonframework.spring.config.AxonConfiguration axonConfiguration) {
AsynchronousCommandBus commandBus =
AsynchronousCommandBus.builder()
.transactionManager(txManager)
.executor(mainExecutor)
.messageMonitor(axonConfiguration.messageMonitor(AsynchronousCommandBus.class, "commandBus"))
.build();
commandBus.registerHandlerInterceptor(new CorrelationDataInterceptor<>(axonConfiguration.correlationDataProviders()));
commandBus.registerHandlerInterceptor(correlationIdThreadLocalInterceptor());
if (LOGGER.isDebugEnabled()) {
commandBus.registerDispatchInterceptor(messages -> (integer, commandMessage) -> {
LOGGER.info(">{}<", ColorizedLogs.brightBlack("Dispatching Command " + commandMessage.getPayloadType().getSimpleName()));
return commandMessage;
});
}
return commandBus;
}
@Bean
public CommandGateway commandGateway(CommandBus commandBus) {
return DefaultCommandGateway.builder().commandBus(commandBus)
.dispatchInterceptors(Collections.singletonList(mdcDataInterceptor()))
.build();
}
/**
* Customizes the Quartz scheduler to add the name. I believe this is a Spring Boot bug that should be fixed in
* future versions. To be checked whether we still need this after the Spring Boot migrations.
*
* @param properties
* @return
*/
@Bean
public SchedulerFactoryBeanCustomizer quartzCustomizer(QuartzProperties properties) {
return schedulerFactoryBean -> {
LOGGER.debug("Customization quartz, naming cluster like this: {}", properties.getProperties().get("org.quartz.scheduler.instanceName"));
schedulerFactoryBean.setSchedulerName(properties.getProperties().get("org.quartz.scheduler.instanceName"));
};
}
@Bean
public DeadlineManager deadlineManager(Scheduler scheduler,
org.axonframework.spring.config.AxonConfiguration configuration,
SpringTransactionManager transactionManager,
Serializer serializer,
Sumon sumon,
ObjectMapper mapper,
@Value("${spring.profiles.active:std}") String springProfile) {
if ("blackboxtests".equals(springProfile) || "perf".equals(springProfile)) {
LOGGER.info("Clean scheduled tasks (quartz)");
try {
scheduler.clear();
} catch (SchedulerException e) {
throw new HupRuntimeException(e);
}
}
QuartzDeadlineManager deadlineManager = QuartzDeadlineManager.builder()
.scheduler(scheduler)
.scopeAwareProvider(new ConfigurationScopeAwareProvider(configuration))
.serializer(serializer)
.transactionManager(transactionManager)
.refireImmediatelyPolicy(throwable -> false) // never retry a failed deadline
.build();
return new SupervisedDeadlineManager(deadlineManager, sumon, mapper);
}
@Bean
public ObjectMapper defaultAxonObjectMapper() {
return JsonObjectSerializable.mapperForStore;
}
@Bean
CorrelationDataProvider simpleCorrelationDataProvider() {
return new SimpleCorrelationDataProvider(MDC_MAP);
}
private MessageDispatchInterceptor<Message<?>> mdcDataInterceptor() {
return messages -> (integer, message) -> {
var modifiedMessage = setMdc(
message::getMetaData,
() -> message,
mdcMap -> message.andMetaData(Map.of(MDC_MAP, mdcMap))
);
return modifiedMessage != null ? modifiedMessage : message;
};
}
@Qualifier("redisSagaStore")
@Bean
@ConditionalOnProperty(prefix = "redis", value = "enabled", havingValue = "true")
public SagaStore redisSagaStore(RedisTemplate<String, String> redisTemplate, Sumon sumon) {
return new RedisSagaStore(redisTemplate, sumon, Duration.parse(sagaStoresTtl));
}
@Qualifier("oracleSagaStore")
@Primary
@Bean
public JpaSagaStore oracleSagaStore(Serializer serializer, EntityManagerProvider entityManagerProvider) {
return JpaSagaStore.builder()
.entityManagerProvider(entityManagerProvider)
.serializer(serializer)
.build();
}
}
I also have a custom Executor:
import com.codahale.metrics.Timer;
import com.xxxx.logdorak.mdc.PreserveMDCTask;
import com.xxx.sumon.Sumon;
import io.reactivex.plugins.RxJavaPlugins;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
import javax.annotation.PreDestroy;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class ThreadingConfiguration {
/**
* Set up RxJava to preserve the MDC information.
*/
static {
RxJavaPlugins.setScheduleHandler(PreserveMDCTask::new);
}
@Bean
public MultiThreadPoolTaskExecutorStrategy multiThreadPoolTaskExecutorStrategy(Sumon sumon, @Value("${taskExecutor.poolSize:30}")int nThreads) {
return new MultiThreadPoolTaskExecutorStrategy(sumon, nThreads);
}
@Bean
public Executor mainExecutor(MultiThreadPoolTaskExecutorStrategy taskExecutorStrategy) {
return taskExecutorStrategy.getTaskExecutor();
}
@ManagedResource(objectName = "com.xxxx.infrastructure.framework:name=multiThreadPool,type=MultiThreadPool")
public class MultiThreadPoolTaskExecutorStrategy {
private final ThreadPoolExecutor executor;
private final ConcurrentTaskExecutor taskExecutor;
public MultiThreadPoolTaskExecutorStrategy(Sumon sumon, int nThreads) {
this.executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
this.taskExecutor = new ConcurrentTaskExecutor(executor);
this.taskExecutor.setTaskDecorator(runnable -> () -> {
try (Timer.Context ignored = sumon.getMetricRegistry().timer("na.na.na.fwk.MultiThreadPool.tasks").time()) {
runnable.run();
} finally {
// Clean up the MDC for the pool threads used by Axon.
// There is no interceptor available to clean the MDC once a message / unit of work has completed.
// See #AxonConfiguration.mdcDataInterceptor for how the MDC is populated.
MDC.clear();
}
});
}
@ManagedAttribute
public int getQueueSize() {
return executor.getQueue().size();
}
@ManagedAttribute
public int getQueueRemainingCapacity() {
return executor.getQueue().remainingCapacity();
}
@ManagedAttribute
public int getActiveCount() {
return executor.getActiveCount();
}
@ManagedAttribute
public int getPoolSize() {
return executor.getPoolSize();
}
@ManagedAttribute
public long getCompletedTaskCount() {
return executor.getCompletedTaskCount();
}
@ManagedAttribute
public long getTaskCount() {
return executor.getTaskCount();
}
public TaskExecutor getTaskExecutor() {
return taskExecutor;
}
@PreDestroy
public void done() {
executor.shutdown();
}
}
}
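One observation from the stack trace: the "Exception in thread "pool-5-thread-18"" prefix is what the JVM prints when an exception leaves a thread uncaught, so the failure seems to escape the task running on my executor rather than being handed to any handler. My task decorator above only has try/finally, so it does not intercept anything either. Here is a minimal sketch of what catching at the executor level could look like, using only java.util.concurrent and no Axon classes. The class ExecutorFailureSketch, the catching helper and the IllegalStateException stand-in are hypothetical names for illustration, not part of my application or of Axon:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class ExecutorFailureSketch {

    /** Wraps a task so that anything it throws is passed to onError instead of escaping to the pool thread. */
    static Runnable catching(Runnable task, Consumer<Throwable> onError) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                onError.accept(t);
            }
        };
    }

    /** Runs a failing task on a pool thread and returns the message the wrapper captured (null on timeout). */
    static String runFailingTask() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        AtomicReference<Throwable> captured = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        // Hypothetical stand-in for the task that fails while starting the transaction.
        Runnable failing = () -> {
            throw new IllegalStateException("Unable to acquire JDBC Connection");
        };

        pool.execute(catching(failing, t -> {
            captured.set(t);
            done.countDown();
        }));

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                return null;
            }
            return captured.get().getMessage();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("captured: " + runFailingTask());
    }
}
```

In my case the equivalent would be a catch block inside the task decorator of MultiThreadPoolTaskExecutorStrategy. That would at least let me log or publish the failure, but it would not notify the command's caller, so I am not sure it is the right place to handle it.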
What am I doing wrong?
Thanks