Problems with using JpaEventStorageEngine and JpaTokenStore in a container environment

Hi Folks,

I’m curious whether anyone has experience with AxonFramework inside a JEE container. Regardless of the kind of configuration (with some additional CDI magic, or using the Configurer), I can’t get it working.
Specifically, my problem seems to be the combination of EntityManager and TransactionManager when using event-sourced aggregates.

When I start the application, the tracking event processor begins to scan the DomainEventEntry table and periodically updates the timestamp in the TokenEntry table. This part works fine.

When I try to send a command, the operation fails with an EventStoreException caused by a TransactionRequiredException (thrown from the JpaEventStorageEngine#appendEvents method).

I looked at the code and wonder if the TrackingEventProcessor Thread is somehow synchronized with the EmbeddedEventStoreThread if the latter is appending events. The relevant piece of code is:

`

protected void appendEvents(List<? extends EventMessage<?>> events, Serializer serializer) {
    if (events.isEmpty()) {
        return;
    }
    try {
        events.stream().map(event -> createEventEntity(event, serializer)).forEach(entityManager()::persist);
        if (explicitFlush) {
            entityManager().flush();
        }
    } catch (Exception e) {
        handlePersistenceException(e, events.get(0));
    }
}

`
(from https://github.com/AxonFramework/AxonFramework/blob/master/core/src/main/java/org/axonframework/eventsourcing/eventstore/jpa/JpaEventStorageEngine.java#L231)

As far as I understand, it relies on the EntityManager opening the transaction itself rather than on the TransactionManager passed into the JpaEventStorageEngine.
Is there a reason for this? Most other methods in JpaEventStorageEngine rely on the TransactionManager and execute their queries in a “fetchInTransaction” block.

I suppose this is exactly why the code doesn’t work for me. Since I’m passing the EntityManager to an Axon thread, the container has no idea how to handle a JTA transaction when arbitrary code runs against the EntityManager (Axon code is neither an EJB nor transactional CDI).
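
To illustrate the suspicion above: JTA associates a transaction with the thread that began it, so code running on a different thread does not see it. The following is a minimal, library-free sketch of that effect, not Axon or container code. `ACTIVE_TX`, `persist` and `persistOnWorkerThread` are hypothetical stand-ins for the container's transaction context and for `EntityManager#persist`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadBoundTransactionDemo {

    // Hypothetical stand-in for the container's transaction context, which is bound to a thread.
    static final ThreadLocal<Boolean> ACTIVE_TX = ThreadLocal.withInitial(() -> false);

    // Hypothetical stand-in for EntityManager#persist: refuses to write without a transaction.
    static String persist(String event) {
        if (!ACTIVE_TX.get()) {
            throw new IllegalStateException("no transaction on thread " + Thread.currentThread().getName());
        }
        return "persisted " + event;
    }

    // Runs persist() on a separate worker thread and reports what happened there.
    static String persistOnWorkerThread(String event) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Future<String> result = worker.submit(() -> {
                try {
                    return persist(event);
                } catch (IllegalStateException e) {
                    // The worker thread never began a transaction of its own.
                    return "failed";
                }
            });
            return result.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        ACTIVE_TX.set(true); // the calling thread "begins" a transaction
        System.out.println(persist("e1"));               // prints "persisted e1"
        System.out.println(persistOnWorkerThread("e2")); // prints "failed"
        ACTIVE_TX.remove();
    }
}
```

If this is what happens here, the transaction has to be started on the same thread that ends up calling `appendEvents`.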

Some details:

  • I’m using tracking event processors

  • I’m using the provided JpaEventStorageEngine and JpaTokenStore

  • I’m injecting an EntityManager and wrapping it in the provided SimpleEntityManagerProvider before passing it to the configured components. In doing so I make sure that exactly one EntityManager is passed around.

Here is the piece of configuration for this:

`

@PersistenceContext
private EntityManager entityManager;

@Produces
public Configurer configurer() {
    final EntityManagerProvider emp = getEntityManagerProvider();

    return DefaultConfigurer.defaultConfiguration()
            .registerComponent(EntityManagerProvider.class, c -> emp)
            .registerComponent(TokenStore.class, c -> new JpaTokenStore(c.getComponent(EntityManagerProvider.class, () -> emp), c.serializer()))
            .configureEmbeddedEventStore(c -> new JpaEventStorageEngine(c.getComponent(EntityManagerProvider.class, () -> emp), c.getComponent(TransactionManager.class)))
            .registerComponent(SagaStore.class, c -> new JpaSagaStore(c.getComponent(EntityManagerProvider.class, () -> emp)));
}

public EntityManagerProvider getEntityManagerProvider() {
    return new SimpleEntityManagerProvider(entityManager);
}

@Produces
public TransactionManager txManager() {
    return new ContainerTransactionManager(entityManager, JBOSS_USER_TRANSACTION);
}

@Produces
public EventHandlingConfiguration eventHandlingConfiguration() {
    final EventHandlingConfiguration eventHandlingConfiguration = new EventHandlingConfiguration();
    eventHandlingConfiguration.usingTrackingProcessors();
    return eventHandlingConfiguration;
}

`

  • I’ve implemented a container-based TransactionManager, which works with the provided EntityManager and retrieves the JTA UserTransaction from the container via JNDI lookup:

`

import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.persistence.EntityManager;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;

import org.axonframework.common.transaction.Transaction;
import org.axonframework.common.transaction.TransactionManager;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ContainerTransactionManager implements TransactionManager {

    private final EntityManager entityManager;
    private final String userTransactionJndiName;

    public ContainerTransactionManager(final EntityManager entityManager, final String userTransactionJndiName) {
        this.entityManager = entityManager;
        this.userTransactionJndiName = userTransactionJndiName;
    }

    @Override
    public Transaction startTransaction() {

        // Fallback returned when no JTA transaction can be obtained (LoggingTransactionManager is not shown here).
        Transaction startedTransaction = LoggingTransactionManager.EMPTY_FAKE;
        try {
            // Look up the container's UserTransaction via JNDI.
            final UserTransaction transaction = (UserTransaction) new InitialContext().lookup(this.userTransactionJndiName);

            if (transaction == null) {
                return startedTransaction;
            }

            // Begin a new transaction only if none is active on this thread.
            if (transaction.getStatus() != Status.STATUS_ACTIVE) {
                transaction.begin();
            }

            // Make sure the EntityManager takes part in the JTA transaction.
            if (!this.entityManager.isJoinedToTransaction()) {
                this.entityManager.joinTransaction();
            }

            startedTransaction = new Transaction() {

                @Override
                public void commit() {
                    try {
                        switch (transaction.getStatus()) {
                            case Status.STATUS_ACTIVE:
                                transaction.commit();
                                break;
                            case Status.STATUS_MARKED_ROLLBACK:
                                rollback();
                                break;
                            default:
                                break;
                        }
                    } catch (final IllegalStateException | SystemException | SecurityException | RollbackException
                            | HeuristicMixedException | HeuristicRollbackException e) {
                        log.error("Error committing TX", e);
                    }
                }

                @Override
                public void rollback() {
                    try {
                        switch (transaction.getStatus()) {
                            case Status.STATUS_ACTIVE:
                            case Status.STATUS_MARKED_ROLLBACK:
                                transaction.rollback();
                                break;
                            default:
                                break;
                        }
                    } catch (final IllegalStateException | SystemException | SecurityException e) {
                        log.error("Error rolling back TX", e);
                    }
                }
            };

        } catch (final NotSupportedException | SystemException | NamingException e) {
            log.error("Error getting a TX", e);
        }

        return startedTransaction;
    }
}

`

Any comments are much appreciated. I’m stuck at this point and need some help.

Kind regards,

Simon

By the way, I just fixed the problem by subclassing JpaEventStorageEngine, overriding the methods and wrapping them in transactional execution:

`

public class TxAwareJPAEventStorageEngine extends JpaEventStorageEngine {

    private final TransactionManager transactionManager;

    public TxAwareJPAEventStorageEngine(final EntityManagerProvider entityManagerProvider, final TransactionManager transactionManager) {
        super(entityManagerProvider, transactionManager);
        this.transactionManager = transactionManager;
    }

    public TxAwareJPAEventStorageEngine(final Serializer serializer, final EventUpcaster upcasterChain, final DataSource dataSource,
            final EntityManagerProvider entityManagerProvider, final TransactionManager transactionManager) throws SQLException {
        super(serializer, upcasterChain, dataSource, entityManagerProvider, transactionManager);
        this.transactionManager = transactionManager;
    }

    public TxAwareJPAEventStorageEngine(final Serializer serializer, final EventUpcaster upcasterChain,
            final PersistenceExceptionResolver persistenceExceptionResolver, final Integer batchSize, final EntityManagerProvider entityManagerProvider,
            final TransactionManager transactionManager, final Long lowestGlobalSequence, final Integer maxGapOffset, final boolean explicitFlush) {
        super(serializer, upcasterChain, persistenceExceptionResolver, batchSize, entityManagerProvider, transactionManager, lowestGlobalSequence, maxGapOffset,
                explicitFlush);
        this.transactionManager = transactionManager;
    }

    // Wrap event appending in a transaction obtained from the TransactionManager.
    @Override
    protected void appendEvents(final List<? extends EventMessage<?>> events, final Serializer serializer) {
        if (events.isEmpty()) {
            return;
        }
        transactionManager.executeInTransaction(() -> super.appendEvents(events, serializer));
    }

    // Wrap snapshot storage in a transaction as well.
    @Override
    protected void storeSnapshot(final DomainEventMessage<?> snapshot, final Serializer serializer) {
        transactionManager.executeInTransaction(() -> super.storeSnapshot(snapshot, serializer));
    }
}

`

So the question still remains: why did you decide not to implement it like this from the beginning?

Am I overlooking something serious?

Kind regards,

Simon

Hi,

The Event Store relies on the fact that the Command Handling component has already started a transaction. The issue should be resolved by defining a TransactionManagingInterceptor on your CommandBus.
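
A minimal sketch of that idea, assuming nothing about Axon's actual classes: the transaction is opened around the whole command handler, and the event store simply appends inside it. `Tx`, `TxManager`, `handleInTransaction` and `appendEvent` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class TransactionalInterceptorSketch {

    // Hypothetical minimal transaction abstraction (not Axon's real interfaces).
    interface Tx {
        void commit();
        void rollback();
    }

    interface TxManager {
        Tx start();
    }

    // Records transaction boundaries so the ordering can be inspected.
    static final List<String> LOG = new ArrayList<>();

    static final TxManager LOGGING_TX_MANAGER = () -> {
        LOG.add("begin");
        return new Tx() {
            @Override public void commit() { LOG.add("commit"); }
            @Override public void rollback() { LOG.add("rollback"); }
        };
    };

    // The interceptor idea: open a transaction before the handler runs,
    // commit on success, roll back if the handler throws.
    static <R> R handleInTransaction(TxManager txManager, Supplier<R> commandHandler) {
        Tx tx = txManager.start();
        R result;
        try {
            result = commandHandler.get();
        } catch (RuntimeException e) {
            tx.rollback();
            throw e;
        }
        tx.commit();
        return result;
    }

    // Stand-in for the event store: it only appends, assuming a transaction is already open.
    static String appendEvent(String event) {
        LOG.add("append " + event);
        return event;
    }

    public static void main(String[] args) {
        handleInTransaction(LOGGING_TX_MANAGER, () -> appendEvent("OrderCreated"));
        System.out.println(LOG); // prints [begin, append OrderCreated, commit]
    }
}
```

With Axon itself, the equivalent would presumably be a TransactionManagingInterceptor constructed with your TransactionManager and registered as a handler interceptor on the command bus. Check the exact registration call against the Axon version in use.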

Cheers,

Allard