Event snapshotting on multiple data sources

Hi.

I’m implementing a multi-client, Axon-based backend where every client has its own database schema. Every time a command is about to execute, I select the correct schema using a command bus interceptor and Spring’s AbstractRoutingDataSource facility:

public Object handle(Object command, UnitOfWork unitOfWork, InterceptorChain chain)
        throws Throwable {

    if (command instanceof Commandable) {
        Commandable cmd = (Commandable) command;
        if (cmd.routingKey() != null) {
            RoutingKeySelector.select(cmd.routingKey());
            chain.proceed();
        } else {
            logger.warn("Routing key cannot be null");
        }
    }

    return command;
}

Here is RoutingKeySelector class:

public class RoutingKeySelector {

    // Holds the routing key for the current thread only
    private static final ThreadLocal<String> router = new ThreadLocal<String>();

    public static void select(String routingKey) {
        router.set(routingKey);
    }

    public static String routingKey() {
        return router.get();
    }
}

I can configure the event snapshotter, and it works correctly when I don’t provide an executor instance, but according to the Axon documentation it’s not recommended to run the snapshotting process in the same thread. If I set the executor, the new thread gets an empty copy of the router attribute and cannot select a datasource.
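
To make the problem concrete, here is a minimal sketch of why the worker thread sees nothing. It uses a stand-in ThreadLocal named ROUTER rather than the real RoutingKeySelector, and the class and method names are hypothetical, chosen only for illustration:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalVisibilityDemo {

    // Stand-in for the router field of RoutingKeySelector (assumption for this sketch).
    private static final ThreadLocal<String> ROUTER = new ThreadLocal<>();

    /** Sets the key on the calling thread and returns what a separate worker thread reads. */
    static String keySeenByWorker(String routingKey) {
        ROUTER.set(routingKey);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // The task runs on the executor's thread, which has its own (empty) slot.
            Callable<String> readKey = ROUTER::get;
            return executor.submit(readKey).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
            ROUTER.remove();
        }
    }

    public static void main(String[] args) {
        // The worker never had the key set, so it reads null.
        System.out.println("worker sees: " + keySeenByWorker("client-a"));
    }
}
```

Each thread has its own slot in a ThreadLocal, so a value set on the command-handling thread is simply not there when the executor's thread asks for it.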

Any idea how I could implement this and run the snapshotter in a separate thread?

Thanks

Hi,

the solution would be to make sure the Thread that runs the snapshotting process has the ThreadLocal in place.
The easiest way to achieve that is to wrap the Executor you inject in one that reads the current ThreadLocal value on the calling thread and wraps each task to execute (the Runnable) in a task that sets the ThreadLocal and then executes the wrapped task.

I noticed that you never remove the ThreadLocal value in your interceptor. Be careful with that: when threads are pooled, you may accidentally execute something against the wrong datasource.

Cheers,

Allard

Thanks Allard.

So you suggest to call router.remove() before setting the ThreadLocal value in the select method?

It’s better to remove the thread local value immediately after calling the method that needs the context information (such as the chain.proceed() call). This ensures that you cannot accidentally use that context for another process. It also prevents PermGen errors.

Just treat the ThreadLocal like you would a File: always make sure the method that opens one closes it as well.
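
As a rough sketch of that open/close discipline (not Axon code; the ScopedRoutingKey class, its ROUTER field, and the withRoutingKey helper are hypothetical names standing in for RoutingKeySelector), the set and the remove can live in the same method with a try/finally:

```java
import java.util.function.Supplier;

class ScopedRoutingKey {

    // Stand-in for the ThreadLocal held by RoutingKeySelector (assumption for this sketch).
    private static final ThreadLocal<String> ROUTER = new ThreadLocal<>();

    static String current() {
        return ROUTER.get();
    }

    /**
     * Runs the work with the routing key set on the current thread and
     * always removes the key afterwards, even if the work throws.
     */
    static <T> T withRoutingKey(String routingKey, Supplier<T> work) {
        ROUTER.set(routingKey);
        try {
            return work.get();
        } finally {
            // The same method that "opened" the context also "closes" it.
            ROUTER.remove();
        }
    }
}
```

In the interceptor, the body of the try block would be the chain.proceed() call, so a pooled thread never carries one client's key into the next command.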

Cheers,

Allard

I’m not sure about the implementation part.

Say I wrap the executor like this:

public class RoutingThreadTaskPoolExecutor extends ThreadPoolTaskExecutor {

// read RoutingKeySelector.routingKey() selector and set the value for
}

How can I wrap the actual task?
I saw in the Axon sources that the task that runs the snapshotting process is CreateSnapshotTask (and it is marked as final).

Thanks

Cheers

Nedo

Hi,

here’s a small example of what I mean:

import java.util.concurrent.Executor;

public class ContextAwareExecutor implements Executor {

    private final Executor delegate;

    public ContextAwareExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public void execute(final Runnable command) {
        // Read the context on the calling thread, before handing the task over
        final MyContext context = MyContextHolder.getCurrentContext();
        delegate.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // Restore the context on the worker thread
                    MyContextHolder.setContext(context);
                    command.run();
                } finally {
                    // Always clear it, so pooled threads don't keep a stale context
                    MyContextHolder.clearContext();
                }
            }
        });
    }
}
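
Applied to the routing key from this thread, the same idea might look roughly like the sketch below. It is self-contained for illustration: ROUTER stands in for the ThreadLocal inside RoutingKeySelector, and RoutingKeyAwareExecutor and keySeenByWorker are hypothetical names, not part of Axon or Spring.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class RoutingKeyAwareExecutor implements Executor {

    // Stand-in for RoutingKeySelector's ThreadLocal (assumption for this sketch).
    static final ThreadLocal<String> ROUTER = new ThreadLocal<>();

    private final Executor delegate;

    RoutingKeyAwareExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable task) {
        // Captured on the submitting thread, where the key is currently set.
        final String routingKey = ROUTER.get();
        delegate.execute(() -> {
            ROUTER.set(routingKey);
            try {
                task.run();
            } finally {
                // Pooled worker threads must not keep the key for the next task.
                ROUTER.remove();
            }
        });
    }

    /** Demo helper: returns the key that a pooled worker thread observes. */
    static String keySeenByWorker(String routingKey) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicReference<String> seen = new AtomicReference<>();
        ROUTER.set(routingKey);
        try {
            new RoutingKeyAwareExecutor(pool).execute(() -> seen.set(ROUTER.get()));
            pool.shutdown(); // the already-submitted task still runs
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
            ROUTER.remove();
        }
        return seen.get();
    }
}
```

Because the wrapper only depends on java.util.concurrent.Executor, nothing has to be subclassed and the final CreateSnapshotTask is left untouched: an existing executor (for instance a Spring ThreadPoolTaskExecutor, which implements Executor) can be passed in as the delegate, and the wrapper is what gets handed to the snapshotter.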

Cheers,

Allard

That worked great =), except for one thing. I had to dispose of the ThreadLocal context in the command callback (onSuccess / onFailure). Removing it after chain.proceed() was causing the context-aware executor to get a null value.

Thanks

Cheers

Nedo