Sequence number duplication

Hi all

Currently I’m building an event sourced service based on Axon Framework 3.1, running on a docker swarm. The service discovery mechanism is provided by Consul. The event store is a custom implementation based on Apache Cassandra. The aggregate repository is a CachingEventSourcingRepository and I’m using the DistributedCommandBus.

Using Cassandra’s Lightweight Transactions, I am able to detect a duplication on the “event entries” table. When I start the service with more than one node, I sometimes get a ConcurrencyException, meaning that the engine detected a duplicate (aggregateIdentifier, sequenceNumber) pair.
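
To illustrate the check described above: Cassandra's lightweight transactions allow a conditional `INSERT ... IF NOT EXISTS`, which reports whether the row was actually written. The sketch below only simulates that behaviour in plain Java with an in-memory map instead of a real Cassandra session. The names `InMemoryEventTable` and `DuplicateSequenceException`, as well as the key format, are assumptions made for illustration and are not the actual event store code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the conflict signalled by the event store.
class DuplicateSequenceException extends RuntimeException {
    DuplicateSequenceException(String message) {
        super(message);
    }
}

// In-memory simulation of a table keyed by (aggregateIdentifier, sequenceNumber).
class InMemoryEventTable {
    private final ConcurrentMap<String, String> rows = new ConcurrentHashMap<>();

    // Mimics "INSERT ... IF NOT EXISTS": only the first writer for a key succeeds.
    void append(String aggregateIdentifier, long sequenceNumber, String payload) {
        String key = aggregateIdentifier + "#" + sequenceNumber;
        String existing = rows.putIfAbsent(key, payload);
        if (existing != null) {
            throw new DuplicateSequenceException(
                    "Duplicate (aggregateIdentifier, sequenceNumber): " + key);
        }
    }

    int size() {
        return rows.size();
    }
}
```

A second append with the same identifier and sequence number is rejected, which is the situation reported as a ConcurrencyException above.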

From the documentation, it seems that the DistributedCommandBus can prevent this kind of error, but I may be wrong. Anyway, what is the safest thing to do in this case? Can I safely retry sending the command? Could the RetryScheduler be a solution?

Another question. Is there a way to control the generation of the sequence number for an event (a cassandra counter column)? Maybe it’s an old question, but I couldn’t find anything in the forum.

Thanks for any help

Franco

Hi Franco,

The duplicate key exception you’re experiencing is exactly why there is a retry mechanism available for commands, so I’d say yes, the ‘RetryScheduler’ is the solution here.
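
As a rough illustration of what retrying means here, the plain-Java sketch below re-sends a command a bounded number of times when a transient conflict is reported. It deliberately does not use Axon's own classes: `TransientConflictException`, `RetryingSender`, `maxRetries` and `intervalMillis` are hypothetical names, and in a real setup the RetryScheduler configured on the command gateway would take this role.

```java
import java.util.function.Supplier;

// Hypothetical marker for a failure that may succeed when retried.
class TransientConflictException extends RuntimeException {
    TransientConflictException(String message) {
        super(message);
    }
}

class RetryingSender {
    private final int maxRetries;
    private final long intervalMillis;

    RetryingSender(int maxRetries, long intervalMillis) {
        this.maxRetries = maxRetries;
        this.intervalMillis = intervalMillis;
    }

    // Runs the dispatch; retries only on transient conflicts,
    // allowing at most maxRetries attempts after the first one.
    <R> R send(Supplier<R> dispatch) {
        int retries = 0;
        while (true) {
            try {
                return dispatch.get();
            } catch (TransientConflictException e) {
                if (retries >= maxRetries) {
                    throw e; // give up and surface the conflict to the caller
                }
                retries++;
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }
}
```

Each retry runs the whole dispatch again, so the command is handled against the aggregate state as it is at that moment rather than the state seen by the failed attempt.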

The documentation says the DistributedCommandBus prevents such an error because the default command routing mechanism (which decides which node handles which command) ensures that a single node receives all the commands for a given aggregate.
The duplicate key exception probably occurs when two or more nodes handle commands for the same aggregate. They can then simultaneously try to insert an event with the same (aggregateIdentifier, sequenceNumber) pair, upon which one of the nodes will fail because the other succeeded in appending that event to the store.
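
The routing idea can be sketched with a tiny hash ring: commands carrying the same aggregate identifier always map to the same node, as long as every node sees the same set of members. This is a simplified illustration and not Axon's ConsistentHash implementation; `SimpleHashRing`, the node names, and the use of `String.hashCode()` are assumptions.

```java
import java.util.Map;
import java.util.TreeMap;

class SimpleHashRing {
    // Position on the ring -> name of the node owning that position.
    private final TreeMap<Integer, String> ring = new TreeMap<>();

    // Registers a node at several positions to spread keys more evenly.
    void addNode(String nodeName, int virtualNodes) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.put((nodeName + "#" + i).hashCode(), nodeName);
        }
    }

    // Returns the node at the first position at or after the key's hash,
    // wrapping around to the start of the ring when none is found.
    String nodeFor(String routingKey) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("No nodes registered");
        }
        Map.Entry<Integer, String> entry = ring.ceilingEntry(routingKey.hashCode());
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }
}
```

Two nodes that build the ring from the same member list resolve the same owner for an aggregate. When their member lists differ, the same aggregate can end up being handled on two nodes, which is when the duplicate key shows up.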

Lastly, we don’t offer control over sequence number generation, because it would let users work around the behavior described above by configuring their own sequence number generator. That behavior is desired, as it’s safer to let one node handle all the commands for a given aggregate.

Hope this helps you out Franco.

Cheers,

Steven

Thank you

Actually, the problem was due to a “wrong” registration of the nodes in Consul. We’re using Docker Swarm, and so each service has its own VIP (actually the balancer of the service). Spring Cloud Consul registered each instance of the service using the VIP address.
As a result, every instance registered in Consul had the same host/IP, so each Axon node only knew of a single node, whose IP was the VIP of the service.

The DistributedCommandBus correctly forwarded the command to a node using the VIP address, and then Swarm forwarded the command to a random node, breaking the aggregate/node affinity guaranteed by the DistributedCommandBus.
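
A minimal sketch of why the shared VIP hides the other nodes, under the assumption that a router tells members apart by their endpoint. `MemberView`, the addresses and the port are made up for illustration and are not taken from the actual router code.

```java
import java.net.URI;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class MemberView {
    // Builds the set of distinct endpoints from the hosts reported by discovery.
    // Instances that registered the same host and port collapse into one entry.
    static Set<URI> distinctEndpoints(List<String> registeredHosts, int port) {
        Set<URI> endpoints = new LinkedHashSet<>();
        for (String host : registeredHosts) {
            endpoints.add(URI.create("http://" + host + ":" + port));
        }
        return endpoints;
    }
}
```

With three instances all registered under one VIP, the resulting set holds a single endpoint. With three distinct container addresses it holds three, and only then can commands be routed to a specific instance.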

To fix this, I updated the configuration of Spring Cloud using the directive:

spring:
  cloud:
    inetutils:
      # useOnlySiteLocalInterfaces: true
      # ignoredInterfaces:
      #   - flcloud_consul
      preferredNetworks:
        - 192.168
        - 10.0.0

in the bootstrap.yml file, where 10.0.0.* is the network of the services in the swarm.

I had to make some fixes to the SpringCloudHttpBackupCommandRouter and SpringCloudCommandRouter classes to get my service working correctly with Consul. The most important one is in the SpringCloudHttpBackupCommandRouter class, where I catch the exception thrown when a non-Axon node is asked for its routing information, so that the node can be blacklisted.

private Optional<MessageRoutingInformation> requestMessageRoutingInformation( ServiceInstance serviceInstance) {

    SimpleMember<URI> simpleMember = buildSimpleMember(serviceInstance);
    if (simpleMember.local()) {
        return Optional.of(getLocalMessageRoutingInformation());
    }

    URI endpoint = simpleMember.getConnectionEndpoint(URI.class)
                               .orElseThrow(() -> new IllegalArgumentException(String.format(
                                       "No Connection Endpoint found in Member [%s] for protocol [%s] to send a " +
                                               "%s request to", simpleMember,
                                       URI.class, MessageRoutingInformation.class.getSimpleName()
                               )));
    URI destinationUri = buildURIForPath(endpoint, messageRoutingInformationEndpoint);

    try {
        log.debug( "Requesting routing information to {}", destinationUri );
        ResponseEntity<MessageRoutingInformation> responseEntity = restTemplate.exchange(destinationUri,
                                                                                         HttpMethod.GET,
                                                                                         HttpEntity.EMPTY,
                                                                                         MessageRoutingInformation.class);

        return responseEntity.hasBody() ? Optional.of(responseEntity.getBody()) : Optional.empty();
    }
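    catch ( Exception e ) {
        // Any failure (for example a non-Axon node that does not expose this endpoint)
        // is reported as "no routing information" instead of propagating the exception.
        log.warn( "Failed to contact endpoint {}. Ignoring it", destinationUri );
        return Optional.empty();
    }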
}

Hi Franco,

That’s an interesting situation you’re describing, thanks for sharing!

And ah, I was already doubting whether to include a blacklist on an exception, should have put that in right away.

If you’ve got time to spare, would you mind putting in a PR for that addition?

Additionally, I’m also interested in what changes you’ve made to the ‘SpringCloudCommandRouter’ in your situation.

Cheers,

Steven

It’s actually a kinda weird situation and I struggled to come up with something usable :smiley:

I will surely put in a PR for that addition.

The other changes to the SpringCloudCommandRouter class are actually very specific to Consul and to my needs. It all seems to work well, and I can scale my service in the Swarm while keeping the distribution of commands consistent. Disclaimer: I haven’t performed a load test yet!

These are the changes:

  1. In the method updateMemberships(HeartbeatEvent), I found that it’s better to manipulate the ConsistentHash directly, without creating a new instance for each update.
    @EventListener
    @SuppressWarnings("UnusedParameters")
    public void updateMemberships( HeartbeatEvent event ) {
//        AtomicReference<ConsistentHash> updatedConsistentHash = new AtomicReference<>( new ConsistentHash() );

        discoveryClient.getServices().stream()
                .map( discoveryClient::getInstances )
                .flatMap( Collection::stream )
                .filter( serviceInstanceFilter )
                .filter( this::ifNotBlackListed )
                .forEach( serviceInstance -> updateMembershipForServiceInstance( serviceInstance,
                        atomicConsistentHash ) );

//        atomicConsistentHash.set( updatedConsistentHash.get() );
    }
  2. In the SpringCloudHttpBackupCommandRouter class, I overrode updateMembershipForServiceInstance(), adding a check to blacklist the Consul service directly.
protected void updateMembershipForServiceInstance( ServiceInstance serviceInstance,
                                                   AtomicReference<ConsistentHash> atomicConsistentHash ) {
    SimpleMember<URI> simpleMember = buildSimpleMember( serviceInstance );

    if ( simpleMember.name().startsWith( consulPrefix ) ||
            simpleMember.name().startsWith( consulPrefix.toUpperCase() ) ) {
        log.debug(
                "Black listed ServiceInstance [{}] under host [{}] and port [{}]",
                serviceInstance.getServiceId(), serviceInstance.getHost(), serviceInstance.getPort()
        );
        blackListedServiceInstances.add( serviceInstance );
        return;
    }

    ...

As you can see, it’s all very far from being well engineered, but I had to get my service working quickly.

Hope it can help

p.s.
We’re using Cassandra as the event store (we can use neither an RDBMS nor MongoDB), and I have many doubts about my implementation of the TokenStore, but I’ll leave that question for another post, maybe :smiley:

Franco

Hi Franco,

Makes sense that it’s not in a ‘well engineered’ form as you put it, it’s just tailored for your needs; I’d have done the same.

Anyhow, thanks for sharing this!

Cheers,

Steven