Axon 3.0 event handlers not working (Spring Boot, with starter)

I’ve been able to get the event-sourcing basics working with MongoDB, but I am not able to make event handlers work. I’ve tried several ways, but I am not able to…

I am trying an event handler both inside the aggregate and in an external class. Neither of them works.

CommandHandler works

The code is the following:

Appconfig with Spring Boot

package es.osoco.ulyseo.recommender

import com.mongodb.MongoClient
import com.mongodb.MongoCredential
import com.mongodb.ServerAddress
import es.osoco.ulyseo.recommender.application.recommendation.CalculateRecommendationUseCase
import es.osoco.ulyseo.recommender.domain.model.recommendation.Recommendation
import es.osoco.ulyseo.recommender.port.adapter.pricing.PricingPort
import groovy.util.logging.Slf4j
import org.axonframework.commandhandling.AggregateAnnotationCommandHandler
import org.axonframework.config.DefaultConfigurer
import org.axonframework.config.EventHandlingConfiguration
import org.axonframework.eventhandling.EventBus
import org.axonframework.eventhandling.EventProcessor
import org.axonframework.eventhandling.SimpleEventBus
import org.axonframework.eventhandling.SimpleEventHandlerInvoker
import org.axonframework.eventhandling.SubscribingEventProcessor
import org.axonframework.eventsourcing.EventSourcingRepository
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore
import org.axonframework.eventsourcing.eventstore.EventStore
import org.axonframework.mongo.eventsourcing.eventstore.DefaultMongoTemplate
import org.axonframework.mongo.eventsourcing.eventstore.MongoEventStorageEngine
import org.axonframework.mongo.eventsourcing.eventstore.MongoTemplate
import org.dozer.DozerBeanMapper
import org.springframework.amqp.core.Queue
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.amqp.RabbitProperties
import org.springframework.context.annotation.Bean
import org.springframework.scheduling.annotation.EnableAsync

/**
 * Spring Boot entry point and bean configuration for the recommender service.
 *
 * Declares the Axon infrastructure (Mongo-backed event store, event-sourcing
 * repository for {@link Recommendation}, command handler, event processor),
 * the RabbitMQ queues and listener container factory, and the Dozer mappers.
 */
@Slf4j
@SpringBootApplication
@EnableAsync
public class RecommenderApp {

    @Autowired
    private RabbitProperties props

    @Autowired
    private ConnectionFactory cachingConnectionFactory

    @Autowired
    private PricingPort pricingPort

    // Bean that carries the @EventHandler method wired into the event processor below.
    @Autowired
    CalculateRecommendationUseCase calculateRecommendationUseCase

    @Value('${app.queue.pricing}')
    private String pricingQueueName;

    @Value('${app.queue.aggregates}')
    private String aggregatesQueueName;

    @Value('${app.pricingGenerated.routingKey}')
    private String pricingRoutingKey;

    /**
     * Builds the Mongo template Axon uses for the domain-event and snapshot collections.
     *
     * NOTE(review): host, port, database and credentials are hard-coded here;
     * consider moving them to external configuration.
     */
    @Bean(name = "axonMongoTemplate")
    MongoTemplate axonMongoTemplate() {
        MongoClient mongoClient = new MongoClient(
                new ServerAddress("mongodb", 29017),
                [MongoCredential.createCredential("recommender-int", "recommender-int", "recommender-int".getChars())])

        return new DefaultMongoTemplate(mongoClient,
                "recommender-int", "recommendation", "recommendation-snapshot")
    }

    /**
     * Event store backed by the Mongo storage engine built on {@link #axonMongoTemplate()}.
     */
    @Bean
    public EmbeddedEventStore embeddedEventStore() {
        return new EmbeddedEventStore(new MongoEventStorageEngine(axonMongoTemplate()))
    }

    /**
     * Exposes the event store itself as the application's event bus.
     */
    @Bean
    EventBus eventBus(EventStore eventStore) {
        return eventStore
    }

    /**
     * Event-sourcing repository for the {@link Recommendation} aggregate.
     */
    @Bean
    public EventSourcingRepository<Recommendation> taskRepository() {
        return new EventSourcingRepository<Recommendation>(Recommendation.class, embeddedEventStore())
    }

