Spring Boot App Axon Server configuration

Hi all,

First, I have a Spring Boot application using the Axon 4.5.5 libraries, connecting to Axon Server 4.2.4. Is this combination supported?

This Spring Boot app is supposed to listen to several events that the main app emits to the Axon Server. My Spring Boot Axon client configuration in application.yml is shown below.

Second, this application connects to the Axon Server but fails to handle any events, causing all of those events to be blacklisted. I have traced it to the event handler registration, which is probably causing the issue. We are using the EventStore as the StreamableMessageSource when calling registerTrackingEventProcessor().

Do you have any ideas why the registered event handlers are not firing? On the dashboard I can see that both the Spring Boot app and the main app that fires the events are connected to the Axon Server. I can also see the fired events when searching for them in the dashboard, and their blacklisting in the Axon Server log. So my guess is that the configuration is causing the issue.

# Axon client settings for this Spring Boot application.
axon:
  axonserver:
    # Client and component identifiers used for the Axon Server connection.
    client-id: reporting-etl-client
    component-name: reporting-etl
    # Read from AXON_QUERY_THREADS_MAX; falls back to 50 when the variable is not set.
    query-threads: ${AXON_QUERY_THREADS_MAX:50}
    # Read from AXON_AXONSERVER_SERVERS; falls back to "axonserver" when the variable is not set.
    servers: ${AXON_AXONSERVER_SERVERS:axonserver}
  serializer:
    # Events are serialized with Jackson.
    # NOTE(review): AxonConfig.configureXStream only acts on XStream-based serializers;
    # presumably it does not affect event (de)serialization with this setting - confirm.
    events: jackson

and AxonConfig.java:

package com.fedeee.reporting.config;

import com.fedeee.reporting.axon.events.EventHandlerProjector;
import com.thoughtworks.xstream.XStream;
import org.axonframework.axonserver.connector.query.AxonServerQueryBus;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;
import org.axonframework.eventhandling.gateway.DefaultEventGateway;
import org.axonframework.eventhandling.gateway.EventGateway;
import org.axonframework.queryhandling.DefaultQueryGateway;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryGateway;
import org.axonframework.serialization.AbstractXStreamSerializer;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.reflections.Reflections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert;

import java.util.Set;

/**
 * Spring configuration for the Axon client side of the reporting application.
 *
 * <p>It hardens an XStream-based serializer, registers one tracking event processor per
 * {@link ProcessingGroup}-annotated projector found in the projector package, attaches the
 * authorization interceptor to an Axon Server query bus, and exposes query and event gateways.
 */
@Configuration
public class AxonConfig {

  private static final Logger LOG = LoggerFactory.getLogger(AxonConfig.class);

  /** Package scanned for types annotated with {@link ProcessingGroup}. */
  private static final String PROJECTOR_PACKAGE = "com.fedeee.reporting.axon.events";

  /**
   * Restricts the types XStream may handle, to avoid security warnings.
   *
   * <p>Applies XStream's default security setup and then allows the application's event types and
   * the Axon Framework types by wildcard. Does nothing when the serializer is not XStream-based.
   *
   * @param serializer the serializer injected by Spring
   */
  @Autowired
  public void configureXStream(Serializer serializer) {
    if (serializer instanceof AbstractXStreamSerializer) {
      // Cast to the type that was actually checked: casting to XStreamSerializer would throw a
      // ClassCastException for any other AbstractXStreamSerializer subclass.
      XStream xStream = ((AbstractXStreamSerializer) serializer).getXStream();
      XStream.setupDefaultSecurity(xStream);
      xStream.allowTypesByWildcard(new String[] {"com.fedeee.pkg.api.events.**", "org.axonframework.**"});
    }
  }

