Issues swapping InMemorySagaStore with Postgres -- Axon Framework With Kafka Extension

Hi there. I have been having difficulty replacing my InMemorySagaStore with Postgres as the saga store, and was hoping for some insight.

I have three services using Axon Framework with Spring Boot, plus the Axon Kafka Extension and its Spring Boot starter, with Kafka as the message broker between the three services. I have swapped Axon Server for Postgres as the event store. All use Subscribing Consumer and Producers.

In addition to regular aggregate event handlers in all 3 services, I have an @Saga class in Service A with saga event handlers, which uses an in-memory saga store. In this case, everything works fine. All the saga and non-saga event handlers get triggered, and Kafka handles the communication nicely.

My issue arises when I swap the in-memory saga store I am currently using for Postgres. To do this, I put the @Saga class in the same processing group (via annotation) that the regular event handlers use, and remove the current InMemorySagaStore bean, as it is my understanding that Axon will auto-detect the configured JPA/Postgres setup. This is the approach I took to swap the event store for Postgres, which worked fine. However, when I use this approach to replace the in-memory saga store with Postgres, the exception below is thrown. I have read previous threads about adding @Transactional to help with this type of error, but it has not worked for me.

//   removed
//    @Bean
//    InMemorySagaStore sagaStore() {
//        return new InMemorySagaStore();
//    }

Service App Log:

2020-11-05 16:25:01.798  INFO 48433 --- [ing.producer]-0] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2020-11-05 16:25:01.799  INFO 48433 --- [ing.producer]-0] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2020-11-05 16:25:01.799  INFO 48433 --- [ing.producer]-0] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1604611501798
2020-11-05 16:25:01.809  INFO 48433 --- [ad | saga-o] org.apache.kafka.clients.Metadata        : [Producer clientId=saga-o] Cluster ID: dnjKyuqiRwK8rqQTSsJewQ
2020-11-05 16:25:01.857 DEBUG 48433 --- [ AsyncFetcher-0] c.a.sagas.OrderManagementSaga            : [Saga Started] Order ID: c4e2ac0b-48b0-4dc9-87e4-ff7556edd4dd
2020-11-05 16:25:01.866 ERROR 48433 --- [ AsyncFetcher-0] o.a.e.k.e.consumer.FetchEventsTask       : Cannot proceed with fetching ConsumerRecords since we encountered an exception

javax.persistence.TransactionRequiredException: No EntityManager with actual transaction available for current thread - cannot reliably process 'persist' call
	at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:298) ~[spring-orm-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
	at com.sun.proxy.$Proxy111.persist(Unknown Source) ~[na:na]
	at org.axonframework.modelling.saga.repository.jpa.JpaSagaStore.insertSaga(JpaSagaStore.java:300) ~[axon-modelling-4.1.2.jar!/:4.1.2]
	at org.axonframework.modelling.saga.repository.AnnotatedSagaRepository.storeSaga(AnnotatedSagaRepository.java:217) ~[axon-modelling-4.1.2.jar!/:4.1.2]
	at org.axonframework.modelling.saga.repository.AnnotatedSagaRepository.lambda$doCreateInstance$3(AnnotatedSagaRepository.java:139) ~[axon-modelling-4.1.2.jar!/:4.1.2]
	at org.axonframework.messaging.unitofwork.MessageProcessingContext.notifyHandlers(MessageProcessingContext.java:71) ~[axon-messaging-4.1.2.jar!/:4.1.2]
	at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.lambda$notifyHandlers$2(BatchingUnitOfWork.java:155) ~[axon-messaging-4.1.2.jar!/:4.1.2]
	at java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:899) ~[na:1.8.0_212]
	at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.notifyHandlers(BatchingUnitOfWork.java:155) ~[axon-messaging-4.1.2.jar!/:4.1.2]
	at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.changePhase(AbstractUnitOfWork.java:222) ~[axon-messaging-4.1.2.jar!/:4.1.2]
	at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commitAsRoot(AbstractUnitOfWork.java:83) ~[axon-messaging-4.1.2.jar!/:4.1.2]
	at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commit(AbstractUnitOfWork.java:71) ~[axon-messaging-4.1.2.jar!/:4.1.2]
	at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:111) ~[axon-messaging-4.1.2.jar!/:4.1.2]
	at org.axonframework.eventhandling.AbstractEventProcessor.processInUnitOfWork(AbstractEventProcessor.java:151) ~[axon-messaging-4.1.2.jar!/:4.1.2]
	at org.axonframework.eventhandling.AbstractEventProcessor.processInUnitOfWork(AbstractEventProcessor.java:135) ~[axon-messaging-4.1.2.jar!/:4.1.2]
	at org.axonframework.eventhandling.SubscribingEventProcessor.process(SubscribingEventProcessor.java:104) ~[axon-messaging-4.1.2.jar!/:4.1.2]
	at org.axonframework.eventhandling.DirectEventProcessingStrategy.handle(DirectEventProcessingStrategy.java:35) ~[axon-messaging-4.1.2.jar!/:4.1.2]
	at org.axonframework.eventhandling.SubscribingEventProcessor.lambda$start$0(SubscribingEventProcessor.java:90) ~[axon-messaging-4.1.2.jar!/:4.1.2]
	at org.axonframework.extensions.kafka.eventhandling.consumer.subscribable.SubscribableKafkaMessageSource.lambda$null$2(SubscribableKafkaMessageSource.java:177) ~[axon-kafka-4.0-RC3.jar!/:4.0-RC3]
	at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:891) ~[na:1.8.0_212]
	at java.util.concurrent.CopyOnWriteArraySet.forEach(CopyOnWriteArraySet.java:404) ~[na:1.8.0_212]
	at org.axonframework.extensions.kafka.eventhandling.consumer.subscribable.SubscribableKafkaMessageSource.lambda$start$3(SubscribableKafkaMessageSource.java:177) ~[axon-kafka-4.0-RC3.jar!/:4.0-RC3]
	at org.axonframework.extensions.kafka.eventhandling.consumer.FetchEventsTask.run(FetchEventsTask.java:90) ~[axon-kafka-4.0-RC3.jar!/:4.0-RC3]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_212]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_212]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]

2020-11-05 16:25:01.869  INFO 48433 --- [ AsyncFetcher-0] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=saga-o, groupId=groupname1] Revoke previously assigned partitions axon.event.saga-payment-0, axon.event.saga-o-0, axon.event.saga-shipping-0
2020-11-05 16:25:01.870  INFO 48433 --- [ AsyncFetcher-0] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=saga-o, groupId=groupname1] Member saga-o-a327127f-e042-4483-8a2a-d6e88c97002f sending LeaveGroup request to coordinator localhost:9092 (id: 2147483646 rack: null) due to the consumer is being closed
2020-11-05 16:25:01.886  INFO 48433 --- [ AsyncFetcher-0] o.a.e.k.e.consumer.FetchEventsTask       : Fetch events task and used Consumer instance [org.apache.kafka.clients.consumer.KafkaConsumer@7b5c92a7] have been closed

Alternatively, if I remove the @ProcessingGroup annotation on my @Saga class, I do not get this exception but some of my Saga Event Handlers that are listening to events thrown by Command Handlers in separate Services are not being triggered.

Here is my saga setup

@Saga
@ProcessingGroup("groupname1") // shares the same processing group as the aggregate
@AllowReplay(value = false)
public class ExampleSaga {

    @StartSaga
    @SagaEventHandler(associationProperty = "aggregrateId") // triggered; published by the same Service A; log shows javax.persistence.TransactionRequiredException
    public void handle(CreatedEvent event) {
        LOGGER.debug("[Saga Started]");
    }

    @SagaEventHandler(associationProperty = "aggregrateId") // published by Command Handler in Service B
    public void handle(CreatedEvent event) {
        LOGGER.debug("[Saga Update] Handling …");
    }

    @SagaEventHandler(associationProperty = "aggregrateId")
    public void handle(OrderStatusUpdatedEvent event) {
        LOGGER.debug("[Saga Finished] Order ID: {}, Status: {}");
        SagaLifecycle.end();
    }
}

Server A application.properties:
#Postgresql
spring.datasource.url=jdbc:postgresql:[Dedicated PG Server for Service A]
spring.datasource.driver-class-name=org.postgresql.Driver
# Spring JPA / Hibernate
spring.jpa.database-platform=org.hibernate.dialect.PostgreSQL92Dialect
spring.jpa.properties.hibernate.jdbc.lob.non_contextual_creation=true
spring.jpa.show-sql=false
spring.jpa.hibernate.ddl-auto=update
# Kafka Extension
axon.kafka.client-id=saga-o
axon.kafka.default-topic=axon.event1
axon.kafka.producer.retries=0
axon.kafka.producer.bootstrap-servers=X1
axon.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
axon.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
axon.kafka.consumer.bootstrap-servers=X1,X2,X3
axon.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
axon.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
axon.kafka.properties.security.protocol=PLAINTEXT
# Kafka Extension: Add’l Custom Properties
axon.kafka.consumer.properties.topics= axon.event1, axon.event2, axon.event3
axon.kafka.consumer.properties.group-id= groupname1
axon.kafka.consumer.properties.processor-name= groupname1

Server B and Server C are similar, but with a different PG instance and different topic/client-id/consumer.topics/consumer.properties.group-id/consumer.properties.processor-name values.

Kafka Configuration:

@Bean
public KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer() {
    return new KafkaMessageSourceConfigurer();
}

    @Bean
    public SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource(KafkaProperties kafkaProperties,
                                                                                         ConsumerFactory<String, byte[]> consumerFactory,
                                                                                         Fetcher<String, byte[], EventMessage<?>> fetcher,
                                                                                         KafkaMessageConverter<String, byte[]> messageConverter,
                                                                                         KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer) {
        SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource = SubscribableKafkaMessageSource.<String, byte[]>builder()
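                .topics(Arrays.asList(
                        // Split on commas and drop surrounding whitespace, since the configured value has a space after each comma
                        kafkaProperties.getConsumer().getProperties().get("topics").trim().split("\\s*,\\s*"))
                )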
                .groupId(kafkaProperties.getConsumer().getProperties().get("group-id"))
                .consumerFactory(consumerFactory)
                .fetcher(fetcher)
                .messageConverter(messageConverter)
                .build();
        // Registering the source is required to tie into the Configurers lifecycle to start the source at the right stage
        kafkaMessageSourceConfigurer.registerSubscribableSource(configuration -> subscribableKafkaMessageSource);
        return subscribableKafkaMessageSource;
    }


@Autowired
public void registerKafkaMessageSourceConfigurer(
        Configurer configurer, KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer) {
    configurer.registerModule(kafkaMessageSourceConfigurer);
}

@Autowired
public void configureSubscribableKafkaSource(
        KafkaProperties kafkaProperties,
        EventProcessingConfigurer eventProcessingConfigurer,
        SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource) {
    eventProcessingConfigurer.registerSubscribingEventProcessor(
            kafkaProperties.getConsumer().getProperties().get("processor-name"),
            configuration -> subscribableKafkaMessageSource);
}

POM XML Service A

<!-- Axon -->
<dependency>
    <groupId>org.axonframework</groupId>
    <artifactId>axon-spring-boot-starter</artifactId>
    <version>4.1.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.axonframework</groupId>
            <artifactId>axon-server-connector</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- Postgres -->
<dependency>
    <artifactId>postgresql</artifactId>
    <groupId>org.postgresql</groupId>
    <scope>runtime</scope>
    <version>42.2.9</version>
</dependency>

<!-- Axon Kafka Extension -->
<dependency>
    <groupId>org.axonframework.extensions.kafka</groupId>
    <artifactId>axon-kafka-spring-boot-starter</artifactId>
    <version>${axon-kafka.version}</version>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring-kafka.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>

Thanks in advance.

Interesting issue you’re having here @marcy.
It’s a hunch, but I am guessing the issue stems from the following line you’ve shared:

All use Subscribing Consumer and Producers.

That, in conjunction with the TransactionRequiredException you are receiving, feels like the culprit here. One way or another, Axon’s TransactionManager is likely not being used everywhere in your setup. Typically I would anticipate this to be auto-configured correctly, but the exception you are getting suggests otherwise.

I’d first check whether the SubscribingEventProcessor used to provide events to your Saga actually has a working TransactionManager; that is, that it’s not NoTransactionManager.INSTANCE, as that implementation doesn’t do anything for you when it comes to transaction management. It is, however, the default in the absence of a more specific TransactionManager implementation.
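As a rough starting point before stepping through with a debugger, something like the sketch below could log which Axon TransactionManager bean Spring has in its context at startup. The class name TransactionManagerCheck is made up for illustration. Note that this only shows the bean that is available, not necessarily the instance wired into the SubscribingEventProcessor, so it narrows the search rather than settling it.

```java
import org.axonframework.common.transaction.NoTransactionManager;
import org.axonframework.common.transaction.TransactionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

// Hypothetical helper, not part of Axon: logs the Axon TransactionManager bean type at startup.
@Component
public class TransactionManagerCheck implements ApplicationRunner {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionManagerCheck.class);

    // ObjectProvider avoids a startup failure when no TransactionManager bean exists at all.
    private final ObjectProvider<TransactionManager> transactionManagerProvider;

    public TransactionManagerCheck(ObjectProvider<TransactionManager> transactionManagerProvider) {
        this.transactionManagerProvider = transactionManagerProvider;
    }

    @Override
    public void run(ApplicationArguments args) {
        TransactionManager transactionManager = transactionManagerProvider.getIfAvailable();
        if (transactionManager == null) {
            LOGGER.warn("No Axon TransactionManager bean found in the application context");
        } else if (transactionManager == NoTransactionManager.INSTANCE) {
            LOGGER.warn("Axon TransactionManager bean is NoTransactionManager.INSTANCE");
        } else {
            // In a Spring Boot setup this would be expected to print SpringTransactionManager.
            LOGGER.info("Axon TransactionManager bean: {}", transactionManager.getClass().getName());
        }
    }
}
```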

There is, however, no easier way to check this than by debugging your setup.
In a Spring Boot environment I would, by the way, anticipate the TransactionManager to be of type SpringTransactionManager. If it’s not set to this type, I’d first recommend adding an @Autowired method that takes the EventProcessingConfigurer and the TransactionManager, in which you invoke EventProcessingConfigurer#registerTransactionManager(String, Function<Configuration, TransactionManager>).

The String parameter in there refers to the name of your processor, which for sagas defaults to [saga-class-name]Processor. You have, however, defined a @ProcessingGroup on your saga, so the processor name is the name given to that annotation.
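A minimal sketch of that registration, assuming the processor is named "groupname1" (the @ProcessingGroup value from the question) and that a TransactionManager bean is available for injection. The class name SagaTransactionConfig and the method name are made up for illustration:

```java
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.config.EventProcessingConfigurer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; only registerTransactionManager(...) is Axon's own API.
@Configuration
public class SagaTransactionConfig {

    @Autowired
    public void registerSagaTransactionManager(EventProcessingConfigurer eventProcessingConfigurer,
                                               TransactionManager transactionManager) {
        // "groupname1" is the processor name taken from the saga's @ProcessingGroup annotation.
        // The function receives Axon's Configuration; here it simply returns the injected bean.
        eventProcessingConfigurer.registerTransactionManager(
                "groupname1", configuration -> transactionManager);
    }
}
```

If the injected TransactionManager turns out not to be a SpringTransactionManager, this registration alone would probably not help, and the bean wiring itself would be the next thing to look at.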

Please give the above a try and let us know whether it resolved your issue!

Cheers,
Steven