    /**
     * Command handler adapter that routes commands to the annotated
     * {@link Recommendation} aggregate through the given repository.
     *
     * The previous version also created a DefaultConfigurer with an
     * EventHandlingConfiguration module here, but that configuration was never
     * built or started, so it had no effect and has been removed.
     */
    @Bean
    AggregateAnnotationCommandHandler<Recommendation> commandHandler(EventSourcingRepository<Recommendation> eventSourcingRepository) {
        return new AggregateAnnotationCommandHandler<Recommendation>(Recommendation.class, eventSourcingRepository)
    }

    /**
     * Subscribing processor named "events" that invokes the handlers of
     * {@link CalculateRecommendationUseCase} for messages from the event store.
     *
     * NOTE(review): nothing in this class calls start() on the returned
     * processor. Confirm whether it is started elsewhere, or whether the Axon
     * Spring Boot starter already registers the handler bean, in which case
     * this bean may be redundant.
     */
    @Bean
    SubscribingEventProcessor subscribingEventProcessor(EventStore eventStore) {
        return new SubscribingEventProcessor("events", new SimpleEventHandlerInvoker(calculateRecommendationUseCase), eventStore)
    }

    @Bean
    Queue pricingQueue() {
        return new Queue(pricingQueueName);
    }

    // NOTE(review): the three Dozer mapper beans below are all declared under the
    // same bean name "org.dozer.Mapper", so they cannot all be reachable under that
    // name at once. Verify which mapping files are actually meant to be active, then
    // either give the beans distinct names or merge the mapping files into one mapper.

    /**
     * Dozer mapper loaded with "dozerMappingSearchToRecommender.xml".
     */
    @Bean(name = "org.dozer.Mapper")
    public DozerBeanMapper dozerBeanSearchCommandToRecommender() {
        List<String> mappingFiles = ["dozerMappingSearchToRecommender.xml"]

        DozerBeanMapper mapper = new DozerBeanMapper()
        mapper.setMappingFiles(mappingFiles)
        return mapper
    }

    /**
     * Dozer mapper loaded with "dozer-application-mappings.xml".
     */
    @Bean(name = "org.dozer.Mapper")
    public DozerBeanMapper dozerApplicationMapper() {
        List<String> mappingFiles = ["dozer-application-mappings.xml"]

        DozerBeanMapper mapper = new DozerBeanMapper()
        mapper.setMappingFiles(mappingFiles)
        return mapper
    }

    /**
     * Dozer mapper loaded with the port and the new application mapping files.
     */
    @Bean(name = "org.dozer.Mapper")
    public DozerBeanMapper dozerBeanMapper() {
        List<String> mappingFiles = ["dozer-port-mappings.xml", "dozer-application-new-mappings.xml"]

        DozerBeanMapper mapper = new DozerBeanMapper()
        mapper.setMappingFiles(mappingFiles)
        return mapper
    }

    @Bean
    Queue aggregatesQueue() {
        // Second constructor argument is the "durable" flag.
        return new Queue(aggregatesQueueName, false);
    }

    /**
     * Listener container factory for annotated Rabbit listeners. Uses the JSON
     * message converter and takes concurrency, acknowledge mode and requeue
     * settings from the bound {@link RabbitProperties}.
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory()
        factory.setConnectionFactory(cachingConnectionFactory)
        factory.setMessageConverter(jackson2JsonMessageConverter())

        // Listener settings come from the application properties file
        def listener = props.getListener()
        factory.setMaxConcurrentConsumers(listener.getMaxConcurrency())
        factory.setConcurrentConsumers(listener.getConcurrency())
        factory.setAcknowledgeMode(listener.getAcknowledgeMode())
        factory.setDefaultRequeueRejected(listener.getDefaultRequeueRejected())

        return factory
    }

    /**
     * Single JSON converter shared by all message-queue payloads.
     */
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter()
    }

    /**
     * Application entry point.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(RecommenderApp.class, args)
    }

}

Aggregate

package es.osoco.ulyseo.recommender.domain.model.recommendation

import es.osoco.ulyseo.recommender.application.recommendation.CalculateRecoCommand
import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
import groovy.transform.TupleConstructor
import groovy.util.logging.Slf4j
import org.axonframework.commandhandling.CommandHandler
import org.axonframework.commandhandling.model.AggregateIdentifier
import org.axonframework.commandhandling.model.AggregateLifecycle
import org.axonframework.commandhandling.model.AggregateMember
import org.axonframework.commandhandling.model.AggregateRoot
import org.axonframework.eventhandling.EventHandler
import org.springframework.data.annotation.Id
import org.springframework.data.mongodb.core.mapping.Document

/**
 * Aggregate holding the best-scored travel packages for a search footprint.
 *
 * It is created by handling a {@link CalculateRecoCommand}, which applies a
 * RecommendationCalculatedDomainEvent describing the result.
 */
