Events not being triggered in a separate service

Our first service is successfully handling the events it produces, and I can see them being published to the axon-events topic with a console consumer (Kafka is our message broker of choice).
The second service fails to pick up any of these events, and it’s not clear to me why.

The code for the consumer service is fairly straightforward at the moment:

application.yml

axon:
  kafka:
    default-topic: axon-events
    producer:
      retries: 5
      bootstrap-servers: localhost:9092
      transaction-id-prefix: esstx
    consumer:
      group-id: external-sync-group
      bootstrap-servers: localhost:9092
  distributed:
    enabled: true
  serializer:
    messages: xstream
  eventhandling:
    processors:
      syncsaga:
        mode: tracking
        source: kafkaMessageSource

Configurations

@Configuration
class AxonFrameworkConfiguration {
    @Autowired
    fun registerInterceptors(commandBus: CommandBus) {
        commandBus.registerDispatchInterceptor(BeanValidationInterceptor())
    }

    @Bean
    fun snapshotterFactoryBean() = SpringAggregateSnapshotterFactoryBean()

    @Bean
    fun eventSchemaFactory(): EventTableFactory {
        return PostgresEventTableFactory.INSTANCE
    }

    @Bean
    fun eventSchema(): EventSchema {
        return EventSchema()
    }

    @Bean
    fun sagaSqlSchema(): SagaSqlSchema {
        return PostgresSagaSqlSchema()
    }
}

@Configuration
class EventHandlerConfiguration {
    @Bean
    fun syncPointCommandHandler(axonConfiguration: AxonConfiguration, eventBus: EventBus) = SyncPointHandler(eventBus)
}

Two separate attempts at an EventHandler

open class SyncPointHandler(private val eventBus: EventBus) {
    @PostConstruct
    fun successfulConstruction() {
        println("SyncPointHandler Created")
    }

    @EventHandler
    @AllowReplay(true)
    fun handle(event: SyncHostRegistered, @SequenceNumber aggregateVersion: Long) {
        println("Received EVENT ${event.aggregateIdentifier}")
    }
}

@Component
@ProcessingGroup("query")
internal class RandomHandler() {

    @EventHandler
    @AllowReplay(true)
    fun handle(event: SyncHostRegistered, @SequenceNumber aggregateVersion: Long) {
        println("RandomHandler Received EVENT ${event.aggregateIdentifier}")
    }
}

Any idea what mistake we’re making here?

After re-reading the documentation on event processors (which is still a bit vague to me), I wanted to provide some additional information that might help in resolving my problem.

Service A publishes the event SyncHostRegistered via an apply() in the SyncHostAggregate, which is located in the following package:

package com.aramark.ess.command.domain.aggregates

Service B, which is attempting to listen for this event, currently lives in the package

package com.aramark.efcs

Hi,

the thing that matters when publishing events is how they are serialized and then deserialized. By default, Axon uses the fully qualified class name in the serialized format. This means the recipient must have the event classes in the same package as the sender (basically treating the package as a namespace). If the recipient has the event classes in a different package, it won’t be able to read them.

The location of your aggregate and event handlers doesn’t matter.
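
For illustration, a minimal sketch of what that means in practice: the event class lives in one shared package (for example in a shared library) and both services compile against exactly that class, so the fully qualified name matches on both sides. The package name below is illustrative:

// Shared event definition, used by both Service A and Service B.
// Both sides must see the same fully qualified class name.
package com.example.ess.api.events

data class SyncHostRegistered(
    val aggregateIdentifier: String
)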

I hope this clarifies things a bit, and that it helps you in resolving the issue.
Cheers,

Allard

Still having a lot of difficulty with this.
I’ve taken the events and their dependent classes and placed them into a separate library jar, removed all references to the previous events from both services, re-established the dependencies, and attempted the same again.
The symptoms have not changed at all. I’ve attempted triggering the event from Service B to A and from A to B; only the service that fires the event is able to handle it.

As an additional update, I’ve ALSO attempted to switch serialization to Jackson, so the messages come through as simple JSON, and created brand new events. The behavior continues to be the same.
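
(For reference, the serializer switch was just this property in application.yml, the same block shown above with xstream:)

axon:
  serializer:
    messages: jackson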

My problem feels like one of misconfiguration.

I’ve reproduced the problem in a sample project here: https://github.com/mishkaechoes/axon-demo
There’s a dependency on Postgres and Kafka, as per the application.yml of service_a and service_b.

Hi,

it looks like you’re missing configuration for Event Processing, which tells Axon you want to consume messages from Kafka.
In Axon, handlers are assigned to “processing groups”. By default, handlers are assigned to the group whose name is the package name of the event handler class.
These groups are then assigned to processors (tracking or subscribing). Finally, a processor uses a source of messages, which is by default the Event Bus (or Event Store, if present). In your case, you want to use Kafka as a source.

In Spring, you can define an @Autowired method taking EventProcessingConfiguration as a parameter. This class allows you to configure how you want event processing to be done.
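
Roughly, such a method could look like the following sketch, assuming an Axon 3.x-style setup where the Kafka module exposes a KafkaMessageSource bean. The processing group name is illustrative and must match the group your handlers are assigned to, and exact method names may differ between Axon versions:

import org.axonframework.config.EventProcessingConfiguration
import org.axonframework.kafka.eventhandling.consumer.KafkaMessageSource
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Configuration

@Configuration
class EventProcessingConfig {

    // Register a tracking processor for the processing group the handlers
    // belong to (by default the handler's package name, or the value of
    // @ProcessingGroup) and point it at Kafka as its message source.
    @Autowired
    fun configure(
        configuration: EventProcessingConfiguration,
        kafkaMessageSource: KafkaMessageSource
    ) {
        configuration.registerTrackingEventProcessor("com.aramark.efcs") { kafkaMessageSource }
    }
}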

Hope this helps you further.
Cheers,

Allard

Thank you, we’re up and running now!
This was the statement that hinted me towards the answer: “handlers are assigned to the group whose name is the package name of the event handler class”
Personally, I think the documentation could emphasize this more prominently, ideally with code samples.
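
To make that concrete (this is my reading rather than the official docs): the processor key under axon.eventhandling.processors in application.yml has to line up with the processing group the handler class belongs to, which is either the handler's package name or an explicit @ProcessingGroup. For example (the class name here is made up):

import org.axonframework.config.ProcessingGroup
import org.axonframework.eventhandling.EventHandler
import org.springframework.stereotype.Component

// application.yml declares a processor named "syncsaga" with
// source: kafkaMessageSource; this handler is only fed by that processor
// because it is assigned to the matching processing group.
@Component
@ProcessingGroup("syncsaga")
class SyncPointProjection {

    @EventHandler
    fun on(event: SyncHostRegistered) {
        println("Received EVENT ${event.aggregateIdentifier}")
    }
}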

The last bit that I’m a little shaky on is Kafka consumer groups. Does each service need to belong to its own consumer group in order to receive the same event?

This is more related to Apache Kafka than to Axon itself. Putting Kafka in the ‘middle’ to distribute and orchestrate your ‘domain’ events requires a deep understanding of the tool (Kafka).

Order of events (kafka topics & partitions)

The order of events matters. When using Kafka, you can preserve the order of those events by putting them all in the same Kafka partition. They must be in the same Kafka topic because different topics mean different partitions.
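
As a plain kafka-clients sketch of that point (this is not Axon API; the topic name, key, and payload strings are made up): records sent with the same key are routed to the same partition, which is what preserves their relative order.

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

fun main() {
    val props = Properties().apply {
        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
    }
    KafkaProducer<String, String>(props).use { producer ->
        // Same key ("host-42") -> same partition -> the two events stay in order.
        producer.send(ProducerRecord("axon-events", "host-42", "SyncHostRegistered"))
        producer.send(ProducerRecord("axon-events", "host-42", "SyncHostDeregistered"))
    }
}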

Queue vs publish-subscribe (kafka groups)

If all consumers are in the same group, the Kafka model functions like a traditional message queue: records and their processing are load-balanced across the group, each message is consumed by only one consumer of the group, and each partition is assigned to at most one consumer from a group.

When multiple consumer groups exist, the data flow follows the traditional publish-subscribe model: the messages are broadcast to all consumer groups.

So the answer is yes: each service needs its own consumer group so that every service receives its own copy of each event.
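
Concretely, each service that should receive its own copy of the events gets its own group-id in its application.yml (the group names below are made up):

# Service A's application.yml
axon:
  kafka:
    consumer:
      group-id: service-a-group

# Service B's application.yml
axon:
  kafka:
    consumer:
      group-id: service-b-group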

Please note that you can only distribute ‘events’ with Kafka, not commands and queries. You can distribute commands with JGroups or Spring Cloud, but you cannot distribute queries out of the box. This is one of the reasons Axon created Axon Server (Hub), so you don’t have to configure and maintain ‘three’ different types of message orchestration, which can be hard and error-prone.

Best,
Ivan

Technically, an event and a command are just messages. Why shouldn’t it be possible to distribute them using Kafka?

Hi Marinko,

You can use Kafka to distribute events out of the box with Axon. Distribution of commands or queries with Kafka is not provided by Axon ‘out of the box’, so you have to make some effort to implement this. Please note that you can distribute commands (command bus) with JGroups or Spring Cloud out of the box.

Best,
Ivan