  /**
   * Registers a tracking event processor for every {@link ProcessingGroup}-annotated type found in
   * the projector package whose Spring bean is an {@link EventHandlerProjector}.
   *
   * @param configurer the Axon event processing configurer the processors are registered on
   * @param context the application context used to look up the projector beans by name
   */
  @Autowired
  public void configure(EventProcessingConfigurer configurer, ApplicationContext context) {
    LOG.info("Setting up TrackingEventProcessors for threads, batch size and other configurations..."
        + " annotated with @ProcessingGroup...");
    // Find the types in the projector package that are annotated with @ProcessingGroup.
    Reflections reflections = new Reflections(PROJECTOR_PACKAGE);
    Set<Class<?>> annotatedClasses = reflections.getTypesAnnotatedWith(ProcessingGroup.class);

    for (Class<?> annotatedClass : annotatedClasses) {
      configureProcessingGroup(configurer, context, annotatedClass);
    }

    // TODO: handle tracking event processor initialization. See the axon mailing list thread:
    // *****************************************************************************************************************
    // https://groups.google.com/forum/#!topic/axonframework/eyw0rRiSzUw
    // In that thread there is a discussion about properly initializing the token store to avoid recreating query models.
    // I still need to understand more about this...
    // *****************************************************************************************************************
  }

  /**
   * Registers the tracking event processor and logging interceptor for one annotated class.
   *
   * @param configurer the Axon event processing configurer
   * @param context the application context holding the projector bean
   * @param annotatedClass a class annotated with {@link ProcessingGroup}
   */
  private void configureProcessingGroup(
      EventProcessingConfigurer configurer, ApplicationContext context, Class<?> annotatedClass) {
    // The bean is looked up by name; getBean throws if no bean with that name exists.
    String beanName = deriveBeanName(annotatedClass);
    Object candidate = context.getBean(beanName);
    if (!(candidate instanceof EventHandlerProjector)) {
      LOG.info(".. '{}' failed to configure with any processing group.", beanName);
      return;
    }

    EventHandlerProjector projector = (EventHandlerProjector) candidate;
    LOG.info("Configuring EventHandlerProjector Bean '{}' with maxThreads: {} and batchSize: {}.",
        beanName, projector.getMaxThreads(), projector.getBatchSize());
    String processingGroup = annotatedClass.getAnnotation(ProcessingGroup.class).value();

    // The event store is the message source; thread count and batch size come from the projector
    // and are read when Axon invokes the configuration function.
    configurer.registerTrackingEventProcessor(
        processingGroup,
        org.axonframework.config.Configuration::eventStore,
        conf -> TrackingEventProcessorConfiguration.forParallelProcessing(projector.getMaxThreads())
            .andBatchSize(projector.getBatchSize())
    ).registerHandlerInterceptor(processingGroup, configuration -> new EventHandlerLoggingInterceptor());
    // The interceptor above enables logging for the EventHandlers of this processing group.

    LOG.info(".. '{}' successfully configured with processing group '{}'.", beanName, processingGroup);
  }

  /**
   * Derives the bean name for a class: the part of the class name after the last dot, with its
   * first character lower-cased.
   *
   * @param type the class to derive a bean name for
   * @return the derived bean name
   */
  private static String deriveBeanName(Class<?> type) {
    String qualifiedName = type.getName();
    String shortName = qualifiedName.substring(qualifiedName.lastIndexOf(".") + 1);
    return shortName.substring(0, 1).toLowerCase() + shortName.substring(1);
  }

//  @Autowired
//  public void configureErrorHandling(
//      EventProcessingConfigurer configurer, ErrorHandler errorHandler
//  ) {
//    configurer.registerDefaultListenerInvocationErrorHandler(c -> errorHandler);
//  }

  /**
   * Registers the authorization handler interceptor on the query bus, but only when the bus is an
   * {@link AxonServerQueryBus}.
   *
   * @param queryBus the query bus injected by Spring; must not be null
   */
  @Autowired
  public void registerInterceptors(QueryBus queryBus) {
    Assert.notNull(queryBus, "Invalid configuration, queryBus is null!");
    if (queryBus instanceof AxonServerQueryBus) {
      queryBus.registerHandlerInterceptor(InterceptorSupport.authorizationHandlerInterceptor());
    }
  }

  /**
   * Exposes a {@link DefaultQueryGateway} built on the given query bus.
   *
   * @param queryBus the query bus the gateway dispatches to
   * @return the query gateway bean
   */
  @Bean
  public QueryGateway queryGateway(QueryBus queryBus) {
    return DefaultQueryGateway.builder().queryBus(queryBus).build();
  }

