Hi Folks,
I’m curious whether anyone has experience running AxonFramework inside a JEE container. Regardless of the kind of configuration (with some additional CDI magic, or using the Configurer), I can’t get it working.
In particular, my problem seems to be the combination of EntityManager and TransactionManager when using event-sourced aggregates.
When I start the application, the tracking event processor begins to scan the DomainEventEntry table and periodically updates the timestamp in the TokenEntry table. This part works fine.
When I try to send a command, the operation fails with an EventStoreException caused by a TransactionRequiredException (thrown from the JpaEventStorageEngine#appendEvents method).
I looked at the code and wonder whether the TrackingEventProcessor thread is somehow synchronized with the EmbeddedEventStore thread while the latter is appending events. The relevant piece of code is:
```java
protected void appendEvents(List<? extends EventMessage<?>> events, Serializer serializer) {
    if (events.isEmpty()) {
        return;
    }
    try {
        events.stream().map(event -> createEventEntity(event, serializer)).forEach(entityManager()::persist);
        if (explicitFlush) {
            entityManager().flush();
        }
    } catch (Exception e) {
        handlePersistenceException(e, events.get(0));
    }
}
```
As far as I understand, this method relies on the EntityManager opening the transaction itself, rather than on the TransactionManager passed into the JpaEventStorageEngine.
Is there a reason why it does so? Most other methods in JpaEventStorageEngine rely on the TransactionManager and execute their queries in a “fetchInTransaction” block.
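To make clear what I would have expected, here is a minimal, self-contained sketch of a write wrapped in a block controlled by the transaction manager. `Tx`, `TxManager`, `RecordingTxManager` and the `executeInTransaction` helper are hypothetical stand-ins that I made up for illustration; they only mimic the shape of Axon’s `Transaction` and `TransactionManager` and are not the framework’s actual code.

```java
import java.util.ArrayList;
import java.util.List;

public class AppendInTransactionSketch {

    // Hypothetical stand-ins, shaped like a transaction and its manager.
    interface Tx {
        void commit();
        void rollback();
    }

    interface TxManager {
        Tx startTransaction();
    }

    // Test double that records the transaction boundaries it sees.
    static final class RecordingTxManager implements TxManager {
        final List<String> log = new ArrayList<>();

        @Override
        public Tx startTransaction() {
            log.add("begin");
            return new Tx() {
                @Override
                public void commit() {
                    log.add("commit");
                }

                @Override
                public void rollback() {
                    log.add("rollback");
                }
            };
        }
    }

    // Runs the task between begin and commit; rolls back and rethrows on failure.
    static void executeInTransaction(TxManager txManager, Runnable task) {
        Tx tx = txManager.startTransaction();
        try {
            task.run();
            tx.commit();
        } catch (RuntimeException e) {
            tx.rollback();
            throw e;
        }
    }

    public static void main(String[] args) {
        RecordingTxManager txManager = new RecordingTxManager();
        // The lambda stands in for the persist calls made while appending events.
        executeInTransaction(txManager, () -> txManager.log.add("persist events"));
        System.out.println(txManager.log);
    }
}
```

With a wrapper like this, the persist calls would always run inside a transaction started by the configured transaction manager, regardless of what the calling thread has set up.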
I suppose this is exactly why the code doesn’t work for me. Since I’m passing the EntityManager to an Axon thread, the container has no idea how to handle a JTA transaction when arbitrary code is executed on the EntityManager (Axon code is neither an EJB nor a transactional CDI bean).
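If my understanding is right, the effect can be reproduced without Axon or a container. The sketch below is plain Java and assumes that a transaction is bound to the thread that began it, as JTA does. `ThreadBoundTx` and `FakeEntityManager` are hypothetical stand-ins, not real JPA or JTA classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadBoundTxSketch {

    // Hypothetical stand-in for a JTA transaction: the "active" flag lives in a ThreadLocal.
    static final class ThreadBoundTx {
        private static final ThreadLocal<Boolean> ACTIVE = ThreadLocal.withInitial(() -> false);

        static void begin() {
            ACTIVE.set(true);
        }

        static void commit() {
            ACTIVE.set(false);
        }

        static boolean isActive() {
            return ACTIVE.get();
        }
    }

    // Hypothetical stand-in for an EntityManager that refuses to persist without a transaction.
    static final class FakeEntityManager {
        private final List<Object> persisted = Collections.synchronizedList(new ArrayList<>());

        void persist(Object entity) {
            if (!ThreadBoundTx.isActive()) {
                throw new IllegalStateException("no transaction on " + Thread.currentThread().getName());
            }
            persisted.add(entity);
        }

        int count() {
            return persisted.size();
        }
    }

    // Persists on a fresh worker thread; returns true if the persist call succeeded.
    static boolean persistOnWorker(FakeEntityManager em, boolean beginOnWorker) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> result = worker.submit(() -> {
                if (beginOnWorker) {
                    ThreadBoundTx.begin();
                }
                try {
                    em.persist("event");
                    return true;
                } catch (IllegalStateException e) {
                    return false;
                } finally {
                    if (beginOnWorker) {
                        ThreadBoundTx.commit();
                    }
                }
            });
            return result.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        FakeEntityManager em = new FakeEntityManager();
        ThreadBoundTx.begin(); // begun on the calling thread only
        System.out.println("worker without its own tx: " + persistOnWorker(em, false));
        System.out.println("worker with its own tx:    " + persistOnWorker(em, true));
        ThreadBoundTx.commit();
    }
}
```

In this model, a transaction begun on the calling thread does not help a worker thread: the worker only succeeds when it begins a transaction itself.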
Some details:
- I’m using tracking event processors.
- I’m using the provided JpaEventStorageEngine and JpaTokenStore.
- I’m injecting an EntityManager and wrapping it in the provided SimpleEntityManagerProvider, which I pass to the configured components. In doing so, I make sure that exactly one EntityManager is passed around.
Here is the piece of configuration for this:
```java
// Container-injected persistence context, shared by all Axon components below
@PersistenceContext
private EntityManager entityManager;

@Produces
public Configurer configurer() {
    final EntityManagerProvider emp = getEntityManagerProvider();
    return DefaultConfigurer.defaultConfiguration()
            .registerComponent(EntityManagerProvider.class, c -> emp)
            .registerComponent(TokenStore.class, c -> new JpaTokenStore(c.getComponent(EntityManagerProvider.class, () -> emp), c.serializer()))
            .configureEmbeddedEventStore(c -> new JpaEventStorageEngine(c.getComponent(EntityManagerProvider.class, () -> emp), c.getComponent(TransactionManager.class)))
            .registerComponent(SagaStore.class, c -> new JpaSagaStore(c.getComponent(EntityManagerProvider.class, () -> emp)));
}

public EntityManagerProvider getEntityManagerProvider() {
    return new SimpleEntityManagerProvider(entityManager);
}

@Produces
public TransactionManager txManager() {
    // JBOSS_USER_TRANSACTION is the JNDI name of the UserTransaction (constant not shown here)
    return new ContainerTransactionManager(entityManager, JBOSS_USER_TRANSACTION);
}

@Produces
public EventHandlingConfiguration eventHandlingConfiguration() {
    final EventHandlingConfiguration eventHandlingConfiguration = new EventHandlingConfiguration();
    eventHandlingConfiguration.usingTrackingProcessors();
    return eventHandlingConfiguration;
}
```
- I’ve implemented a container-based TransactionManager, which works with the provided EntityManager and retrieves the JTA UserTransaction from the container via a JNDI lookup:
```java
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.persistence.EntityManager;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;

import lombok.extern.slf4j.Slf4j;
import org.axonframework.common.transaction.Transaction;
import org.axonframework.common.transaction.TransactionManager;

@Slf4j
public class ContainerTransactionManager implements TransactionManager {

    private final EntityManager entityManager;
    private final String userTransactionJndiName;

    public ContainerTransactionManager(final EntityManager entityManager, final String userTransactionJndiName) {
        this.entityManager = entityManager;
        this.userTransactionJndiName = userTransactionJndiName;
    }

    @Override
    public Transaction startTransaction() {
        // Fallback returned when no transaction can be started (LoggingTransactionManager is not shown here)
        Transaction startedTransaction = LoggingTransactionManager.EMPTY_FAKE;
        try {
            // Look up the container's JTA UserTransaction via JNDI
            final UserTransaction transaction = (UserTransaction) new InitialContext().lookup(this.userTransactionJndiName);
            if (transaction == null) {
                return startedTransaction;
            }
            // Begin a new transaction only if none is active on this thread
            if (transaction.getStatus() != Status.STATUS_ACTIVE) {
                transaction.begin();
            }
            // Make sure the EntityManager participates in the JTA transaction
            if (!this.entityManager.isJoinedToTransaction()) {
                this.entityManager.joinTransaction();
            }
            startedTransaction = new Transaction() {
                @Override
                public void commit() {
                    try {
                        switch (transaction.getStatus()) {
                            case Status.STATUS_ACTIVE:
                                transaction.commit();
                                break;
                            case Status.STATUS_MARKED_ROLLBACK:
                                rollback();
                                break;
                            default:
                                break;
                        }
                    } catch (final IllegalStateException | SystemException | SecurityException | RollbackException
                            | HeuristicMixedException | HeuristicRollbackException e) {
                        log.error("Error committing TX", e);
                    }
                }

                @Override
                public void rollback() {
                    try {
                        switch (transaction.getStatus()) {
                            case Status.STATUS_ACTIVE:
                            case Status.STATUS_MARKED_ROLLBACK:
                                transaction.rollback();
                                break;
                            default:
                                break;
                        }
                    } catch (final IllegalStateException | SystemException | SecurityException e) {
                        log.error("Error rolling back TX", e);
                    }
                }
            };
        } catch (final NotSupportedException | SystemException | NamingException e) {
            log.error("Error getting a TX", e);
        }
        return startedTransaction;
    }
}
```
Any comments are much appreciated. I think I’m stuck at this point and need some help.
Kind regards,
Simon