How to properly handle errors in EventHandlers

Hey folks,

I have an event handler that creates and/or updates entities using JPA (spring-boot-data-jpa, to be precise). Exceptions can occur during event handling that might be resolvable by a retry (e.g. locking issues), so I wrote an ErrorHandler that propagates certain exceptions, just like the PropagatingErrorHandler from Axon would do, and registered it for my EventProcessor. So far so good.

In the logs I can verify that the exceptions are logged as WARN only and the retry seems to work. But in some cases, after such an exception, I get another exception that is related to flushing the Hibernate session:

2022-01-17 13:26:34.544 EventProcessor[myOwnProjection]-0 [ERROR] [AssertionFailure] - HHH000099: an assertion failure occurred (this may indicate a bug in Hibernate, but is more likely due to unsafe use of the session): org.hibernate.AssertionFailure: null id in my.own.ExampleEntity entry (don't flush the Session after an exception occurs)
org.hibernate.AssertionFailure: null id in my.own.ExampleEntity entry (don't flush the Session after an exception occurs)
    at org.hibernate.event.internal.DefaultFlushEntityEventListener.checkId(DefaultFlushEntityEventListener.java:76)
    at org.hibernate.event.internal.DefaultFlushEntityEventListener.getValues(DefaultFlushEntityEventListener.java:203)
    at org.hibernate.event.internal.DefaultFlushEntityEventListener.onFlushEntity(DefaultFlushEntityEventListener.java:162)
    at org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener(EventListenerGroupImpl.java:107)
    at org.hibernate.event.internal.AbstractFlushingEventListener.flushEntities(AbstractFlushingEventListener.java:229)
    at org.hibernate.event.internal.AbstractFlushingEventListener.flushEverythingToExecutions(AbstractFlushingEventListener.java:93)
    at org.hibernate.event.internal.DefaultAutoFlushEventListener.onAutoFlush(DefaultAutoFlushEventListener.java:50)
    at org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener(EventListenerGroupImpl.java:107)
    at org.hibernate.internal.SessionImpl.autoFlushIfRequired(SessionImpl.java:1367)
    at org.hibernate.internal.SessionImpl.executeUpdate(SessionImpl.java:1471)
    at org.hibernate.query.internal.AbstractProducedQuery.doExecuteUpdate(AbstractProducedQuery.java:1714)
    at org.hibernate.query.internal.AbstractProducedQuery.executeUpdate(AbstractProducedQuery.java:1696)
    at org.axonframework.eventhandling.tokenstore.jpa.JpaTokenStore.extendClaim(JpaTokenStore.java:211)
    at org.axonframework.eventhandling.TrackingEventProcessor.lambda$null$0(TrackingEventProcessor.java:181)
    at org.axonframework.messaging.unitofwork.MessageProcessingContext.notifyHandlers(MessageProcessingContext.java:72)
    at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.lambda$notifyHandlers$2(BatchingUnitOfWork.java:155)
    at java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1033)
    at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.notifyHandlers(BatchingUnitOfWork.java:155)
    at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.changePhase(AbstractUnitOfWork.java:222)
    at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commitAsRoot(AbstractUnitOfWork.java:83)
    at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commit(AbstractUnitOfWork.java:71)
    at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:111)
    at org.axonframework.eventhandling.AbstractEventProcessor.processInUnitOfWork(AbstractEventProcessor.java:159)
    at org.axonframework.eventhandling.TrackingEventProcessor.processBatch(TrackingEventProcessor.java:462)
    at org.axonframework.eventhandling.TrackingEventProcessor.processingLoop(TrackingEventProcessor.java:294)
    at org.axonframework.eventhandling.TrackingEventProcessor$TrackingSegmentWorker.run(TrackingEventProcessor.java:1016)
    at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.run(TrackingEventProcessor.java:1162)
    at java.base/java.lang.Thread.run(Thread.java:829)

It’s just a rough guess, but could it happen that a RuntimeException occurs during event processing, which marks the transaction as rollback-only (you know, because it’s Spring), but Axon still tries to flush at some point during its UnitOfWork handling?

I am pretty clueless at this point, so any input is appreciated.

Cheers,
Stefan

Hi Stefan,

When your error handler catches an exception without rethrowing it, the Event Processor will assume handling “succeeded”. As a result, it will commit the Unit of Work. However, whenever an exception occurs while interacting with the Entity Manager, Hibernate automatically marks the current transaction as “rollback only”. Such exceptions should preferably always be propagated to the Event Processor, so that it triggers a retry on that side.
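For comparison, here is a minimal sketch of registering Axon's built-in PropagatingErrorHandler, so that every exception thrown by the handlers in a processing group reaches the Event Processor. The class name, the function name and the PROCESSING_GROUP constant are made up for illustration; the group name is assumed to match the processor name shown in the log above.

```kotlin
import org.axonframework.config.EventProcessingConfigurer
import org.axonframework.eventhandling.PropagatingErrorHandler
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Configuration

// Hypothetical constant; assumed to equal the processor name from the log output.
const val PROCESSING_GROUP = "myOwnProjection"

// Declared open so Spring can subclass it without the kotlin-spring compiler plugin.
@Configuration
open class ProjectionErrorHandlingConfig {

    @Autowired
    fun propagateAllHandlerExceptions(processingConfigurer: EventProcessingConfigurer) {
        // Every exception from a handler in this group is rethrown to the processor,
        // which then rolls back the Unit of Work and retries the batch.
        processingConfigurer.registerListenerInvocationErrorHandler(PROCESSING_GROUP) {
            PropagatingErrorHandler.instance()
        }
    }
}
```

Propagating everything is the bluntest option: an event that can never be handled will then be retried indefinitely, which is why a selective handler can still make sense.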

I hope that clarifies things a little bit. If not, could you share how your Error Handler propagates the exceptions?

Hey Allard,

sure, my ErrorHandler looks like this:

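import kotlin.reflect.KClass
import mu.KLogging
import org.axonframework.eventhandling.EventMessage
import org.axonframework.eventhandling.EventMessageHandler
import org.axonframework.eventhandling.LoggingErrorHandler
import org.springframework.core.NestedExceptionUtils

// Rethrows exceptions whose root cause is one of the given types; all others are only logged.
class ConfigurablePropagatingErrorHandler(
    private vararg val propagatedExceptionTypes: KClass<out Exception>
) : LoggingErrorHandler() {

    companion object : KLogging()

    override fun onError(exception: Exception, event: EventMessage<*>, eventHandler: EventMessageHandler) {
        // Exact match on the root cause's class name: subclasses and wrapping exceptions do not match.
        if (propagatedExceptionTypes.any { it.qualifiedName == exception.getRootCauseClassName() }) {
            logger.debug { "Propagating exception of type [${exception.getRootCauseClassName()}] during processing of event [${event.identifier}]" }
            throw exception
        } else {
            super.onError(exception, event, eventHandler)
        }
    }
}

// Class name of the innermost cause, or of the exception itself if it has no cause.
fun Exception.getRootCauseClassName() = NestedExceptionUtils.getMostSpecificCause(this)::class.qualifiedName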

it is registered like that:

    @Autowired
    fun configureProcessingGroupErrorHandling(processingConfigurer: EventProcessingConfigurer) {
        processingConfigurer.registerListenerInvocationErrorHandler(MyOwnProjection.PROCESSING_GROUP) {
            ConfigurablePropagatingErrorHandler(
                // TODO add Exception types to be propagated here:
                CannotAcquireLockException::class,
                DataIntegrityViolationException::class
            )
        }
    }

So it propagates these two exception types. All exceptions I can see being logged (apart from the AssertionFailure mentioned above) are of those types and logged as WARN, so I assume that the AssertionFailure happens after one of those exceptions was propagated.

Thanks!
Stefan

You probably also need to propagate javax.persistence.PersistenceException. When an EntityManager throws this exception from an interaction, it will roll back the current transaction.

More specifically:

All instances of PersistenceException except for instances of NoResultException, NonUniqueResultException, LockTimeoutException, and QueryTimeoutException will cause the current transaction, if one is active and the persistence context has been joined to it, to be marked for rollback.
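Because this rule covers PersistenceException together with its subclasses, and the original exception is often wrapped in another one, an exact match on the root cause's class name can miss cases. One possible alternative is to match by type anywhere in the cause chain. The sketch below is only an illustration: `causeChain` and `shouldPropagate` are hypothetical helper names, and plain JDK exceptions stand in for the JPA types so that it runs without JPA on the classpath.

```kotlin
import java.sql.SQLException

/** Yields this throwable followed by each of its causes, stopping if a cycle is detected. */
fun Throwable.causeChain(): Sequence<Throwable> = sequence {
    val seen = HashSet<Throwable>()
    var current: Throwable? = this@causeChain
    while (current != null && seen.add(current)) {
        yield(current)
        current = current.cause
    }
}

/** True if the exception or any of its causes is an instance of one of the given types. */
fun shouldPropagate(exception: Throwable, propagatedTypes: List<Class<out Throwable>>): Boolean =
    exception.causeChain().any { cause -> propagatedTypes.any { it.isInstance(cause) } }

fun main() {
    // Stand-ins: IllegalStateException plays the role of PersistenceException. It is
    // wrapped by a generic RuntimeException and itself caused by a SQLException.
    val wrapped = RuntimeException(
        "wrapper",
        IllegalStateException("persistence failure", SQLException("deadlock"))
    )
    val propagated = listOf<Class<out Throwable>>(IllegalStateException::class.java)

    println(shouldPropagate(wrapped, propagated))                               // true
    println(shouldPropagate(IllegalArgumentException("unrelated"), propagated)) // false
}
```

In the first case the root cause is the SQLException, so a comparison against the root cause's class name alone would not have matched.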

Note that the second exception you mention, which only occurs “in some cases”, is most likely caused by a mechanism in Axon that extends a claim within the same transaction if processing took longer than half the claim timeout. It does that to prevent “claim stealing”. The exception, in this case, is harmless, as it will force the processor to go into error mode.
Propagating the exceptions that caused the EntityManager to go into rollback mode, instead of catching them, will prevent these follow-up exceptions.

Thanks Allard, I will give it a try and report back if it solved it.

Hi Allard,

I have tried propagating all PersistenceExceptions, and the errors are gone now. However, I get a lot of warnings, because exceptions are now propagated and events are scheduled for retry. That’s kind of expected. But how does retry work, by the way?

Because now events that are in the event store are missing from my projection. I cannot (yet) prove that the missing events are the ones that were scheduled for retry, but it would be too much of a coincidence otherwise.

Cheers,
Stefan

Hi Stefan,

When a processor receives an exception while processing a batch of events, it will release the token from the TokenStore and schedule a retry after a few moments.

If events have been skipped, make sure that, if you have a TransactionManager configured, the TokenStore participates in its transaction. Otherwise, the transaction may be rolled back while the token has still advanced in the meantime.
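To illustrate why the token has to be part of the same transaction, here is a small self-contained simulation. It does not use Axon's API: `InMemoryStore`, `processBatch` and `projectAll` are hypothetical names, the "transaction" is just a staged list that is either applied or discarded, and the retry happens immediately instead of after a wait.

```kotlin
/** Holds the projection rows and the token (index of the next event to process). */
class InMemoryStore {
    val projection = mutableListOf<String>()
    var token = 0
}

/**
 * Handles one batch. With tokenInTransaction = true the token only advances when the
 * projection update commits; with false it advances even if the update is rolled back.
 */
fun processBatch(
    store: InMemoryStore,
    events: List<String>,
    batchSize: Int,
    tokenInTransaction: Boolean,
    handler: (String) -> Unit
) {
    val batch = events.drop(store.token).take(batchSize)
    val staged = mutableListOf<String>()               // uncommitted projection writes
    val nextToken = store.token + batch.size
    if (!tokenInTransaction) store.token = nextToken   // token stored outside the transaction
    try {
        batch.forEach { handler(it); staged += it }
        store.projection += staged                     // commit the projection changes
        store.token = nextToken                        // commit the token together with them
    } catch (e: Exception) {
        // Rollback: staged writes are dropped. The token stays where it was only if
        // it was part of the transaction.
    }
}

/** Replays four events through a handler that fails once on "e2" and returns the projection. */
fun projectAll(tokenInTransaction: Boolean): List<String> {
    val events = listOf("e1", "e2", "e3", "e4")
    val store = InMemoryStore()
    var failOnce = true
    // Fails the first time it sees "e2", like a transient locking problem.
    val handler: (String) -> Unit = { event ->
        if (event == "e2" && failOnce) {
            failOnce = false
            throw IllegalStateException("transient lock failure")
        }
    }
    var attempts = 0
    while (store.token < events.size && attempts++ < 10) {
        processBatch(store, events, batchSize = 2, tokenInTransaction = tokenInTransaction, handler = handler)
    }
    return store.projection
}

fun main() {
    println(projectAll(tokenInTransaction = true))   // [e1, e2, e3, e4]
    println(projectAll(tokenInTransaction = false))  // [e3, e4]
}
```

In the second run the whole failed batch is lost, including "e1", which was handled without any problem: its projection write was rolled back, but the token had already moved past it.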