Our first service is successfully handling events it produces and I can see them being published to axon-topic with a console-consumer for Kafka as our message broker of choice.
The second service fails to pick up any of these events and it’s not clear to me why this is so.
The code for the consumer service is fairly straight forward at the moment as such:
application.yml
axon:
kafka:
default-topic: axon-events
producer:
retries: 5
bootstrap-servers: localhost:9092
transaction-id-prefix: esstx
consumer:
group-id: external-sync-group
bootstrap-servers: localhost:9092
distributed:
enabled: true
serializer:
messages: xstream
eventhandling:
processors:
syncsaga:
mode: tracking
source: kafkaMessageSource
Configurations
@Configuration
class AxonFrameworkConfiguration {
@Autowired
fun registerInterceptors(commandBus: CommandBus) {
commandBus.registerDispatchInterceptor(BeanValidationInterceptor())
}
@Bean
fun snapshotterFactoryBean() = SpringAggregateSnapshotterFactoryBean()
@Bean
fun eventSchemaFactory(): EventTableFactory {
return PostgresEventTableFactory.INSTANCE
}
@Bean
fun eventSchema(): EventSchema {
return EventSchema()
}
@Bean
fun sagaSqlSchema(): SagaSqlSchema {
return PostgresSagaSqlSchema()
}
}
@Configuration
class EventHandlerConfiguration {
@Bean
fun syncPointCommandHandler(axonConfiguration: AxonConfiguration, eventBus: EventBus) = SyncPointHandler(eventBus)
}
Two separate attempts at an EventHandler
open class SyncPointHandler(private val eventBus: EventBus) {
@PostConstruct
fun successfulConstruction() {
println("SyncPointHandler Created")
}
@EventHandler
@AllowReplay(true)
fun handle(event: SyncHostRegistered, @SequenceNumber aggregateVersion: Long) {
println("Received EVENT ${event.aggregateIdentifier}")
}
}
@Component
@ProcessingGroup("query")
internal class RandomHandler() {
@EventHandler
@AllowReplay(true)
fun handle(event: SyncHostRegistered, @SequenceNumber aggregateVersion: Long) {
println("RandomHandler Received EVENT ${event.aggregateIdentifier}")
}
}
Any idea what mistake we’re making here?