Hello,
I’m trying to implement exception handling for event handlers so that, when an event handler throws an exception, the event is retried a specific number of times before it is inserted as a dead letter entry.
My Axon configuration:
@Configuration
public class AxonConfig {

    @Value("${axon.terminal.processing.group.name}")
    private String axonProcessingGroupName;

    @Value("${axon.event-handler.retry}")
    private int numberOfRetries;

    /**
     * Bean defined to configure tms terminal event handler processing group
     * Dead letter invoked to handle event handler exceptions
     * Dead letter behavior dependent on RetryConstrainedEnqueuePolicy class definition
     * RetryConstrainedEnqueuePolicy is assigned to the default dead letter policy
     *
     * @return
     */
    @Bean
    public ConfigurerModule deadLetterConfigurerModule() {
        return configurer -> configurer.eventProcessing()
                .byDefaultAssignTo(axonProcessingGroupName)
                .registerDeadLetterQueue(
                        axonProcessingGroupName,
                        config -> JpaSequencedDeadLetterQueue.builder()
                                .processingGroup(axonProcessingGroupName)
                                .entityManagerProvider(config.getComponent(EntityManagerProvider.class))
                                .transactionManager(config.getComponent(TransactionManager.class))
                                .serializer(config.serializer())
                                .build()
                )
                .registerDefaultDeadLetterPolicy(
                        conf -> retryConstrainedEnqueuePolicy()
                );
    }

    @Bean
    public RetryConstrainedEnqueuePolicy retryConstrainedEnqueuePolicy() {
        return new RetryConstrainedEnqueuePolicy(numberOfRetries);
    }
}
The RetryConstrainedEnqueuePolicy:
public class RetryConstrainedEnqueuePolicy implements EnqueuePolicy<EventMessage<?>> {

    private final int numberOfRetries;

    public RetryConstrainedEnqueuePolicy(int numberOfRetries) {
        this.numberOfRetries = numberOfRetries;
    }

    /**
     * Adds a custom limit on the number of retries for an event handler.
     * Without it, an event handler that throws an exception keeps retrying the event endlessly,
     * so a custom decision on the event handler exception is implemented here.
     *
     * @param letter The {@link DeadLetter dead letter} implementation to make a decision on.
     * @param cause The {@link Throwable} causing the given {@code letter} to be decided on.
     * @return
     */
    @Override
    public EnqueueDecision<EventMessage<?>> decide(DeadLetter<? extends EventMessage<?>> letter, Throwable cause) {
        // get the number of existing retries, or else initialize to zero
        int retries = (int) letter.diagnostics().getOrDefault("retries", 0);
        if (retries == 0) {
            // in this case we need to enqueue and record the number of retries
            return Decisions.enqueue(cause, l -> MetaData.with("retries", 0));
        } else if (retries > numberOfRetries) {
            // we don't need to store it any more, evict the entry
            return Decisions.evict();
        }
        // otherwise requeue the letter with an incremented retry count
        return Decisions.requeue(cause, l -> l.diagnostics().and("retries", retries + 1));
    }
}
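To check my understanding of the branches above, I traced them with a plain-Java stand-in. This is only a sketch: it uses no Axon types, the `Decision` enum and the `Map` are hypothetical substitutes for `EnqueueDecision` and the letter's diagnostics, and it assumes the diagnostics written by one decision are what the next `decide` call sees.

```java
import java.util.HashMap;
import java.util.Map;

public class PolicyTrace {

    // Hypothetical stand-in for the three decisions used by the policy.
    enum Decision { ENQUEUE, REQUEUE, EVICT }

    // Mirrors the branch order of decide(...) in RetryConstrainedEnqueuePolicy.
    // The map is an assumed substitute for the letter's diagnostics.
    static Decision decide(Map<String, Object> diagnostics, int numberOfRetries) {
        int retries = (Integer) diagnostics.getOrDefault("retries", 0);
        if (retries == 0) {
            // Same as MetaData.with("retries", 0): the counter is stored as 0.
            diagnostics.put("retries", 0);
            return Decision.ENQUEUE;
        } else if (retries > numberOfRetries) {
            return Decision.EVICT;
        }
        diagnostics.put("retries", retries + 1);
        return Decision.REQUEUE;
    }

    public static void main(String[] args) {
        Map<String, Object> diagnostics = new HashMap<>();
        for (int call = 1; call <= 3; call++) {
            Decision decision = decide(diagnostics, 3);
            System.out.println("call " + call + ": " + decision
                    + ", retries=" + diagnostics.get("retries"));
        }
        // prints:
        // call 1: ENQUEUE, retries=0
        // call 2: ENQUEUE, retries=0
        // call 3: ENQUEUE, retries=0
    }
}
```

Under that assumption the counter stays at 0 on every call, so neither the requeue branch nor the evict branch is ever reached in this model.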
The propagation, or event handler:
@ProcessingGroup("tmsTerminals")
@Component
@Slf4j
@Transactional(value = Transactional.TxType.REQUIRES_NEW)
public class TerminalEventHandler {

    private TerminalService terminalService;

    public TerminalEventHandler(TerminalService terminalService) {
        this.terminalService = terminalService;
    }

    @EventHandler
    public void on(TmsTerminalSavedEvent tmsTerminalSavedEvent) throws CustomServiceException {
        log.debug("saving new terminal with terminal id: {}", tmsTerminalSavedEvent.getTerminalId());
        TerminalRequestDTO terminalRequestDTO = new TerminalRequestDTO();
        BeanUtils.copyProperties(tmsTerminalSavedEvent, terminalRequestDTO, "id");
        terminalService.saveNew(terminalRequestDTO, tmsTerminalSavedEvent.getFiId());
    }
}
When saveNew throws any exception, the event is inserted directly as a dead letter entry instead of being retried a specific number of times before the insert.
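To be concrete about the behavior I expected, here is a minimal plain-Java sketch of it. It is not Axon code: `RetryThenDeadLetter`, `handle`, and the in-memory list standing in for the dead letter table are all hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RetryThenDeadLetter {

    // Invokes the handler once and, on failure, retries up to maxRetries more times.
    // If every attempt fails, the event is added to deadLetters.
    // Returns the total number of attempts made.
    static int handle(String event, Consumer<String> handler, int maxRetries, List<String> deadLetters) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                handler.accept(event);
                return attempts; // handled successfully, nothing is dead-lettered
            } catch (RuntimeException e) {
                if (attempts > maxRetries) {
                    deadLetters.add(event); // retries exhausted, park the event
                    return attempts;
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        // A handler that always fails, like saveNew throwing on every call.
        int attempts = handle("terminal-1", e -> {
            throw new IllegalStateException("save failed");
        }, 3, deadLetters);
        System.out.println(attempts + " attempts, dead letters: " + deadLetters);
        // prints: 4 attempts, dead letters: [terminal-1]
    }
}
```

With a retry setting of 3, I expected one initial attempt plus three retries before the dead letter entry appears, rather than an entry after the first failure.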
I’m new to Axon, and most of the configuration code is based on AxonIQ tutorials, so I think I might have configured something wrongly.
Thanks for any help in advance.