Hi all,
First, I have a Spring Boot application using the Axon Framework 4.5.5 libraries, connecting to Axon Server 4.2.4. Is this combination supported?
This Spring Boot app is supposed to listen to several events that the main app emits to Axon Server. My Spring Boot Axon client configuration in application.yml is shown below.
Second, this application connects to Axon Server but fails to handle any events, causing all of those events to be blacklisted. I have traced this to the event handler registration, which is probably causing the issue. We are using the EventStore as the StreamableMessageSource when calling registerTrackingEventProcessor().
Do you have any ideas why the registered event handlers are not firing? On the dashboard I can see that the Spring Boot app is connected to Axon Server, as is the main app that fires the events. I can also see the fired events when searching for them in the dashboard, and their blacklisting in the Axon Server log. So my guess is that the configuration is causing the issue.
# Axon client settings for the Spring Boot application (application.yml).
axon:
  axonserver:
    client-id: reporting-etl-client
    component-name: reporting-etl
    # Taken from the AXON_QUERY_THREADS_MAX placeholder; falls back to 50 when it is not set.
    query-threads: ${AXON_QUERY_THREADS_MAX:50}
    # Taken from the AXON_AXONSERVER_SERVERS placeholder; falls back to "axonserver" when it is not set.
    servers: ${AXON_AXONSERVER_SERVERS:axonserver}
  serializer:
    events: jackson
and AxonConfig.java:
package com.fedeee.reporting.config;
import com.fedeee.reporting.axon.events.EventHandlerProjector;
import com.thoughtworks.xstream.XStream;
import org.axonframework.axonserver.connector.query.AxonServerQueryBus;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;
import org.axonframework.eventhandling.gateway.DefaultEventGateway;
import org.axonframework.eventhandling.gateway.EventGateway;
import org.axonframework.queryhandling.DefaultQueryGateway;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryGateway;
import org.axonframework.serialization.AbstractXStreamSerializer;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.reflections.Reflections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert;
import java.util.Set;
/**
 * Spring configuration for the Axon client side of the reporting application.
 *
 * <p>It hardens the XStream serializer (when one is in use), registers a tracking event processor
 * for every {@link ProcessingGroup}-annotated {@link EventHandlerProjector} bean, attaches the
 * authorization interceptor to the Axon Server query bus, and exposes the query and event
 * gateways as beans.
 */
@Configuration
public class AxonConfig {

    private static final Logger LOG = LoggerFactory.getLogger(AxonConfig.class);

    /** Package scanned for classes annotated with {@link ProcessingGroup}. */
    private static final String PROJECTOR_PACKAGE = "com.fedeee.reporting.axon.events";

    /**
     * Configures the XStream instance behind the serializer to avoid security warnings.
     *
     * <p>Does nothing when the injected serializer is not XStream-based.
     *
     * @param serializer the serializer injected by Spring
     */
    @Autowired
    public void configureXStream(Serializer serializer) {
        if (serializer instanceof AbstractXStreamSerializer) {
            // Cast to the type that was actually checked: casting to XStreamSerializer would throw a
            // ClassCastException for any other AbstractXStreamSerializer subclass.
            XStream xStream = ((AbstractXStreamSerializer) serializer).getXStream();
            XStream.setupDefaultSecurity(xStream);
            xStream.allowTypesByWildcard(new String[] {"com.fedeee.pkg.api.events.**", "org.axonframework.**"});
        }
    }

