Hi there. I have been having difficulty replacing my InMemorySagaStore with Postgres as the saga store and was hoping for some insight.
I have three services built on Axon Framework and Spring Boot, with the Axon Kafka extension (Spring Boot starter), using Kafka as the message broker between the three services. I have replaced Axon Server with Postgres as the event store. All services use subscribing consumers and producers.
In addition to regular aggregate event handlers in all three services, I have an @Saga class in Service A with saga event handlers, using an in-memory saga store. In this setup everything works fine: all the saga and non-saga event handlers are triggered, and communication over Kafka works as expected.
My issue arises when I swap the in-memory saga store I am currently using for Postgres. To do this, I put the same processing group (via annotation) on my @Saga class that the regular event handlers use, and I removed the InMemorySagaStore bean, since it is my understanding that Axon will auto-detect the configured JPA/Postgres setup. This is the approach I took to swap the event store to Postgres, and it worked fine. However, when I use this approach to replace the in-memory saga store, the exception below is thrown. I have read previous threads that suggest adding @Transactional for this type of error, but it has not worked for me.
// removed
// @Bean
// InMemorySagaStore sagaStore() {
// return new InMemorySagaStore();
// }
Service App Log:
2020-11-05 16:25:01.798 INFO 48433 --- [ing.producer]-0] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.0
2020-11-05 16:25:01.799 INFO 48433 --- [ing.producer]-0] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 62abe01bee039651
2020-11-05 16:25:01.799 INFO 48433 --- [ing.producer]-0] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1604611501798
2020-11-05 16:25:01.809 INFO 48433 --- [ad | saga-o] org.apache.kafka.clients.Metadata : [Producer clientId=saga-o] Cluster ID: dnjKyuqiRwK8rqQTSsJewQ
2020-11-05 16:25:01.857 DEBUG 48433 --- [ AsyncFetcher-0] c.a.sagas.OrderManagementSaga : [Saga Started] Order ID: c4e2ac0b-48b0-4dc9-87e4-ff7556edd4dd
2020-11-05 16:25:01.866 ERROR 48433 --- [ AsyncFetcher-0] o.a.e.k.e.consumer.FetchEventsTask : Cannot proceed with fetching ConsumerRecords since we encountered an exception
javax.persistence.TransactionRequiredException: No EntityManager with actual transaction available for current thread - cannot reliably process 'persist' call
at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:298) ~[spring-orm-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
at com.sun.proxy.$Proxy111.persist(Unknown Source) ~[na:na]
at org.axonframework.modelling.saga.repository.jpa.JpaSagaStore.insertSaga(JpaSagaStore.java:300) ~[axon-modelling-4.1.2.jar!/:4.1.2]
at org.axonframework.modelling.saga.repository.AnnotatedSagaRepository.storeSaga(AnnotatedSagaRepository.java:217) ~[axon-modelling-4.1.2.jar!/:4.1.2]
at org.axonframework.modelling.saga.repository.AnnotatedSagaRepository.lambda$doCreateInstance$3(AnnotatedSagaRepository.java:139) ~[axon-modelling-4.1.2.jar!/:4.1.2]
at org.axonframework.messaging.unitofwork.MessageProcessingContext.notifyHandlers(MessageProcessingContext.java:71) ~[axon-messaging-4.1.2.jar!/:4.1.2]
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.lambda$notifyHandlers$2(BatchingUnitOfWork.java:155) ~[axon-messaging-4.1.2.jar!/:4.1.2]
at java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:899) ~[na:1.8.0_212]
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.notifyHandlers(BatchingUnitOfWork.java:155) ~[axon-messaging-4.1.2.jar!/:4.1.2]
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.changePhase(AbstractUnitOfWork.java:222) ~[axon-messaging-4.1.2.jar!/:4.1.2]
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commitAsRoot(AbstractUnitOfWork.java:83) ~[axon-messaging-4.1.2.jar!/:4.1.2]
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commit(AbstractUnitOfWork.java:71) ~[axon-messaging-4.1.2.jar!/:4.1.2]
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:111) ~[axon-messaging-4.1.2.jar!/:4.1.2]
at org.axonframework.eventhandling.AbstractEventProcessor.processInUnitOfWork(AbstractEventProcessor.java:151) ~[axon-messaging-4.1.2.jar!/:4.1.2]
at org.axonframework.eventhandling.AbstractEventProcessor.processInUnitOfWork(AbstractEventProcessor.java:135) ~[axon-messaging-4.1.2.jar!/:4.1.2]
at org.axonframework.eventhandling.SubscribingEventProcessor.process(SubscribingEventProcessor.java:104) ~[axon-messaging-4.1.2.jar!/:4.1.2]
at org.axonframework.eventhandling.DirectEventProcessingStrategy.handle(DirectEventProcessingStrategy.java:35) ~[axon-messaging-4.1.2.jar!/:4.1.2]
at org.axonframework.eventhandling.SubscribingEventProcessor.lambda$start$0(SubscribingEventProcessor.java:90) ~[axon-messaging-4.1.2.jar!/:4.1.2]
at org.axonframework.extensions.kafka.eventhandling.consumer.subscribable.SubscribableKafkaMessageSource.lambda$null$2(SubscribableKafkaMessageSource.java:177) ~[axon-kafka-4.0-RC3.jar!/:4.0-RC3]
at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:891) ~[na:1.8.0_212]
at java.util.concurrent.CopyOnWriteArraySet.forEach(CopyOnWriteArraySet.java:404) ~[na:1.8.0_212]
at org.axonframework.extensions.kafka.eventhandling.consumer.subscribable.SubscribableKafkaMessageSource.lambda$start$3(SubscribableKafkaMessageSource.java:177) ~[axon-kafka-4.0-RC3.jar!/:4.0-RC3]
at org.axonframework.extensions.kafka.eventhandling.consumer.FetchEventsTask.run(FetchEventsTask.java:90) ~[axon-kafka-4.0-RC3.jar!/:4.0-RC3]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_212]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]
2020-11-05 16:25:01.869 INFO 48433 --- [ AsyncFetcher-0] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=saga-o, groupId=groupname1] Revoke previously assigned partitions axon.event.saga-payment-0, axon.event.saga-o-0, axon.event.saga-shipping-0
2020-11-05 16:25:01.870 INFO 48433 --- [ AsyncFetcher-0] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=saga-o, groupId=groupname1] Member saga-o-a327127f-e042-4483-8a2a-d6e88c97002f sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null) due to the consumer is being closed
2020-11-05 16:25:01.886 INFO 48433 --- [ AsyncFetcher-0] o.a.e.k.e.consumer.FetchEventsTask : Fetch events task and used Consumer instance [org.apache.kafka.clients.consumer.KafkaConsumer@7b5c92a7] have been closed
Alternatively, if I remove the @ProcessingGroup annotation on my @Saga class, I do not get this exception but some of my Saga Event Handlers that are listening to events thrown by Command Handlers in separate Services are not being triggered.
Here is my saga setup:
@Saga
@ProcessingGroup("groupname1") // shares same processing group as aggregate
@AllowReplay(value = false)
public class ExampleSaga {
@StartSaga
@SagaEventHandler(associationProperty = "aggregrateId") // triggered, published by same Service A; log shows javax.persistence.TransactionRequiredException
public void handle(CreatedEvent event) {
LOGGER.debug("[Saga Started]");
}
@SagaEventHandler(associationProperty = "aggregrateId") // published by Command Handler in Service B
public void handle(CreatedEvent event) {
LOGGER.debug("[Saga Update] Handling …");
}
@SagaEventHandler(associationProperty = "aggregrateId")
public void handle(OrderStatusUpdatedEvent event) {
LOGGER.debug("[Saga Finished] Order ID: {}, Status: {}");
SagaLifecycle.end();
}
}
Service A application.properties:
#Postgresql
spring.datasource.url=jdbc:postgresql:[Dedicated PG Server for Service A]
spring.datasource.driver-class-name=org.postgresql.Driver
# Spring JPA / Hibernate
spring.jpa.database-platform=org.hibernate.dialect.PostgreSQL92Dialect
spring.jpa.properties.hibernate.jdbc.lob.non_contextual_creation=true
spring.jpa.show-sql=false
spring.jpa.hibernate.ddl-auto=update
# Kafka Extension
axon.kafka.client-id=saga-o
axon.kafka.default-topic=axon.event1
axon.kafka.producer.retries=0
axon.kafka.producer.bootstrap-servers=. X1
axon.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
axon.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
axon.kafka.consumer.bootstrap-servers=X1,X2,X3
axon.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
axon.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
axon.kafka.properties.security.protocol=PLAINTEXT
# Kafka Extension: Add’l Custom Properties
axon.kafka.consumer.properties.topics= axon.event1, axon.event2, axon.event3
axon.kafka.consumer.properties.group-id= groupname1
axon.kafka.consumer.properties.processor-name= groupname1
Service B and Service C are similar, but with a different Postgres instance and different topic, client-id, consumer.topics, consumer.properties.group-id, and consumer.properties.processor-name values.
Kafka Configuration:
@Bean
public KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer() {
return new KafkaMessageSourceConfigurer();
}
@Bean
public SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource(KafkaProperties kafkaProperties,
ConsumerFactory<String, byte[]> consumerFactory,
Fetcher<String, byte[], EventMessage<?>> fetcher,
KafkaMessageConverter<String, byte[]> messageConverter,
KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer) {
SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource = SubscribableKafkaMessageSource.<String, byte[]>builder()
.topics(Arrays.asList(
kafkaProperties.getConsumer().getProperties().get("topics").split(","))
)
.groupId(kafkaProperties.getConsumer().getProperties().get("group-id"))
.consumerFactory(consumerFactory)
.fetcher(fetcher)
.messageConverter(messageConverter)
.build();
// Registering the source is required to tie into the Configurer's lifecycle, so the source is started at the right stage
kafkaMessageSourceConfigurer.registerSubscribableSource(configuration -> subscribableKafkaMessageSource);
return subscribableKafkaMessageSource;
}
@Autowired
public void registerKafkaMessageSourceConfigurer(
Configurer configurer, KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer) {
configurer.registerModule(kafkaMessageSourceConfigurer);
}
@Autowired
public void configureSubscribableKafkaSource(
KafkaProperties kafkaProperties,
EventProcessingConfigurer eventProcessingConfigurer,
SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource) {
eventProcessingConfigurer.registerSubscribingEventProcessor(
kafkaProperties.getConsumer().getProperties().get("processor-name"),
configuration -> subscribableKafkaMessageSource);
}
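One detail of the configuration above that may be worth double-checking: the topics property is written with a space after each comma, and the source bean splits it with a plain split(","). If the value reaches the code exactly as written, every topic after the first would keep a leading space. The sketch below is only an illustration of a trimming split in plain Java; TopicListParser and parseTopics are hypothetical names, not part of Axon or the Kafka extension.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Axon or the Kafka extension.
public class TopicListParser {

    // Splits a comma-separated topic list, trims each entry, and drops blanks.
    public static List<String> parseTopics(String raw) {
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(topic -> !topic.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same shape as the axon.kafka.consumer.properties.topics value above.
        String raw = "axon.event1, axon.event2, axon.event3";

        // A plain split keeps the space that follows each comma.
        System.out.println(Arrays.asList(raw.split(",")));

        // The trimming variant yields clean topic names.
        System.out.println(parseTopics(raw));
    }
}
```

In the bean above, the equivalent change would be to pass the result of such a helper to .topics(...) instead of Arrays.asList(... .split(",")). This is unrelated to the TransactionRequiredException itself; it only rules out one possible source of unmatched topic names.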
POM XML Service A
<!-- Axon -->
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring-boot-starter</artifactId>
<version>4.1.2</version>
<exclusions>
<exclusion>
<groupId>org.axonframework</groupId>
<artifactId>axon-server-connector</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Postgres -->
<dependency>
<artifactId>postgresql</artifactId>
<groupId>org.postgresql</groupId>
<scope>runtime</scope>
<version>42.2.9</version>
</dependency>
<!-- Axon Kafka Extension -->
<dependency>
<groupId>org.axonframework.extensions.kafka</groupId>
<artifactId>axon-kafka-spring-boot-starter</artifactId>
<version>${axon-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
Thanks in advance.