@Document(collection = "recommendationPort")
@ToString
@EqualsAndHashCode
@TupleConstructor
@Slf4j
class Recommendation  {

    // TODO fake fields, review them after contract is agreed
    @AggregateIdentifier
    @Id
    String footprint
    @AggregateMember
    List<TravelPackage> travelPackages = new ArrayList<>()
    @AggregateMember
    TravelCriteria travelCriteria
    Integer packagesCountForRecommendation

    /** No-arg constructor, needed so the aggregate can be instantiated before events are replayed. */
    Recommendation() {
    }

    /**
     * Creates the aggregate from the command: keeps the two highest-scored
     * distinct packages and applies a RecommendationCalculatedDomainEvent.
     *
     * The ranking is done on a copy of the received packages, so the list
     * carried by the command (and owned by the caller) is not reordered or
     * de-duplicated as a side effect.
     *
     * @param calculateRecoCommand command carrying the footprint, the travel
     *        criteria and the already-scored candidate packages
     */
    @CommandHandler
    public Recommendation(CalculateRecoCommand calculateRecoCommand) {
        this.footprint = calculateRecoCommand.footPrint
        this.travelCriteria = calculateRecoCommand.travelCriteria

        // Defensive copy: sort { } and unique() below both mutate the list they are called on.
        List<TravelPackage> candidates = new ArrayList<>(calculateRecoCommand.receivedTravelPackages)
        candidates.addAll(this.travelPackages)
        // Highest scoring first
        candidates.sort { -it.scoring }

        this.travelPackages = candidates.unique().take(2)

        // NOTE(review): packagesCountForRecommendation is never assigned in this class,
        // while the number of kept packages is fixed at 2 above; confirm the intent.
        log.info "Keeping {} packages in recommendation, packagesCountForRecommendation: {}", travelPackages.size(), packagesCountForRecommendation

        AggregateLifecycle.apply(new RecommendationCalculatedDomainEvent(footprint: calculateRecoCommand.footPrint, recommendation: this))
    }

    /**
     * Handles the event applied by this aggregate.
     *
     * Sets the aggregate identifier from the event so that an instance rebuilt
     * from its event stream (through the no-arg constructor) ends up with its
     * footprint. The remaining state is still only assigned in the
     * command-handling constructor.
     *
     * @param recommendationCalculatedDomainEvent the applied event
     */
    @EventHandler
    public void handle(RecommendationCalculatedDomainEvent recommendationCalculatedDomainEvent) {
        this.footprint = recommendationCalculatedDomainEvent.footprint
        log.info "HELLO"
    }

}

External event handler

package es.osoco.ulyseo.recommender.application.recommendation

import es.osoco.ulyseo.recommender.application.common.Generator
import es.osoco.ulyseo.recommender.application.rule.ScoringRuleEngine
import es.osoco.ulyseo.recommender.common.DomainEventPublisher
import es.osoco.ulyseo.recommender.domain.model.recommendation.Recommendation
import es.osoco.ulyseo.recommender.domain.model.recommendation.RecommendationCalculatedDomainEvent
import es.osoco.ulyseo.recommender.port.adapter.recommendation.RecommendationPort
import groovy.util.logging.Slf4j
import org.apache.flink.shaded.com.google.common.util.concurrent.Striped
import org.axonframework.commandhandling.CommandCallback
import org.axonframework.commandhandling.CommandMessage
import org.axonframework.commandhandling.gateway.CommandGateway
import org.axonframework.eventhandling.EventHandler
import org.axonframework.eventsourcing.eventstore.EventStore
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Service

import javax.annotation.PostConstruct
import java.util.concurrent.Semaphore

/**
 * Application service that scores the received travel packages, dispatches a
 * {@link CalculateRecoCommand} through the command gateway, and reacts to the
 * resulting RecommendationCalculatedDomainEvent by storing and publishing it.
 */
@Slf4j
@Service
class CalculateRecommendationUseCase implements Serializable {

    @Autowired
    EventStore eventStore

    @Autowired
    CommandGateway commandGateway

    @Autowired
    ScoringRuleEngine scoringRuleEngine

    @Autowired
    RecommendationPort recommendationRepository

    @Autowired
    DomainEventPublisher domainEventPublisher;

    @Value('${app.packagesCountForRecommendation}')
    Integer packagesCountForRecommendation

    @Value('${app.concurrentFootprintExpected}')
    Integer concurrentFootprintExpected

