Carry metadata from events into further commands

Hello Allard,

We have situations where event listeners spawn commands. This happens in “plain” event listeners and also in saga event listeners. Is there an easy way to associate the original metadata with the spawned commands so that we can map the entire user flow?

Do let us know.

Thanks!
Prem

We’re aware that we can include additional parameters annotated with @MetaData parameters on the event handler and pass them along. However, we’re looking for a less intrusive way to do this.

Thanks!

Hi,

on the SagaManager, you can set a CorrelationDataProvider. It defines (based on an incoming message) which meta-data to assign to outgoing messages.
For the regular @EventHandler, there is no other possibility that to use @MetaData annotated parameters (or the MetaData itself as Parameter).

Cheers,

Allard

Hi,

I, like Prem, also need to audit events that are processed by listeners and track the commands that are dispatched as a result. To do this, after looking at the CorrelationDataHolder, I’ve decided to set thread local data at the start of the listener thread that processes the event. My custom command gateway will grab the thread local data and add it as command metadata. My main concern right now is setting and clearing thread local data in the right place. I have the following code so far:

class TrackingEventProcessor extends EventProcessor {
public TrackingEventProcessor(Executor executor, ShutdownCallback shutDownCallback, ErrorHandler errorHandler,
UnitOfWorkFactory unitOfWorkFactory, Set eventListeners,
MultiplexingEventProcessingMonitor eventProcessingMonitor) {
super(executor, shutDownCallback, errorHandler, unitOfWorkFactory, eventListeners, eventProcessingMonitor);
}

@Override
protected ProcessingResult doHandle(EventMessage<?> event) {
try {
//BEST PLACE TO SET THREAD LOCAL?? (will probably end up using CorrelationDataHolder instead)
AuditingContext.setFlowId((String) event.getMetaData().get(AuditingContext.FLOW_ID));
AuditingContext.setOriginatorId(event.getIdentifier());
return super.doHandle(event);
} finally {
//BEST PLACE TO CLEAR THREAD LOCAL??
AuditingContext.clear();
}
}
}

class TrackingAsynchronousCluster extends AsynchronousCluster {
private Executor executor;
private UnitOfWorkFactory unitOfWorkFactory;
private ErrorHandler errorHandler;

public TrackingAsynchronousCluster(String name, Executor executor, UnitOfWorkFactory unitOfWorkFactory,
SequencingPolicy<? super EventMessage<?>> sequencingPolicy, ErrorHandler errorHandler) {
super(name, executor, unitOfWorkFactory, sequencingPolicy, errorHandler);
this.executor = executor;
this.unitOfWorkFactory = unitOfWorkFactory;
this.errorHandler = errorHandler;
}

@Override
protected EventProcessor newProcessingScheduler(
EventProcessor.ShutdownCallback shutDownCallback, Set eventListeners,
MultiplexingEventProcessingMonitor eventProcessingMonitor) {

return new TrackingEventProcessor(executor,
shutDownCallback,
errorHandler,
unitOfWorkFactory,
eventListeners,
eventProcessingMonitor);
}
}

Does this look right? Is there anything I’m missing here?

Hi Kirmanie,

your approach seems to be correct. The doHandle method is invoked in the thread that will actually do the invocation of the EventHandler. So setting the thread local right before, and clearing it afterwards, is the correct approach.

It might be worth noting that the Axon 3 API will make correlating commands and events a lot simpler. It’s unfortunate that it currently requires that much code to attach correlation information.

Kind regards,

Allard

Hello Allard,

Has this been resolved in Axon 3? We have a similar requirement of attaching a correlation data in the commands/events flowing through our system.

Thanks!

Hi Payal,

Yes, the metadata, or what you’re more specifically looking for the correlation id, is automatically carried over for you from command to event.

Additionally, the normal config of an Axon 3 application will already wire a CorrelationDataProvider for you in an MessageHandlerInterceptor.

You’ll thus not necessarily have to wire your own, although you can always add more CorrelationDataProvider to the CorrelationDataInterceptor if you want.

Hope this helps!

Cheers,

Steven