Hello everyone,
recently i watched a video by @Steven_van_Beelen that he implemented dead-letter-queue, and i tried to follow the exact scenario with him with one difference: command and query components are in different services! so the solution that steven provided in the video didn’t work for me.
so for more clarification i want to tell my story
i read before if you want implement cqrs in ideal way you should separate command and query services to two different services on different ports so you have scaling option in the future, so i decided to do this:
I have core-api that have common components like dtos and events like CreateCustomerEvent
and another service called customer-command-service with these components:
CreateCustomerCommand class
CustomerCommandController that receives commands and send them to the commandGateway.
CustomerAggregate class that have CommandHandlers and EventSourcingHandlers so the commands catched here and CreateCustomerEvent publishes to capture by query service.
and i have customer-query-service with these components:
CustomerQueryController class, in this class i recieve quey requests and send the to the queryGateway.
CustomerProjection with methods annotated with @QueryHandler
and most important component i have AxonConfig:
@Configuration
public class AxonConfig {
@Bean
public TokenStore myTokenStore(Serializer serializer) {
return JpaTokenStore.builder()
.entityManagerProvider(primaryEntityManagerProvider())
.serializer(serializer)
.loadingLockMode(LockModeType.NONE)
.build();
}
@Bean
public EntityManagerProvider primaryEntityManagerProvider() {
return new ContainerManagedEntityManagerProvider();
}
@Bean
public ConfigurerModule deadLetterQueueConfigurerModule() {
return configurer -> configurer.eventProcessing().registerDeadLetterQueue(
"customer",
config -> JpaSequencedDeadLetterQueue.builder()
.processingGroup("customer")
.maxSequences(256)
.maxSequenceSize(256)
.entityManagerProvider(config.getComponent(EntityManagerProvider.class))
.transactionManager(config.getComponent(TransactionManager.class))
.serializer(config.serializer())
.build()
);
}
@Bean
public ConfigurerModule enqueuePolicyConfigurerModule() {
return configurer -> configurer.eventProcessing()
.registerDefaultDeadLetterPolicy(
config -> new RetryConstrainedEnqueuePolicy()
);
}
}
and
public class RetryConstrainedEnqueuePolicy implements EnqueuePolicy<EventMessage<?>> {
@Override
public EnqueueDecision<EventMessage<?>> decide(DeadLetter<? extends EventMessage<?>> letter, Throwable cause) {
return Decisions.evict();
}
}
i just want that evict the event if anything happens(just for testing)
the scenario that i want to follow is: when any exception happens in the query side i want to handle those exceptions and put them in dead-letter-queue.
the strange thing is the enqueue policy executes and the evict() method calles but first of all dead_letter_queue_entrly table does not changes and no record added to it (shouldn’t it?) and second of all the event reschedules again and again, so i think the dead-letter queueing not working correctly and completely right?
what i did wrong?
thanks for your patience and your time and i’m really sorry for my lack of english skill.
anyway this is GITHUB repo if you want to know what i done: GitHub - hamedABS/customer-service