    /**
     * Registers a tracking event processor for every class in {@code com.fedeee.reporting.axon.events}
     * that is annotated with {@link ProcessingGroup} and whose Spring bean is an
     * {@link EventHandlerProjector}.
     *
     * <p>The bean is looked up by the decapitalised simple class name, so a projector registered
     * under a custom bean name is not found and {@link ApplicationContext#getBean(String)} throws.
     *
     * @param configurer the Axon event processing configurer to register the processors on
     * @param context    the application context used to look up the projector beans
     */
    @Autowired
    public void configure(EventProcessingConfigurer configurer, ApplicationContext context) {
        LOG.info("Setting up TrackingEventProcessors for threads, batch size and other configurations..."
                + " annotated with @ProcessingGroup...");

        // Find the classes in the projector package that are themselves annotated with @ProcessingGroup.
        Reflections reflections = new Reflections(PROJECTOR_PACKAGE);
        Set<Class<?>> annotatedClasses = reflections.getTypesAnnotatedWith(ProcessingGroup.class);

        for (Class<?> annotatedClass : annotatedClasses) {
            String beanName = defaultBeanName(annotatedClass);
            Object candidate = context.getBean(beanName);
            if (candidate instanceof EventHandlerProjector) {
                registerProcessor(configurer, annotatedClass, beanName, (EventHandlerProjector) candidate);
            } else {
                LOG.info(".. '{}' failed to configure with any processing group.", beanName);
            }
        }

        // TODO: handle tracking event processor initialization. See the axon mailing list thread:
        // *****************************************************************************************************************
        // https://groups.google.com/forum/#!topic/axonframework/eyw0rRiSzUw
        // In that thread there is a discussion about properly initializing the token store to avoid recreating query models.
        // I still need to understand more about this...
        // *****************************************************************************************************************
    }

    /**
     * Derives the bean name used for the lookup: the part of the class name after the last dot,
     * with its first character lower-cased.
     */
    private static String defaultBeanName(Class<?> type) {
        String fullName = type.getName();
        String shortName = fullName.substring(fullName.lastIndexOf('.') + 1);
        // Character.toLowerCase is locale-independent; String.toLowerCase() follows the JVM default
        // locale and would, for example, turn a leading 'I' into a dotless 'ı' under a Turkish locale.
        return Character.toLowerCase(shortName.charAt(0)) + shortName.substring(1);
    }

    /**
     * Registers one tracking event processor, sourced from the event store, for the processing group
     * declared on {@code annotatedClass}, and attaches the logging interceptor to that group.
     */
    private static void registerProcessor(EventProcessingConfigurer configurer,
                                          Class<?> annotatedClass,
                                          String beanName,
                                          EventHandlerProjector projector) {
        LOG.info("Configuring EventHandlerProjector Bean '{}' with maxThreads: {} and batchSize: {}.",
                beanName, projector.getMaxThreads(), projector.getBatchSize());

        String processingGroup = annotatedClass.getAnnotation(ProcessingGroup.class).value();

        configurer.registerTrackingEventProcessor(
                processingGroup,
                org.axonframework.config.Configuration::eventStore,
                conf -> TrackingEventProcessorConfiguration.forParallelProcessing(projector.getMaxThreads())
                        .andBatchSize(projector.getBatchSize())
        ).registerHandlerInterceptor(processingGroup, configuration -> new EventHandlerLoggingInterceptor());
        // The interceptor above enables logging for the event handlers of this group.

        LOG.info(".. '{}' successfully configured with processing group '{}'.", beanName, processingGroup);
    }

    // @Autowired
    // public void configureErrorHandling(
    // EventProcessingConfigurer configurer, ErrorHandler errorHandler
    // ) {
    // configurer.registerDefaultListenerInvocationErrorHandler(c -> errorHandler);
    // }

    /**
     * Attaches the authorization handler interceptor to the query bus, but only when the bus is an
     * {@link AxonServerQueryBus}.
     *
     * @param queryBus the query bus injected by Spring; must not be {@code null}
     * @throws IllegalArgumentException if {@code queryBus} is {@code null}
     */
    @Autowired
    public void registerInterceptors(QueryBus queryBus) {
        Assert.notNull(queryBus, "Invalid configuration, queryBus is null!");
        if (queryBus instanceof AxonServerQueryBus) {
            queryBus.registerHandlerInterceptor(InterceptorSupport.authorizationHandlerInterceptor());
        }
    }

    /**
     * Exposes a {@link DefaultQueryGateway} backed by the given query bus.
     *
     * @param queryBus the query bus the gateway dispatches to
     * @return the query gateway bean
     */
    @Bean
    public QueryGateway queryGateway(QueryBus queryBus) {
        return DefaultQueryGateway.builder().queryBus(queryBus).build();
    }

    /**
     * Exposes a {@link DefaultEventGateway} backed by the given event bus.
     *
     * @param eventBus the event bus the gateway publishes to
     * @return the event gateway bean
     */
    @Bean
    public EventGateway eventGateway(EventBus eventBus) {
        return DefaultEventGateway.builder().eventBus(eventBus).build();
    }
}