Implement dead letter queue

Hello everyone,
I recently watched a video by @Steven_van_Beelen in which he implemented a dead-letter queue, and I tried to follow the exact same scenario with one difference: my command and query components are in different services. As a result, the solution Steven provided in the video didn’t work for me.

To clarify, let me tell my story :slight_smile:
I had read that, to implement CQRS in an ideal way, you should split the command and query sides into two different services on different ports, so that you can scale them independently in the future. So I decided to do this:

I have a core-api module that holds common components such as DTOs and events, for example CreateCustomerEvent.

Another service, called customer-command-service, has these components:
CreateCustomerCommand class
CustomerCommandController, which receives commands and sends them to the commandGateway.
CustomerAggregate class, which has the CommandHandlers and EventSourcingHandlers, so the commands are handled here and the CreateCustomerEvent is published to be picked up by the query service.

And I have customer-query-service with these components:
CustomerQueryController class, where I receive query requests and send them to the queryGateway.
CustomerProjection, with methods annotated with @QueryHandler
And the most important component, AxonConfig:

@Configuration
public class AxonConfig {

    @Bean
    public TokenStore myTokenStore(Serializer serializer) {
        return JpaTokenStore.builder()
                .entityManagerProvider(primaryEntityManagerProvider())
                .serializer(serializer)
                .loadingLockMode(LockModeType.NONE)
                .build();
    }


    @Bean
    public EntityManagerProvider primaryEntityManagerProvider() {
        return new ContainerManagedEntityManagerProvider();
    }
    

    @Bean
    public ConfigurerModule deadLetterQueueConfigurerModule() {
        return configurer -> configurer.eventProcessing().registerDeadLetterQueue(
                "customer",
                config -> JpaSequencedDeadLetterQueue.builder()
                        .processingGroup("customer")
                        .maxSequences(256)
                        .maxSequenceSize(256)
                        .entityManagerProvider(config.getComponent(EntityManagerProvider.class))
                        .transactionManager(config.getComponent(TransactionManager.class))
                        .serializer(config.serializer())
                        .build()
        );
    }

    @Bean
    public ConfigurerModule enqueuePolicyConfigurerModule() {
        return configurer -> configurer.eventProcessing()
                .registerDefaultDeadLetterPolicy(
                        config -> new RetryConstrainedEnqueuePolicy()
                );
    }

}

and

public class RetryConstrainedEnqueuePolicy implements EnqueuePolicy<EventMessage<?>> {
    @Override
    public EnqueueDecision<EventMessage<?>> decide(DeadLetter<? extends EventMessage<?>> letter, Throwable cause) {
        return Decisions.evict();
    }
}

:point_up: I just want it to evict the event if anything goes wrong (just for testing).

The scenario I want is: when any exception happens on the query side, I want to handle it and put the failed event in the dead-letter queue.
The strange thing is that the enqueue policy is executed and the evict() method is called, but first, the dead_letter_queue_entrly table does not change and no record is added to it (shouldn’t there be one?), and second, the event is rescheduled again and again. So I think dead-letter queueing is not working correctly, right?
What did I do wrong?

Thanks for your patience and your time, and I’m sorry for my limited English.
Anyway, this is the GitHub repo if you want to see what I’ve done: GitHub - hamedABS/customer-service

The way you implemented the policy, events are evicted right away, effectively disabling the dead letter queue. The policy is also consulted the first time an event might be put into the queue. This way you can prevent putting events in the queue that will always fail, for example when there is a NullPointerException.

So likely the best way forward is just to remove the custom policy, and make sure the events are put into the queue.
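
To make the difference concrete, here is a minimal plain-Java model of the decision a policy takes on a failure. It is a sketch only and deliberately does not use Axon's types: `EnqueuePolicyModel`, `Decision`, `EVICT_ALL`, `SKIP_NULL_POINTERS` and `deadLetters` are hypothetical names made up for this illustration. It shows that a policy which rejects everything leaves the queue empty, while a policy that only rejects `NullPointerException` lets other failures reach the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of an enqueue policy; hypothetical names, not Axon's API.
class EnqueuePolicyModel {

    // The two outcomes that matter on a first failure.
    enum Decision { ENQUEUE, DO_NOT_ENQUEUE }

    // Rejects every failure, like the evict-everything policy in the question.
    static final Function<Throwable, Decision> EVICT_ALL =
            cause -> Decision.DO_NOT_ENQUEUE;

    // Rejects failures that a retry cannot fix, enqueues all others.
    static final Function<Throwable, Decision> SKIP_NULL_POINTERS =
            cause -> cause instanceof NullPointerException
                    ? Decision.DO_NOT_ENQUEUE
                    : Decision.ENQUEUE;

    // Applies the policy to each failure and returns what lands in the queue.
    static List<Throwable> deadLetters(List<Throwable> failures,
                                       Function<Throwable, Decision> policy) {
        List<Throwable> queue = new ArrayList<>();
        for (Throwable cause : failures) {
            if (policy.apply(cause) == Decision.ENQUEUE) {
                queue.add(cause);
            }
        }
        return queue;
    }
}
```

With one `NullPointerException` and one `IllegalStateException` as input, `EVICT_ALL` yields an empty queue and `SKIP_NULL_POINTERS` yields a queue holding only the `IllegalStateException`.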

I’m not sure why you think your use case is different. The dead letter queue is typically only used on the query/projection side, where an error while updating the projection would put the event (and any following event with the same sequencing id) into the queue.
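
The sequencing behaviour can be illustrated with a small plain-Java model. This is a sketch under assumptions, not Axon's implementation: `SequencedQueueModel`, `handle`, and the use of a customer id as the sequencing id are hypothetical. It shows that once one event of a sequence is dead-lettered, later events with the same id are parked behind it, while events of other sequences are still handled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java model of a sequenced dead-letter queue; hypothetical, not Axon's code.
class SequencedQueueModel {

    // Dead letters grouped per sequencing id, in arrival order.
    final Map<String, List<String>> deadLetters = new LinkedHashMap<>();

    // Events that the projection handled successfully.
    final List<String> handled = new ArrayList<>();

    // Processes one event; 'fails' stands in for the projection throwing.
    void handle(String sequenceId, String event, Predicate<String> fails) {
        if (deadLetters.containsKey(sequenceId)) {
            // An earlier event of this sequence already failed:
            // queue this one too, without handling it, to keep the order.
            deadLetters.get(sequenceId).add(event);
        } else if (fails.test(event)) {
            // First failure in this sequence opens a new dead-letter sequence.
            deadLetters.computeIfAbsent(sequenceId, id -> new ArrayList<>()).add(event);
        } else {
            handled.add(event);
        }
    }
}
```

In this model, if the first event for `customer-1` fails, the next event for `customer-1` is queued behind it even though it would have succeeded, and an event for `customer-2` is handled normally.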


The reason I evict everything is that I want to ignore everything in the first phase, just to test that my dead letter queue works (I thought that this way, if any exception occurs, the event is put in the DLQ and I can send another command and publish another create event, and so on. Is that correct?)

In the future I need this enqueue policy to group exceptions and decide between different scenarios.

Yes, I understand. But effectively you are not using the dead letter queue now. I’m not sure why you mention commands. The event processor just continues to read events, and any event that causes an exception, no matter where it originally came from, might end up in the queue. With your current policy, however, it will just be ignored.


OK, I removed the custom policy and it still does not work (the DLQ table has no records).
I also tried using the enqueue() and requeue() methods in a custom enqueue policy, and neither of them worked.

PS: I’m using the legacyjpa packages.

It might be because one of the event handlers has the @Transactional annotation. The transactions are already managed by the transaction manager. I just took a quick look, so it might also be something else. It shouldn’t matter that you use the deprecated classes.


I removed it and it still did not work.
I also upgraded Spring Boot to 3.1.5, and again it did not work.

Spring Boot 3 also requires the Jakarta versions instead of the legacy javax components. I might have time tomorrow to run the project and try to get the DLQ to work.

Yes, exactly :ok_hand: I was aware of that. I used jakarta instead of javax everywhere.
Big thanks for doing this for me.

I made a series of changes and made a little progress.
The enqueue() method works fine and JpaSequencedDeadLetterQueue tries to persist the dead letter in the database, but it can’t, because of:

org.hibernate.AssertionFailure: null id in ir.negah.bank.domain.CustomerEntity entry (don't flush the Session after an exception occurs)
	at org.hibernate.event.internal.DefaultFlushEntityEventListener.checkId(DefaultFlushEntityEventListener.java:81) ~[hibernate-core-6.2.13.Final.jar:6.2.13.Final]
	at org.hibernate.event.internal.DefaultFlushEntityEventListener.getValues(DefaultFlushEntityEventListener.java:175) ~[hibernate-core-6.2.13.Final.jar:6.2.13.Final]
	at org.hibernate.event.internal.DefaultFlushEntityEventListener.onFlushEntity(DefaultFlushEntityEventListener.java:134) ~[hibernate-core-6.2.13.Final.jar:6.2.13.Final]
	at org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener(EventListenerGroupImpl.java:127) ~[hibernate-core-6.2.13.Final.jar:6.2.13.Final]
	at org.hibernate.event.internal.AbstractFlushingEventListener.flushEntities(AbstractFlushingEventListener.java:227) ~[hibernate-core-6.2.13.Final.jar:6.2.13.Final]
	at org.hibernate.event.internal.AbstractFlushingEventListener.flushEverythingToExecutions(AbstractFlushingEventListener.java:90) ~[hibernate-core-6.2.13.Final.jar:6.2.13.Final]
	at org.hibernate.event.internal.DefaultAutoFlushEventListener.onAutoFlush(DefaultAutoFlushEventListener.java:48) ~[hibernate-core-6.2.13.Final.jar:6.2.13.Final]
	at org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener(EventListenerGroupImpl.java:127) ~[hibernate-core-6.2.13.Final.jar:6.2.13.Final]
	at org.hibernate.internal.SessionImpl.autoFlushIfRequired(SessionImpl.java:1375) ~[hibernate-core-6.2.13.Final.jar:6.2.13.Final]
	at org.hibernate.query.sqm.internal.ConcreteSqmSelectQueryPlan.lambda$new$0(ConcreteSqmSelectQueryPlan.java:108) ~[hibernate-core-6.2.13.Final.jar:6.2.13.Final]
	at org.hibernate.query.sqm.internal.ConcreteSqmSelectQueryPlan.withCacheableSqmInterpretation(ConcreteSqmSelectQueryPlan.java:303) ~[hibernate-core-6.2.13.Final.jar:6.2.13.Final]
	at org.hibernate.query.sqm.internal.ConcreteSqmSelectQueryPlan.performList(ConcreteSqmSelectQueryPlan.java:244) ~[hibernate-core-6.2.13.Final.jar:6.2.13.Final]
	at org.hibernate.query.sqm.internal.QuerySqmImpl.doList(QuerySqmImpl.java:518) ~[hibernate-core-6.2.13.Final.jar:6.2.13.Final]
	at org.hibernate.query.spi.AbstractSelectionQuery.list(AbstractSelectionQuery.java:367) ~[hibernate-core-6.2.13.Final.jar:6.2.13.Final]
	at org.hibernate.query.spi.AbstractSelectionQuery.getSingleResult(AbstractSelectionQuery.java:473) ~[hibernate-core-6.2.13.Final.jar:6.2.13.Final]
	at org.axonframework.eventhandling.deadletter.jpa.JpaSequencedDeadLetterQueue.lambda$sequenceSize$14(JpaSequencedDeadLetterQueue.java:509) ~[axon-messaging-4.9.0.jar:4.9.0]
	at org.axonframework.common.transaction.TransactionManager.fetchInTransaction(TransactionManager.java:70) ~[axon-messaging-4.9.0.jar:4.9.0]
	at org.axonframework.eventhandling.deadletter.jpa.JpaSequencedDeadLetterQueue.sequenceSize(JpaSequencedDeadLetterQueue.java:500) ~[axon-messaging-4.9.0.jar:4.9.0]
	at org.axonframework.eventhandling.deadletter.jpa.JpaSequencedDeadLetterQueue.isFull(JpaSequencedDeadLetterQueue.java:318) ~[axon-messaging-4.9.0.jar:4.9.0]
	at org.axonframework.eventhandling.deadletter.jpa.JpaSequencedDeadLetterQueue.enqueue(JpaSequencedDeadLetterQueue.java:132) ~[axon-messaging-4.9.0.jar:4.9.0]
	at org.axonframework.eventhandling.deadletter.DeadLetteringEventHandlerInvoker.invokeHandlers(DeadLetteringEventHandlerInvoker.java:181) ~[axon-messaging-4.9.0.jar:4.9.0]
	at org.axonframework.eventhandling.deadletter.DeadLetteringEventHandlerInvoker.handle(DeadLetteringEventHandlerInvoker.java:146) ~[axon-messaging-4.9.0.jar:4.9.0]
	at org.axonframework.eventhandling.MultiEventHandlerInvoker.handle(MultiEventHandlerInvoker.java:91) ~[axon-messaging-4.9.0.jar:4.9.0]
	at org.axonframework.eventhandling.AbstractEventProcessor.processMessageInUnitOfWork(AbstractEventProcessor.java:195) ~[axon-messaging-4.9.0.jar:4.9.0]
	at org.axonframework.eventhandling.AbstractEventProcessor.lambda$null$1(AbstractEventProcessor.java:173) ~[axon-messaging-4.9.0.jar:4.9.0]
	at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:57) ~[axon-messaging-4.9.0.jar:4.9.0]
	at org.axonframework.messaging.interceptors.CorrelationDataInterceptor.handle(CorrelationDataInterceptor.java:67) ~[axon-messaging-4.9.0.jar:4.9.0]
	at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:55) ~[axon-messaging-4.9.0.jar:4.9.0]
	at org.axonframework.eventhandling.AbstractEventProcessor.lambda$null$2(AbstractEventProcessor.java:174) ~[axon-messaging-4.9.0.jar:4.9.0]
	at org.axonframework.tracing.Span.runCallable(Span.java:132) ~[axon-messaging-4.9.0.jar:4.9.0]
	at org.axonframework.eventhandling.AbstractEventProcessor.lambda$null$3(AbstractEventProcessor.java:170) ~[axon-messaging-4.9.0.jar:4.9.0]
	at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:92) ~[axon-messaging-4.9.0.jar:4.9.0]
	at org.axonframework.eventhandling.AbstractEventProcessor.lambda$processInUnitOfWork$4(AbstractEventProcessor.java:166) ~[axon-messaging-4.9.0.jar:4.9.0]
	at org.axonframework.tracing.Span.runCallable(Span.java:132) ~[axon-messaging-4.9.0.jar:4.9.0]
	at org.axonframework.eventhandling.AbstractEventProcessor.processInUnitOfWork(AbstractEventProcessor.java:165) ~[axon-messaging-4.9.0.jar:4.9.0]
	at org.axonframework.eventhandling.pooled.WorkPackage.processEvents(WorkPackage.java:320) ~[axon-messaging-4.9.0.jar:4.9.0]
	at org.axonframework.eventhandling.pooled.WorkPackage.lambda$scheduleWorker$5(WorkPackage.java:281) ~[axon-messaging-4.9.0.jar:4.9.0]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

The event is the create event and the customer wasn’t persisted to the database, so it’s normal for the id of CustomerEntity to be null. So what’s the problem?

I’m not sure about the error. It seems to come from fetching the sequence size, which would use the aggregate id from the event. But that’s not related to the CustomerEntity.

It’s not about the sequence size. I tried setting it to 256 but it didn’t help.

Not the configuration of the maximum size itself, but the query it runs to get the current size. It’s almost as if the two tables are mixed up. If any error should pop up, it should be about a DeadLetterEntry.

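The problem was in how Hibernate manages transactions; this is a common problem with Hibernate. As the stack trace shows, the session still held the failed CustomerEntity when the dead-letter queue ran its size query, so Hibernate’s auto-flush hit that entity.
So, to solve this error: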

org.hibernate.AssertionFailure: null id in entry (don't flush the Session after an exception occurs)

I put:

@Transactional(value = Transactional.TxType.REQUIRES_NEW)

on top of my EventHandler, and now everything works fine :smiling_face_with_tear: :sunglasses:

Thank you @Gerard for your help.