    @Value('${app.concurrentPermitsByFootprint}')
    Integer concurrentPermitsByFootprint

    // One semaphore stripe per footprint key; shared by all instances of this class.
    static Striped<Semaphore> semaphores

    /**
     * Creates the striped semaphores once, from the configured stripe count
     * and permits per stripe. An already-initialised value is kept.
     */
    @PostConstruct
    public void initializeSemaphores() {
        semaphores = semaphores ?: Striped.semaphore(concurrentFootprintExpected, concurrentPermitsByFootprint)
    }

    /**
     * Entry point of the use case: derives the footprint of the search and
     * triggers the recommendation calculation for it.
     *
     * @param calculateRecommendationCommand application command with the
     *        search, the travel criteria and the candidate packages
     */
    void execute(CalculateRecommendationCommand calculateRecommendationCommand) {
        //Translate to Domain
        String footPrint = Generator.generateFootprint(calculateRecommendationCommand.getSearch())
        // calculateRecommendation returns nothing; the outcome is reported through the command callback.
        calculateRecommendation(footPrint, calculateRecommendationCommand)
    }

    /**
     * Scores the received packages and sends a {@link CalculateRecoCommand},
     * holding a permit of the footprint's semaphore while doing so.
     *
     * NOTE(review): the permit is released as soon as send() returns. If the
     * command bus dispatches asynchronously, that is before the command has
     * been handled; confirm this is the intended protection scope.
     */
    private void calculateRecommendation(String footPrint, CalculateRecommendationCommand calculateRecommendationCommand) {
        Semaphore semaphore = semaphores.get(footPrint)
        semaphore.acquire()

        try {
            //TODO: Convert application to Domain TravelPackages
            List<es.osoco.ulyseo.recommender.domain.model.recommendation.TravelPackage> receivedTravelPackages = calculateRecommendationCommand.packages

            assignScoring(receivedTravelPackages, calculateRecommendationCommand.travelCriteria)

            CalculateRecoCommand command = new CalculateRecoCommand(
                    travelCriteria: calculateRecommendationCommand.getTravelCriteria(),
                    footPrint: footPrint,
                    receivedTravelPackages: receivedTravelPackages)

            commandGateway.send(command, new CommandCallback() {
                @Override
                void onSuccess(CommandMessage commandMessage, Object result) {
                    log.info("OK {}", result)
                }

                @Override
                void onFailure(CommandMessage commandMessage, Throwable cause) {
                    // A failed command is an error: log it at ERROR level with the stack trace.
                    log.error("KO", cause)
                }
            })
        } catch (Exception e) {
            log.error("Error calculating recommendation", e)
        } finally {
            semaphore.release()
        }
    }

    /**
     * Builds a RecommendationCalculatedDomainEvent from the command data and
     * the given recommendation.
     */
    private RecommendationCalculatedDomainEvent buildRecommendationCalculatedEvent(CalculateRecommendationCommand calculateRecommendationCommand, Recommendation recommendation) {
        RecommendationCalculatedDomainEvent event = new RecommendationCalculatedDomainEvent()
        event.setFootprint(recommendation.getFootprint())
        event.setPackagesCountForRecommendation(packagesCountForRecommendation)
        event.setTravelCriteria(calculateRecommendationCommand.getTravelCriteria())
        event.setRecommendation(recommendation)
        return event
    }

    /**
     * Runs the scoring rule engine on every received package.
     * The travelCriteria argument is currently not used by the scoring.
     */
    private void assignScoring(List<es.osoco.ulyseo.recommender.domain.model.recommendation.TravelPackage> receivedTravelPackages, es.osoco.ulyseo.recommender.domain.model.recommendation.TravelCriteria travelCriteria) {
        receivedTravelPackages.each { travelPackage ->
            scoringRuleEngine.execute(travelPackage)
        }
    }

    /**
     * Stores the calculated recommendation under its footprint and publishes
     * the domain event to the outside.
     *
     * @param recommendationCalculatedDomainEvent event applied by the aggregate
     */
    @EventHandler
    public void handle(RecommendationCalculatedDomainEvent recommendationCalculatedDomainEvent) {
        recommendationRepository.set(recommendationCalculatedDomainEvent.footprint, recommendationCalculatedDomainEvent.recommendation)
        domainEventPublisher.publish(recommendationCalculatedDomainEvent)
    }

}

I found the problem. :I

The event class was implementing DomainEventMessage; that was the problem.

Thanks to all

An awesome product