  /**
   * Exposes a {@link DefaultEventGateway} built on the given event bus.
   *
   * @param eventBus the event bus the gateway publishes to
   * @return the event gateway bean
   */
  @Bean
  public EventGateway eventGateway(EventBus eventBus) {
    return DefaultEventGateway.builder().eventBus(eventBus).build();
  }

}

Hey Ed,
May I ask why you are running such an old version of Axon Server? I mean, yes, it is definitely possible, but a lot has been improved and fixed since then. The most recent version of Axon Server SE is 4.6.7.

Bert,

Thanks for your comment. I am not in a position to decide on upgrading to a newer version, but I will ask my team lead and see if he is OK with that. Which version would you recommend: the latest, 4.6.7?

BTW, do you know if it is OK to use the event store as the StreamableMessageSource when registering tracking event processors?

package com.fedeee.reporting.config;

import com.fedeee.reporting.axon.events.EventHandlerProjector;
import com.thoughtworks.xstream.XStream;
import org.axonframework.axonserver.connector.query.AxonServerQueryBus;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;
import org.axonframework.eventhandling.gateway.DefaultEventGateway;
import org.axonframework.eventhandling.gateway.EventGateway;
import org.axonframework.queryhandling.DefaultQueryGateway;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryGateway;
import org.axonframework.serialization.AbstractXStreamSerializer;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.reflections.Reflections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert;

import java.util.Set;

/**
 * Spring configuration for the Axon client side of the reporting application.
 *
 * <p>It hardens an XStream-based serializer, registers one tracking event processor per
 * {@link ProcessingGroup}-annotated projector found in the projector package, attaches the
 * authorization interceptor to an Axon Server query bus, and exposes query and event gateways.
 */
@Configuration
public class AxonConfig {

  private static final Logger LOG = LoggerFactory.getLogger(AxonConfig.class);

  /** Package scanned for types annotated with {@link ProcessingGroup}. */
  private static final String PROJECTOR_PACKAGE = "com.fedeee.reporting.axon.events";

  /**
   * Restricts the types XStream may handle, to avoid security warnings.
   *
   * <p>Applies XStream's default security setup and then allows the application's event types and
   * the Axon Framework types by wildcard. Does nothing when the serializer is not XStream-based.
   *
   * @param serializer the serializer injected by Spring
   */
  @Autowired
  public void configureXStream(Serializer serializer) {
    if (serializer instanceof AbstractXStreamSerializer) {
      // Cast to the type that was actually checked: casting to XStreamSerializer would throw a
      // ClassCastException for any other AbstractXStreamSerializer subclass.
      XStream xStream = ((AbstractXStreamSerializer) serializer).getXStream();
      XStream.setupDefaultSecurity(xStream);
      xStream.allowTypesByWildcard(new String[] {"com.fedeee.pkg.api.events.**", "org.axonframework.**"});
    }
  }

  /**
   * Registers a tracking event processor for every {@link ProcessingGroup}-annotated type found in
   * the projector package whose Spring bean is an {@link EventHandlerProjector}.
   *
   * @param configurer the Axon event processing configurer the processors are registered on
   * @param context the application context used to look up the projector beans by name
   */
  @Autowired
  public void configure(EventProcessingConfigurer configurer, ApplicationContext context) {
    LOG.info("Setting up TrackingEventProcessors for threads, batch size and other configurations..."
        + " annotated with @ProcessingGroup...");
    // Find the types in the projector package that are annotated with @ProcessingGroup.
    Reflections reflections = new Reflections(PROJECTOR_PACKAGE);
    Set<Class<?>> annotatedClasses = reflections.getTypesAnnotatedWith(ProcessingGroup.class);

    for (Class<?> annotatedClass : annotatedClasses) {
      configureProcessingGroup(configurer, context, annotatedClass);
    }

    // TODO: handle tracking event processor initialization. See the axon mailing list thread:
    // *****************************************************************************************************************
    // https://groups.google.com/forum/#!topic/axonframework/eyw0rRiSzUw
    // In that thread there is a discussion about properly initializing the token store to avoid recreating query models.
    // I still need to understand more about this...
    // *****************************************************************************************************************
  }

