I’ve been able to make eventsourcing basics work with mongodb, but i am not able to make eventhandlers work. Ive tried several ways but i am not able to…
I am trying eventhandler at aggregate and external. None of them works
CommandHandler works
The code is the following:
Appconfig with Spring Boot
package es.osoco.ulyseo.recommender
import com.mongodb.MongoClient
import com.mongodb.MongoCredential
import com.mongodb.ServerAddress
import es.osoco.ulyseo.recommender.application.recommendation.CalculateRecommendationUseCase
import es.osoco.ulyseo.recommender.domain.model.recommendation.Recommendation
import es.osoco.ulyseo.recommender.port.adapter.pricing.PricingPort
import groovy.util.logging.Slf4j
import org.axonframework.commandhandling.AggregateAnnotationCommandHandler
import org.axonframework.config.DefaultConfigurer
import org.axonframework.config.EventHandlingConfiguration
import org.axonframework.eventhandling.EventBus
import org.axonframework.eventhandling.EventProcessor
import org.axonframework.eventhandling.SimpleEventBus
import org.axonframework.eventhandling.SimpleEventHandlerInvoker
import org.axonframework.eventhandling.SubscribingEventProcessor
import org.axonframework.eventsourcing.EventSourcingRepository
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore
import org.axonframework.eventsourcing.eventstore.EventStore
import org.axonframework.mongo.eventsourcing.eventstore.DefaultMongoTemplate
import org.axonframework.mongo.eventsourcing.eventstore.MongoEventStorageEngine
import org.axonframework.mongo.eventsourcing.eventstore.MongoTemplate
import org.dozer.DozerBeanMapper
import org.springframework.amqp.core.Queue
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.amqp.RabbitProperties
import org.springframework.context.annotation.Bean
import org.springframework.scheduling.annotation.EnableAsync
@Slf4j
@SpringBootApplication
@EnableAsync
public class RecommenderApp {
@Autowired
private RabbitProperties props
@Autowired
private ConnectionFactory cachingConnectionFactory
@Autowired
private PricingPort pricingPort
@Value('${app.queue.pricing}')
private String pricingQueueName;
@Value('${app.queue.aggregates}')
private String aggregatesQueueName;
@Value('${app.pricingGenerated.routingKey}')
private String pricingRoutingKey;
@Bean(name = "axonMongoTemplate")
MongoTemplate axonMongoTemplate() {
MongoClient mongoClient = new MongoClient(new ServerAddress("mongodb",29017), [MongoCredential.createCredential("recommender-int","recommender-int","recommender-int".getChars())])
MongoTemplate template = new DefaultMongoTemplate(mongoClient,
"recommender-int", "recommendation", "recommendation-snapshot");
return template;
}
@Bean
public EmbeddedEventStore embeddedEventStore(){
EventStore eventStore = new EmbeddedEventStore(new MongoEventStorageEngine(axonMongoTemplate()))
eventStore
}
@Bean
EventBus eventBus(EventStore eventStore){
eventStore
//new SimpleEventBus()
}
@Bean
public EventSourcingRepository<Recommendation> taskRepository() {
EventSourcingRepository repository = new EventSourcingRepository(Recommendation.class, embeddedEventStore())
return repository
}
@Autowired
CalculateRecommendationUseCase calculateRecommendationUseCase
@Bean
AggregateAnnotationCommandHandler<Recommendation> commandHandler(EventSourcingRepository eventSourcingRepository){
EventHandlingConfiguration ehConfiguration = new EventHandlingConfiguration()
.registerEventHandler {conf -> calculateRecommendationUseCase}
DefaultConfigurer.defaultConfiguration().registerModule(ehConfiguration)
new AggregateAnnotationCommandHandler<Recommendation>(Recommendation.class,eventSourcingRepository)
}
@Bean
SubscribingEventProcessor subscribingEventProcessor(EventStore eventStore) {
new SubscribingEventProcessor("events", new SimpleEventHandlerInvoker(calculateRecommendationUseCase), eventStore)
}
@Bean
Queue pricingQueue() {
return new Queue(pricingQueueName);
}
@Bean(name = "org.dozer.Mapper")
public DozerBeanMapper dozerBeanSearchCommandToRecommender() {
List<String> mappingFiles = Arrays.asList(
"dozerMappingSearchToRecommender.xml"
);
DozerBeanMapper dozerBeanSearchCommandToRecommender = new DozerBeanMapper()
dozerBeanSearchCommandToRecommender.setMappingFiles(mappingFiles)
return dozerBeanSearchCommandToRecommender
}
@Bean(name = "org.dozer.Mapper")
public DozerBeanMapper dozerApplicationMapper() {
List<String> mappingFiles = ["dozer-application-mappings.xml"]
DozerBeanMapper dozerBean = new DozerBeanMapper()
dozerBean.setMappingFiles(mappingFiles)
return dozerBean
}
@Bean(name = "org.dozer.Mapper")
public DozerBeanMapper dozerBeanMapper() {
List<String> mappingFiles = ["dozer-port-mappings.xml", "dozer-application-new-mappings.xml"]
DozerBeanMapper dozerBean = new DozerBeanMapper()
dozerBean.setMappingFiles(mappingFiles)
dozerBean
}
@Bean
Queue aggregatesQueue() {
return new Queue(aggregatesQueueName, false);
}
// Setting the annotation listeners to use the jackson2JsonMessageConverter
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory()
factory.setConnectionFactory(cachingConnectionFactory)
factory.setMessageConverter(jackson2JsonMessageConverter())
//Set properties from file
factory.setMaxConcurrentConsumers(props.getListener().getMaxConcurrency())
factory.setConcurrentConsumers(props.getListener().getConcurrency())
factory.setAcknowledgeMode(props.getListener().getAcknowledgeMode())
factory.setDefaultRequeueRejected(props.getListener().getDefaultRequeueRejected())
return factory
}
// Standardize on a single objectMapper for all message queue items
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
new Jackson2JsonMessageConverter()
}
@Autowired
public static void main(String[] args) {
SpringApplication.run(RecommenderApp.class, args)
}
}
Aggregate
package es.osoco.ulyseo.recommender.domain.model.recommendation
import es.osoco.ulyseo.recommender.application.recommendation.CalculateRecoCommand
import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
import groovy.transform.TupleConstructor
import groovy.util.logging.Slf4j
import org.axonframework.commandhandling.CommandHandler
import org.axonframework.commandhandling.model.AggregateIdentifier
import org.axonframework.commandhandling.model.AggregateLifecycle
import org.axonframework.commandhandling.model.AggregateMember
import org.axonframework.commandhandling.model.AggregateRoot
import org.axonframework.eventhandling.EventHandler
import org.springframework.data.annotation.Id
import org.springframework.data.mongodb.core.mapping.Document
@Document(collection = "recommendationPort")
@ToString
@EqualsAndHashCode
@TupleConstructor
@Slf4j
class Recommendation {
// TODO fake fields, review them after contract is agreed
@AggregateIdentifier
@Id
String footprint
@AggregateMember
List<TravelPackage> travelPackages = new ArrayList<>()
@AggregateMember
TravelCriteria travelCriteria
Integer packagesCountForRecommendation
Recommendation() {
}
@CommandHandler
public Recommendation(CalculateRecoCommand calculateRecoCommand) {
this.footprint = calculateRecoCommand.footPrint
this.travelCriteria = calculateRecoCommand.travelCriteria
calculateRecoCommand.receivedTravelPackages.addAll(this.travelPackages)
calculateRecoCommand.receivedTravelPackages.sort {-it.scoring}
setTravelPackages(calculateRecoCommand.receivedTravelPackages.unique().take(2))
log.info "Keeping {} packages in recommendation, packagesCountForRecommendation: {}", travelPackages.size(), packagesCountForRecommendation
AggregateLifecycle.apply(new RecommendationCalculatedDomainEvent(footprint: calculateRecoCommand.footPrint,recommendation: this ))
}
@EventHandler
public void handle(RecommendationCalculatedDomainEvent recommendationCalculatedDomainEvent){
log.info "HELLO"
}
}
External eventHandlrer
package es.osoco.ulyseo.recommender.application.recommendation
import es.osoco.ulyseo.recommender.application.common.Generator
import es.osoco.ulyseo.recommender.application.rule.ScoringRuleEngine
import es.osoco.ulyseo.recommender.common.DomainEventPublisher
import es.osoco.ulyseo.recommender.domain.model.recommendation.Recommendation
import es.osoco.ulyseo.recommender.domain.model.recommendation.RecommendationCalculatedDomainEvent
import es.osoco.ulyseo.recommender.port.adapter.recommendation.RecommendationPort
import groovy.util.logging.Slf4j
import org.apache.flink.shaded.com.google.common.util.concurrent.Striped
import org.axonframework.commandhandling.CommandCallback
import org.axonframework.commandhandling.CommandMessage
import org.axonframework.commandhandling.gateway.CommandGateway
import org.axonframework.eventhandling.EventHandler
import org.axonframework.eventsourcing.eventstore.EventStore
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Service
import javax.annotation.PostConstruct
import java.util.concurrent.Semaphore
@Slf4j
@Service
class CalculateRecommendationUseCase implements Serializable {
@Autowired
EventStore eventStore
@Autowired
CommandGateway commandGateway
@Autowired
ScoringRuleEngine scoringRuleEngine
@Autowired
RecommendationPort recommendationRepository
@Autowired
DomainEventPublisher domainEventPublisher;
@Value('${app.packagesCountForRecommendation}')
Integer packagesCountForRecommendation
@Value('${app.concurrentFootprintExpected}')
Integer concurrentFootprintExpected
@Value('${app.concurrentPermitsByFootprint}')
Integer concurrentPermitsByFootprint
static Striped<Semaphore> semaphores
@PostConstruct
public void initializeSemaphores() {
semaphores = semaphores ?: Striped.semaphore(concurrentFootprintExpected, concurrentPermitsByFootprint)
}
void execute(CalculateRecommendationCommand calculateRecommendationCommand) {
//Translate to Domain
String footPrint = Generator.generateFootprint(calculateRecommendationCommand.getSearch())
Recommendation recommendation = calculateRecommendation(footPrint, calculateRecommendationCommand)
}
private void calculateRecommendation(String footPrint, CalculateRecommendationCommand calculateRecommendationCommand) {
Semaphore semaphore = semaphores.get(footPrint)
semaphore.acquire()
try {
//Recommendation recommendation = recommendationRepository.get(footPrint)
/*if (!recommendation) {
recommendation = new Recommendation()
recommendation.setFootprint(footPrint)
}*/
//recommendation.setPackagesCountForRecommendation(packagesCountForRecommendation)
//TODO: Convert application to Domain TravelPackages
List<es.osoco.ulyseo.recommender.domain.model.recommendation.TravelPackage> receivedTravelPackages = calculateRecommendationCommand.packages
assignScoring(receivedTravelPackages, calculateRecommendationCommand.travelCriteria)
//recommendation.calculateRecommendation(receivedTravelPackages, calculateRecommendationCommand.getTravelCriteria())
commandGateway.send(new CalculateRecoCommand(travelCriteria:calculateRecommendationCommand.getTravelCriteria(),footPrint: footPrint, receivedTravelPackages: receivedTravelPackages), new CommandCallback() {
@Override
void onSuccess(CommandMessage commandMessage, Object result) {
log.info("OK {}",result)
}
@Override
void onFailure(CommandMessage commandMessage, Throwable cause) {
log.info("KO {}",cause)
}
})
}
catch(Exception e){
log.error("Error calculating recommendation",e)
}
finally {
semaphore.release()
}
}
private RecommendationCalculatedDomainEvent buildRecommendationCalculatedEvent(CalculateRecommendationCommand calculateRecommendationCommand, Recommendation recommendation) {
RecommendationCalculatedDomainEvent recommendationCalculatedDomainEvent = new RecommendationCalculatedDomainEvent()
recommendationCalculatedDomainEvent.setFootprint(recommendation.getFootprint())
recommendationCalculatedDomainEvent.setPackagesCountForRecommendation(packagesCountForRecommendation)
recommendationCalculatedDomainEvent.setTravelCriteria(calculateRecommendationCommand.getTravelCriteria())
recommendationCalculatedDomainEvent.setRecommendation(recommendation)
recommendationCalculatedDomainEvent
}
private void assignScoring(List<es.osoco.ulyseo.recommender.domain.model.recommendation.TravelPackage> receivedTravelPackages, es.osoco.ulyseo.recommender.domain.model.recommendation.TravelCriteria travelCriteria){
receivedTravelPackages.each { travelPackage ->
scoringRuleEngine.execute(travelPackage)
}
}
@EventHandler
public void handle(RecommendationCalculatedDomainEvent recommendationCalculatedDomainEvent){
recommendationRepository.set(recommendationCalculatedDomainEvent.footprint,recommendationCalculatedDomainEvent.recommendation)
domainEventPublisher.publish(recommendationCalculatedDomainEvent)
}
}