I have a problem with Axon in a Spring Boot + RabbitMQ setup.
I currently have a microservice that consumes the messages Axon publishes to RabbitMQ and then inserts the data into my databases (PostgreSQL and MongoDB).
So I have three databases: the regular DB (PostgreSQL), the Event Store (Axon), and a NoSQL DB (MongoDB).
But if I delete a patient directly in PostgreSQL or MongoDB, then after my application restarts all the inserts I had already done are applied again and the patient comes back. I have the impression this is related to Axon events: Axon seems to notice that the record no longer exists in that database (or something like that), picks the event up again in the TrackingEventProcessor, and runs it again. I would like to have full control over my messages. I understand that events should not be deleted, but I would like to control what is in my application database.
Could you tell me how I can do this?
I’m using the sendAndWait() method to send the commands, and the projection uses the framework’s default settings.
Since you are using an Event Store, you should have a dedicated event that deletes a patient. Having the deletion as part of your event stream ensures your projections do not end up in a different state.
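To illustrate the point about deletions belonging in the event stream, here is a minimal plain-Java sketch. It does not use Axon's API: `PatientEvent`, `PatientProjection`, `deleteRowByHand` and `replay` are hypothetical names invented for this illustration, and an in-memory map stands in for the PostgreSQL/MongoDB table.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified event; the real application has separate
// PatientAddedEvent, PatientUpdatedEvent and PatientDeletedEvent classes.
final class PatientEvent {
    enum Type { ADDED, UPDATED, DELETED }

    final Type type;
    final String patientId;
    final String name; // unused for DELETED

    PatientEvent(Type type, String patientId, String name) {
        this.type = type;
        this.patientId = patientId;
        this.name = name;
    }
}

// Stand-in for the projection table; a real handler would write to a database.
final class PatientProjection {
    private final Map<String, String> rows = new LinkedHashMap<>();

    // Applies one event to the read model.
    void on(PatientEvent event) {
        switch (event.type) {
            case ADDED:
            case UPDATED:
                rows.put(event.patientId, event.name); // simplified upsert
                break;
            case DELETED:
                rows.remove(event.patientId);
                break;
        }
    }

    // Simulates deleting a row directly in the database, bypassing the events.
    void deleteRowByHand(String patientId) {
        rows.remove(patientId);
    }

    // Rebuilds a fresh projection from the full stream, as a replay would.
    static PatientProjection replay(List<PatientEvent> stream) {
        PatientProjection projection = new PatientProjection();
        for (PatientEvent event : stream) {
            projection.on(event);
        }
        return projection;
    }

    // Returns a copy so callers cannot modify the projection state.
    Map<String, String> rows() {
        return new LinkedHashMap<>(rows);
    }
}
```

In this model, a row removed with `deleteRowByHand` comes back as soon as the stream is replayed, because the stream only knows about the "added" event. A stream that ends with a `DELETED` event replays to a state where the row is absent.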
I have @EventHandler methods for PatientAddedEvent, PatientUpdatedEvent and PatientDeletedEvent.
The flow works as expected: events are saved to the Event Store, and the regular DB and the NoSQL DB are updated. If I trigger the delete event, the patient is removed from my databases as it should be, and the event stays in the Event Store, also as it should. The problem is that my @EventHandler methods seem to have some kind of cache that executes all my events again, so the inserts are done again. I don’t know why, because it shouldn’t be doing that. I checked whether there was anything left in RabbitMQ to be processed, and there wasn’t. I want to find out where this cache is coming from and why everything runs again when I restart my application. I don’t have anything in my application.yaml or in a @Configuration class, so I’m using all Axon Framework defaults.
Do you mind sharing an example of this behaviour?
One thing we suspect is that, since you are using Mongo, you haven’t configured a persistent Token Store. Without one, your event stream is replayed on each restart.
You mentioned that your events are executed again. Does that include the PatientDeletedEvent?
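The token store suspicion can be illustrated with a small plain-Java model. This is a sketch and not Axon's API: `SimpleTokenStore` and `SimpleTrackingProcessor` are hypothetical classes, the processor name passed in is arbitrary, and a "token" is reduced to the index of the next event to read.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model, not Axon's API: remembers how far each processor has read.
final class SimpleTokenStore {
    private final Map<String, Integer> positions = new HashMap<>();

    int fetch(String processorName) {
        // No token known yet: start from the first event in the stream.
        return positions.getOrDefault(processorName, 0);
    }

    void store(String processorName, int nextPosition) {
        positions.put(processorName, nextPosition);
    }
}

final class SimpleTrackingProcessor {
    private final String name;
    private final SimpleTokenStore tokenStore;
    private final List<String> handled = new ArrayList<>();

    SimpleTrackingProcessor(String name, SimpleTokenStore tokenStore) {
        this.name = name;
        this.tokenStore = tokenStore;
    }

    // Handles every event from the stored position onward and advances the token.
    // Returns how many events were handled in this run.
    int processAll(List<String> eventStream) {
        int position = tokenStore.fetch(name);
        int count = 0;
        while (position < eventStream.size()) {
            handled.add(eventStream.get(position)); // stand-in for invoking the event handler
            position++;
            tokenStore.store(name, position);
            count++;
        }
        return count;
    }

    // Returns a copy of the events handled by this processor instance.
    List<String> handled() {
        return new ArrayList<>(handled);
    }
}
```

A "restart" here means creating a new `SimpleTrackingProcessor`. If the new processor receives the same `SimpleTokenStore` instance (standing in for a store that survives restarts), it handles nothing new. If it receives a fresh store (standing in for an in-memory store that is lost on shutdown), it handles the whole stream again.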
I added the configuration for MongoDB, but nothing is being written except to the “trackingtokens” collection. I also noticed that my @EventHandler methods, which worked before, are no longer working.
application.yaml:
axon:
  axonserver:
    servers: localhost:8124
    token: 12345
  amqp:
    exchange: patients.exchange
  serializer:
    general: jackson
    events: jackson
    messages: jackson
  eventhandling:
    processors:
      name:
        source: myQueueMessageSource
rabbitmq:
  .....
Config Class:
@Configuration
public class AxonConfig {

    // Event store embedded in the application, backed by the storage engine below
    @Bean
    public EmbeddedEventStore eventStore(EventStorageEngine storageEngine, AxonConfiguration configuration) {
        return EmbeddedEventStore.builder()
                .storageEngine(storageEngine)
                .messageMonitor(configuration.messageMonitor(EventStore.class, "eventStore"))
                .build();
    }

    // Stores events in MongoDB
    @Bean
    public EventStorageEngine storageEngine(MongoClient client) {
        return MongoEventStorageEngine.builder()
                .mongoTemplate(DefaultMongoTemplate.builder().mongoDatabase(client).build())
                .build();
    }

    // Persists tracking tokens in MongoDB
    @Bean
    public TokenStore tokenStore(MongoClient client, Serializer serializer) {
        return MongoTokenStore.builder()
                .mongoTemplate(DefaultMongoTemplate.builder().mongoDatabase(client).build())
                .serializer(serializer)
                .build();
    }
}
RabbitConfig:
@Configuration
public class RabbitConfig {
    ....

    @Bean
    public SpringAMQPMessageSource myQueueMessageSource(AMQPMessageConverter messageConverter) {
        return new SpringAMQPMessageSource(messageConverter) {
            @RabbitListener(queues = "${rabbitmq.queue-cache}")
            @Override
            public void onMessage(Message message, Channel channel) {
                log.info("amqp event message received");
                super.onMessage(message, channel);
            }
        };
    }
}