  /**
   * Registers the tracking event processor and logging interceptor for one annotated class.
   *
   * @param configurer the Axon event processing configurer
   * @param context the application context holding the projector bean
   * @param annotatedClass a class annotated with {@link ProcessingGroup}
   */
  private void configureProcessingGroup(
      EventProcessingConfigurer configurer, ApplicationContext context, Class<?> annotatedClass) {
    // The bean is looked up by name; getBean throws if no bean with that name exists.
    String beanName = deriveBeanName(annotatedClass);
    Object candidate = context.getBean(beanName);
    if (!(candidate instanceof EventHandlerProjector)) {
      LOG.info(".. '{}' failed to configure with any processing group.", beanName);
      return;
    }

    EventHandlerProjector projector = (EventHandlerProjector) candidate;
    LOG.info("Configuring EventHandlerProjector Bean '{}' with maxThreads: {} and batchSize: {}.",
        beanName, projector.getMaxThreads(), projector.getBatchSize());
    String processingGroup = annotatedClass.getAnnotation(ProcessingGroup.class).value();

    // The event store is the message source; thread count and batch size come from the projector
    // and are read when Axon invokes the configuration function.
    configurer.registerTrackingEventProcessor(
        processingGroup,
        org.axonframework.config.Configuration::eventStore,
        conf -> TrackingEventProcessorConfiguration.forParallelProcessing(projector.getMaxThreads())
            .andBatchSize(projector.getBatchSize())
    ).registerHandlerInterceptor(processingGroup, configuration -> new EventHandlerLoggingInterceptor());
    // The interceptor above enables logging for the EventHandlers of this processing group.

    LOG.info(".. '{}' successfully configured with processing group '{}'.", beanName, processingGroup);
  }

  /**
   * Derives the bean name for a class: the part of the class name after the last dot, with its
   * first character lower-cased.
   *
   * @param type the class to derive a bean name for
   * @return the derived bean name
   */
  private static String deriveBeanName(Class<?> type) {
    String qualifiedName = type.getName();
    String shortName = qualifiedName.substring(qualifiedName.lastIndexOf(".") + 1);
    return shortName.substring(0, 1).toLowerCase() + shortName.substring(1);
  }

//  @Autowired
//  public void configureErrorHandling(
//      EventProcessingConfigurer configurer, ErrorHandler errorHandler
//  ) {
//    configurer.registerDefaultListenerInvocationErrorHandler(c -> errorHandler);
//  }

  /**
   * Registers the authorization handler interceptor on the query bus, but only when the bus is an
   * {@link AxonServerQueryBus}.
   *
   * @param queryBus the query bus injected by Spring; must not be null
   */
  @Autowired
  public void registerInterceptors(QueryBus queryBus) {
    Assert.notNull(queryBus, "Invalid configuration, queryBus is null!");
    if (queryBus instanceof AxonServerQueryBus) {
      queryBus.registerHandlerInterceptor(InterceptorSupport.authorizationHandlerInterceptor());
    }
  }

  /**
   * Exposes a {@link DefaultQueryGateway} built on the given query bus.
   *
   * @param queryBus the query bus the gateway dispatches to
   * @return the query gateway bean
   */
  @Bean
  public QueryGateway queryGateway(QueryBus queryBus) {
    return DefaultQueryGateway.builder().queryBus(queryBus).build();
  }

  /**
   * Exposes a {@link DefaultEventGateway} built on the given event bus.
   *
   * @param eventBus the event bus the gateway publishes to
   * @return the event gateway bean
   */
  @Bean
  public EventGateway eventGateway(EventBus eventBus) {
    return DefaultEventGateway.builder().eventBus(eventBus).build();
  }

}

Ed.

It turns out my version of the Axon client (4.5.5) requires the @ProcessingGroup value to be all lowercase, e.g. @ProcessingGroup("use-only-lower-case"). After changing the annotation value to all lowercase, events started firing into the handler.

1 Like