Allow only one command to execute and queue up other commands until the execution is completed

Hello Team,

Is there a recommended approach or configuration that allows only one command to execute at a time and queues up the other commands until that execution has completed? It should be usable with an async command bus.


Kartik

Hi Kartik,

We had a similar requirement: we needed a thread-safe implementation of the set-based consistency validation described in the blog post "Set Based Consistency Validation". The problem was that this solution is not safe to use when commands are processed in parallel. We also didn't want to limit command handling to a single thread; only specific commands should be synchronized.

We achieved this by creating a wrapper for the command gateway that gives special treatment to commands carrying a marker annotation.

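// SyncedCommand.java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Commands annotated with this are processed one at a time (synchronized).
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface SyncedCommand {

}
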
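// SyncableCommandGateway.java (imports assume Axon Framework 4.x package names and Lombok)
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Builder;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.GenericCommandMessage;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.commandhandling.gateway.DefaultCommandGateway;
import org.axonframework.common.Registration;
import org.axonframework.messaging.MessageDispatchInterceptor;

public class SyncableCommandGateway implements CommandGateway {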

  public static final Class<SyncableCommandGateway> LOCK = SyncableCommandGateway.class;

  public static final String SEND_SYNCED_EXCEPTION_MESSAGE = String.format(
      "A command annotated by @%s can only be used with sendAndWait", SyncedCommand.class.getSimpleName());

  private final DefaultCommandGateway defaultCommandGateway;

  @Builder
  public SyncableCommandGateway(CommandBus commandBus) {
    this.defaultCommandGateway = DefaultCommandGateway.builder()
        .commandBus(commandBus)
        .build();
  }

  @Override
  public <C, R> void send(C command, CommandCallback<? super C, ? super R> callback) {
    if (commandIsToBeSynced(command)) {
      throw new RuntimeException(SEND_SYNCED_EXCEPTION_MESSAGE);
    }
    defaultCommandGateway.send(command, callback);
  }

  @Override
  public <R> CompletableFuture<R> send(Object command) {
    if (commandIsToBeSynced(command)) {
      throw new RuntimeException(SEND_SYNCED_EXCEPTION_MESSAGE);
    }
    return defaultCommandGateway.send(command);
  }

  @Override
  public <R> R sendAndWait(Object command) {
    if (commandIsToBeSynced(command)) {
      synchronized (LOCK) {
        return defaultCommandGateway.sendAndWait(command);
      }
    } else {
      return defaultCommandGateway.sendAndWait(command);
    }
  }

  @Override
  public <R> R sendAndWait(Object command, long timeout, TimeUnit unit) {
    if (commandIsToBeSynced(command)) {
      synchronized (LOCK) {
        return defaultCommandGateway.sendAndWait(command, timeout, unit);
      }
    } else {
      return defaultCommandGateway.sendAndWait(command, timeout, unit);
    }
  }

  @Override
  public Registration registerDispatchInterceptor(
      MessageDispatchInterceptor<? super CommandMessage<?>> dispatchInterceptor) {
    return defaultCommandGateway.registerDispatchInterceptor(dispatchInterceptor);
  }

  // Unwrap any CommandMessage (not only GenericCommandMessage) so the
  // annotation is looked up on the payload type rather than on the wrapper.
  private boolean commandIsToBeSynced(Object command) {
    Class<?> commandType = command instanceof CommandMessage
        ? ((CommandMessage<?>) command).getPayloadType()
        : command.getClass();
    return commandType.isAnnotationPresent(SyncedCommand.class);
  }
}
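
To illustrate how the gateway decides whether a command needs the lock, here is a minimal sketch of the annotation check using only the JDK. The two command classes (ReserveEmailCommand, RenameUserCommand) are hypothetical and not part of our code base; the annotation is redeclared as a nested type only so that the sketch compiles on its own.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class SyncedCommandCheck {

  // Same shape as the marker annotation above. RUNTIME retention is what
  // makes the annotation visible to reflection.
  @Target({ElementType.TYPE})
  @Retention(RetentionPolicy.RUNTIME)
  @interface SyncedCommand {
  }

  // Hypothetical command that must be handled one at a time.
  @SyncedCommand
  static final class ReserveEmailCommand {
    final String email;

    ReserveEmailCommand(String email) {
      this.email = email;
    }
  }

  // Hypothetical command that may be handled in parallel.
  static final class RenameUserCommand {
    final String name;

    RenameUserCommand(String name) {
      this.name = name;
    }
  }

  // The same lookup the gateway performs on a plain (unwrapped) command object.
  static boolean isSynced(Object command) {
    return command.getClass().isAnnotationPresent(SyncedCommand.class);
  }

  public static void main(String[] args) {
    System.out.println(isSynced(new ReserveEmailCommand("a@example.org"))); // true
    System.out.println(isSynced(new RenameUserCommand("Kartik")));          // false
  }
}
```

Note that the annotation is not marked @Inherited, so a subclass of an annotated command would not be detected unless it is annotated itself.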

This solution works only with the sendAndWait function of the command gateway, because the send function is non-blocking: it returns before the command has been handled, so a lock around it would be released too early. Because of this restriction, wrong usage can only be detected at runtime (see the runtime exception that is thrown in the send functions). As there should be some kind of tests anyway, this has not been a real problem for us.
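
The difference between the blocking and the non-blocking case can be reproduced without Axon. The following is a rough sketch under stated assumptions: Handler is a hypothetical stand-in for a command handler, and CompletableFuture.runAsync plays the role of a non-blocking dispatch. It is not how the command bus works internally; it only shows when the lock is released in each case.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class LockScopeDemo {

  private static final Object LOCK = new Object();

  // Hypothetical stand-in for a command handler: records how many
  // invocations were running at the same time.
  static final class Handler {
    final AtomicInteger active = new AtomicInteger();
    final AtomicInteger maxActive = new AtomicInteger();

    void handle() {
      int now = active.incrementAndGet();
      maxActive.accumulateAndGet(now, Math::max);
      try {
        Thread.sleep(100); // simulate work
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      active.decrementAndGet();
    }
  }

  // Blocking dispatch (like sendAndWait): the lock is held until the
  // handler has finished, so invocations never overlap.
  static int maxOverlapBlocking(int commands) throws Exception {
    Handler handler = new Handler();
    ExecutorService callers = Executors.newFixedThreadPool(commands);
    try {
      CompletableFuture<?>[] calls = new CompletableFuture<?>[commands];
      for (int i = 0; i < commands; i++) {
        calls[i] = CompletableFuture.runAsync(() -> {
          synchronized (LOCK) {
            handler.handle();
          }
        }, callers);
      }
      CompletableFuture.allOf(calls).get();
    } finally {
      callers.shutdown();
    }
    return handler.maxActive.get();
  }

  // Non-blocking dispatch (like send): the lock only covers handing the
  // work off, so the handlers themselves can still run in parallel.
  static int maxOverlapNonBlocking(int commands) throws Exception {
    Handler handler = new Handler();
    ExecutorService workers = Executors.newFixedThreadPool(commands);
    try {
      CompletableFuture<?>[] calls = new CompletableFuture<?>[commands];
      for (int i = 0; i < commands; i++) {
        synchronized (LOCK) {
          calls[i] = CompletableFuture.runAsync(handler::handle, workers);
        }
      }
      CompletableFuture.allOf(calls).get();
    } finally {
      workers.shutdown();
    }
    return handler.maxActive.get();
  }

  public static void main(String[] args) throws Exception {
    System.out.println("blocking, max overlap: " + maxOverlapBlocking(4));
    System.out.println("non-blocking, max overlap: " + maxOverlapNonBlocking(4));
  }
}
```

In the blocking variant the maximum overlap is always 1. In the non-blocking variant it is typically greater than 1, which is why our gateway rejects synced commands in send instead of pretending to synchronize them.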

Hope this helps. I am also curious about other solutions to achieve this.


Günter