Axon-server-connector for server 5

Hi,

I just watched the “No Aggregates, No Problem: Axon Framework 5 with DCB” stream by Steven, which was a great session, and I just had to try the university demo connected to Axon Server 5 with DCB enabled.

Unfortunately, I can’t find the required axon-server-connector. The demo uses version 5.0.0-SNAPSHOT, but that does not seem to be publicly available. For the other dependencies I can only find 5.0.0-M1, and the axon-server-connector is not among them.

I know these are all milestones, but I would really like to experiment with them. Can you please provide a link to the required libs?

Thanks a lot,

Frank

Firstly, thanks for joining the stream!

Secondly, I can definitely help you with getting the snapshots in place.
You’d need to make sure that your dependency management is set to consume snapshots, as described here.
From there, you should be able to get the axonserver-connector-java 2025.0.0-SNAPSHOT dependency (as shown here in the index).

However, you can do the same for the Axon Framework 5.0.0-SNAPSHOT version, of course (which can be found here in the index). The AF5 snapshot version should pull in the Axon Server connector version automatically.

Note that you will be on the bleeding edge, of course! Although we aim for a green build, sometimes minor things do slip through the cracks.

With that said, I am looking forward to hearing what you think about AF5 and DCB, @Franky!

Thanks for your response Steven,

I mixed up the

<dependency>
    <groupId>io.axoniq</groupId>
    <artifactId>axonserver-connector-java</artifactId>
    <version>2025.0.0-SNAPSHOT</version>
</dependency>

with the expected

<dependency>
    <groupId>org.axonframework</groupId>
    <artifactId>axon-server-connector</artifactId>
    <version>${axon.version}</version>
</dependency>

So my dependency issue is solved now.

In your live coding you used a TestConverter when configuring the AxonServer connection, I’ve also created a dummy converter so at least it compiles. Should that be good enough?

The next issue I ran into is the CreateCourseCommandHandler, which implements StatefulCommandHandler. It looks like the signature of the handle method changed a bit, so I can’t create a correct CommandResultMessage in there. Can you provide some guidance on how to do that?

Almost there :slight_smile:

Regards,

Frank

Yep, that’s fine! As stated during the stream, we’re working on the new Converters. These will replace the old Serializer. This is stuff you get when being on the bleeding edge :wink:

Not sure what you’re exactly stuck with here to be honest.
So let me share a bunch of things.

The StatefulCommandHandler interface:

@FunctionalInterface
public interface StatefulCommandHandler extends MessageHandler {

    @Nonnull
    MessageStream.Single<CommandResultMessage<?>> handle(@Nonnull CommandMessage<?> command,
                                                         @Nonnull StateManager state,
                                                         @Nonnull ProcessingContext context);
}

To construct a MessageStream with only a single response, you can use MessageStream#just. There are several static construction methods on the MessageStream interface by the way. Feel free to pick something else if that better suits your sample. If you don’t want to return anything, you can do a lazy MessageStream.empty().cast(). This makes an empty MessageStream, which is cast to a single version of a CommandResultMessage. This sufficiently satisfies Axon Framework, and it can deal with this correctly as well.

For all the Messages, we removed the static factory methods. This has to do with the new MessageType (combining a (domain) type and version), which we cannot assume is always based on a Class. Thus, you need to use the constructor of the GenericCommandResultMessage. The simplest constructor there is the following:

public GenericCommandResultMessage(@Nonnull MessageType type,
                                   @Nullable R commandResult) {
    ....
}

The commandResult is whatever you want to return from the handler. The MessageType is the aforementioned combination of a domain type of the message and a version. The MessageType has numerous constructors to give you different flavors of a MessageType, all documented clearly with JavaDoc. Again, take your pick!

Thanks Steven,

That resolved the issue. However, I find the non-blocking approach less intuitive in terms of readability and understanding what’s actually required.

In my projects, I typically define a CommandResponse class that contains either the aggregate ID with version or validation errors. Using Kotlin’s Either from the Arrow framework makes this pattern quite elegant. While these are implementation details, I think some convenience response types could help, especially since I prefer communicating state through events rather than expanding command response data.
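
As a rough Java illustration of that idea (a hypothetical sketch, not something Axon Framework provides), such a response type could be a sealed interface with one variant per outcome. The names CommandResponse, Accepted, Rejected, and describe are made up for this example:

```java
import java.util.List;

// Hypothetical convenience type for illustration; not part of Axon Framework.
sealed interface CommandResponse permits CommandResponse.Accepted, CommandResponse.Rejected {

    // Success: carries the identifier and version of what was changed.
    record Accepted(String aggregateId, long version) implements CommandResponse {}

    // Failure: carries the validation errors, defensively copied.
    record Rejected(List<String> validationErrors) implements CommandResponse {
        Rejected {
            validationErrors = List.copyOf(validationErrors);
        }
    }

    // Callers branch on the variant instead of inspecting nullable fields.
    static String describe(CommandResponse response) {
        if (response instanceof Accepted accepted) {
            return "accepted " + accepted.aggregateId() + " at version " + accepted.version();
        }
        if (response instanceof Rejected rejected) {
            return "rejected: " + String.join(", ", rejected.validationErrors());
        }
        throw new IllegalStateException("unreachable for a sealed hierarchy");
    }
}
```

A handler would return either new CommandResponse.Accepted(...) or new CommandResponse.Rejected(...), which is roughly what Either gives me in Kotlin.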

Regarding the response handling: what’s the recommended approach for determining the version after appending events to the EventAppender?

Here’s my current handler implementation, which required signature changes when upgrading to version 5.0.0-SNAPSHOT:

@Override
@Nonnull
public MessageStream.Single<CommandResultMessage<?>> handle(
        @Nonnull CommandMessage<?> command,
        @Nonnull StateManager state,
        @Nonnull ProcessingContext context
) {
    var payload = (CreateCourse) command.getPayload();
    var eventAppender = EventAppender.forContext(context, eventSink, messageTypeResolver);
    
    CompletableFuture<CommandResultMessage<?>> result = state
            .loadEntity(State.class, payload.courseId(), context)
            // thenApply suffices here, since the result is computed synchronously
            .thenApply(entity -> {
                var events = decide(payload, entity);
                eventAppender.append(events);
                return createSuccessResult(payload, "1");
            });
    
    return MessageStream.fromFuture(result);
}

private CommandResultMessage<?> createSuccessResult(CreateCourse payload, String version) {
    return new GenericCommandResultMessage<>(
            new MessageType(CreateCourse.class, version),
            new CommandResult(payload.courseId().toString())
    );
}

My main question is about properly determining the version after event appending - currently I’m hardcoding “1”.

Regards,

Frank

Use of the MessageStream

Thanks for your two cents there, Frank!

When it comes to the base interface, which you are essentially touching with milestone one, I don’t think we can provide it any other way. We require a solution that supports both imperative and reactive programming styles, for which some form of wrapper, in this case the MessageStream, is our chosen solution.

Do note that you’d only touch this flow if you are using the “barest of bones” of Axon Framework’s configuration. We most definitely will provide more straightforward solutions, allowing you to utilize what you’ve been used to in Axon Framework 4.

Milestone 2 will bring back the annotation-based solution in full, for example. The annotation-based solution does not expect you to return a MessageStream at all. From there, I do expect us to add more flavors in the future.

If anything, this feedback is valuable. I do hope you try out future (milestone) releases and keep telling us what you think :pray:


New Question

As we are essentially killing the aggregate, the idea of the aggregate version will be removed as well. The closest thing to it is the consistency marker. The consistency marker is returned (only) when sourcing a stream of events from a set of tags. In an Event Store implementation like Axon Server, the consistency marker represents the globalIndex of the last event that was used to form your consistency boundary (hence consistency marker).

The Event Store returns this consistency marker so that it can be used while appending events. The appended events represent the “decisions that have been made” by the command handler. And to append those, you need to tell the event store “what you have read.”

As you may note, this is not a one-to-one mapping of the aggregate version. The aggregate version represented the aggregate sequenceNumber that was stored in the event store. But since there are no aggregate streams, there are no sequence numbers either. Thus, the solution is to take this marker instead.

It may take a little getting used to, as it’s part of the mind shift.
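
To make that mind shift a bit more concrete, here is a minimal in-memory sketch of the idea in plain Java. None of these types are Axon Framework or Axon Server APIs. TaggedEvent, SourcingResult, and InMemoryTagEventStore are hypothetical names, and the append condition is simplified to reuse the same tags that were used for sourcing:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in types for illustration only; not Axon Framework classes.
record TaggedEvent(long globalIndex, Set<String> tags, String payload) {}

record SourcingResult(List<TaggedEvent> events, long consistencyMarker) {}

class InMemoryTagEventStore {

    private final List<TaggedEvent> log = new ArrayList<>();

    // Reads all events carrying at least one of the given tags, plus the marker:
    // the global index of the last matching event (-1 if nothing matched).
    synchronized SourcingResult source(Set<String> tags) {
        List<TaggedEvent> matching = new ArrayList<>();
        long marker = -1;
        for (TaggedEvent event : log) {
            if (matches(event, tags)) {
                matching.add(event);
                marker = event.globalIndex();
            }
        }
        return new SourcingResult(matching, marker);
    }

    // Appends only if no event with a matching tag was stored after the marker,
    // i.e. if what the caller has read is still up to date.
    synchronized boolean append(Set<String> tags, long consistencyMarker, String payload) {
        for (TaggedEvent event : log) {
            if (event.globalIndex() > consistencyMarker && matches(event, tags)) {
                return false; // the consistency boundary changed since it was read
            }
        }
        log.add(new TaggedEvent(log.size(), Set.copyOf(tags), payload));
        return true;
    }

    private static boolean matches(TaggedEvent event, Set<String> tags) {
        return event.tags().stream().anyMatch(tags::contains);
    }
}
```

Sourcing with a tag yields the marker, an append that presents a stale marker is rejected, and events with unrelated tags do not interfere with your boundary.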

You can, if needed, retrieve the consistency marker from the ProcessingContext. In milestone one, you can do that by doing the following:

ProcessingContext context = ...; // e.g. the context passed into your command handler
ConsistencyMarker marker = context.getResource(ConsistencyMarker.RESOURCE_KEY);