DQL for all ProcessingGroups?

Hey folks,

I am looking at the new DLQ feature in Axon 4.6 and I was wondering whether it might be a good idea to configure a DLQ per default for all processing groups, so that I don’t have to remember registering one when I create a new processing group.

However, the API requires a processingGroup parameter - have I missed something in the API or is there a way to get all registered processing groups from the EventProcessingConfiguration (or from somewhere else) to iterate over it and automatically register DLQs? Other ideas?

Cheers!
Stefan

I tried something like this:

@Autowired
fun registerDeadLetterQueues(
  eventProcessingConfiguration: EventProcessingConfiguration,
  configurer: EventProcessingConfigurer,
) = eventProcessingConfiguration.eventProcessors()
    .map { it.key }
    .forEach { eventProcessor ->
      configurer.registerDeadLetterQueue(eventProcessor) {
        JpaSequencedDeadLetterQueue
          .builder<EventMessage<*>>()
          .processingGroup(eventProcessor)
          .maxSequences(256)
          .maxSequenceSize(256)
          .entityManagerProvider(it.getComponent(EntityManagerProvider::class.java))
          .transactionManager(it.getComponent(TransactionManager::class.java))
          .serializer(it.serializer())
          .build()
      }
    }

but that does not seem to work as it keeps telling me:

org.axonframework.common.AxonConfigurationException: Cannot proceed because the Configuration is not initialized for this module yet.

Hi Stefan!

We deliberately didn’t add such a configuration option to the EventProcessingConfigurer.
The majority of our time was spent on deducing an API for the dead-letter queue, which we were happy with. Configuring just the basic options felt most reasonable from that perspective.

Furthermore, we wanted to get a grasp how users expected to configure the dead-letter queue too. Thus, anticipating requests like this.

Having the EventProcessingConfigurer provide a means to configure a default SequencedDeadLetterQueue for events is most definitely doable, mind you.
The only possible concern I might have is that this may just fill your queue very quickly.

Although this depends on the number of processing groups and the configured sequence and queue size on the dead-letter queue, of course.

If you do want to configure a default queue for all processing groups, you will need to know all the processing group names up front. With that, you can do something like this:

@Bean
public ConfigurerModule deadLetterQueueOnAllProcessingGroups(EntityManagerProvider entityManagerProvider) {
    List<String> processingGroups; // Retrieve from some (static) context
    return configurer -> {
        for (String processingGroup : processingGroups) {

            configurer.eventProcessing()
                      .registerDeadLetterQueue(processingGroup, c ->
                              JpaSequencedDeadLetterQueue.builder()
                                                         .processingGroup(processingGroup)
                                                         .entityManagerProvider(entityManagerProvider)
                                                         .transactionManager(c.getComponent(TransactionManager.class))
                                                         .serializer(c.eventSerializer())
                                                         .build()
                      );
        }
    };
}

Doing the above will add a ConfigurerModule that Axon Framework automatically appends on the overall Configurer.

I hope this clarifies your options, for now, Stefan!

Thanks Steven! I tried to solve it a bit more generically and to receive the ProcessingGroups not from a static source, but from the EventProcessingConfiguration. To achieve that, I had to extend the EventProcessingModule and override the initialize method like that (sorry for Kotlin :wink:) :

class CustomEventProcessingModule: EventProcessingModule() {

  companion object : KLogging()

  override fun initialize(configuration: org.axonframework.config.Configuration) {
    super.initialize(configuration)
    logger.info { "##### register DLQs #####" }

    configuration.onStart(Int.MAX_VALUE, Runnable {
      configuration.eventProcessingConfiguration().eventProcessors().map { it.key }
        .forEach { processingGroup ->
          logger.info { "##### register DLQ for processingGroup [$processingGroup] #####" }
          this.registerDeadLetterQueue(processingGroup) {
            JpaSequencedDeadLetterQueue
              .builder<EventMessage<*>>()
              .processingGroup(processingGroup)
              .maxSequences(256)
              .maxSequenceSize(256)
              .entityManagerProvider(it.getComponent(EntityManagerProvider::class.java))
              .transactionManager(it.getComponent(TransactionManager::class.java))
              .serializer(it.serializer())
              .build()
          }
        }
    })
  }
}

That seems to work but if I have a failing EventHandler, no DeadLetterEntry is created, but the normal error handling is triggered (the default LoggingErrorHandler in my case).

I still haven’t figured out why, but maybe you have an idea?

Cheers!
Stefan