Re-send outgoing messages (over AMQP)

Hi,

I use Axon in several microservices at my customer and have everything working as expected. Love it!

In our product microservice we receive “external” messages over AMQP. The user can alter and enrich the products in the user interface and, once done, can publish the products to the outside world. This triggers an outgoing message over AMQP so that the other microservices are informed.

Now we have an (architectural) requirement that it should be possible to resend the outgoing messages on request. Not all messages, only the latest state for each unique identifier. The outgoing message should use the current format, so old messages should be transformed to the new message structure. We will have fewer than a thousand products in the system.

So I will implement a REST endpoint from which the resending can be triggered. I’m wondering what the best approach is to accomplish the actual resending. The most viable solutions in my opinion are:

  1. Just query the view model to get the latest state for each product and, in one big loop, send out all products over AMQP again;
  2. Create a tracking event processor with some kind of “enabled” flag, which is disabled by default. When it is enabled by a request to the REST endpoint, the tracking token is reset and the processor receives the events from the event store in reverse order, keeping track of the unique IDs of the products that have been sent out. This way only the latest version of each product will be resent. When this tracking event processor “isCaughtUp” with the events in the event store, it is disabled again;
  3. Somehow dynamically instantiate a tracking event processor and trigger the processing as described in 2.

Which approach should I take: one of the above, or is there a better alternative?

Regards,

Frank

Hi Frank,

I think I get the gist of what you’ve been tasked with.
You need a Query Model which holds the latest Product state and which can be ‘resent’.

When it is resent, the Product states should comply with the latest format of the Query Model.

Especially the last bit makes it sound to me like you need to be able to update your query models through issuing a replay, thus pointing towards the usage of a Tracking Event Processor.
Regardless, here are my thoughts on what you’ve shared as options:

  1. I think this is the quickest, most pragmatic approach out there. I assume you already have the models, so why not just query them once you need them?
  2. The problem with this is that you want to receive the events in reverse order. That would require quite some custom work, mostly around providing a StreamableMessageSource for your TrackingEventProcessor which traverses the event stream backwards.
  3. As tough as option 2, because you are asking for the reversed event order.
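
Option 1 can be sketched roughly as follows. This is a minimal illustration in plain Java, not code from the project: `ProductView`, `ProductPublishedMessage`, `ProductResender` and the format version constant are hypothetical names, and both the view-model query and the AMQP send are passed in as plain functions so the loop can be shown without any framework on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical view-model entry holding the latest state of one product. */
final class ProductView {
    final String productId;
    final String name;

    ProductView(String productId, String name) {
        this.productId = productId;
        this.name = name;
    }
}

/** Hypothetical outgoing message, always built in the current format. */
final class ProductPublishedMessage {
    final String productId;
    final String name;
    final int formatVersion;

    ProductPublishedMessage(String productId, String name, int formatVersion) {
        this.productId = productId;
        this.name = name;
        this.formatVersion = formatVersion;
    }
}

/** Option 1: read every product from the view model and publish it again. */
final class ProductResender {
    /** Assumed version number of the current message format. */
    static final int CURRENT_FORMAT_VERSION = 2;

    private final Supplier<List<ProductView>> viewModel;       // e.g. a "find all" query
    private final Consumer<ProductPublishedMessage> publisher; // e.g. a wrapper around the AMQP send

    ProductResender(Supplier<List<ProductView>> viewModel,
                    Consumer<ProductPublishedMessage> publisher) {
        this.viewModel = viewModel;
        this.publisher = publisher;
    }

    /** Publishes the latest state of each product and returns how many were sent. */
    int resendAll() {
        int sent = 0;
        for (ProductView view : viewModel.get()) {
            publisher.accept(new ProductPublishedMessage(
                    view.productId, view.name, CURRENT_FORMAT_VERSION));
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        List<ProductPublishedMessage> outbox = new ArrayList<>(); // stands in for AMQP
        ProductResender resender = new ProductResender(
                () -> List.of(new ProductView("p-1", "Chair"), new ProductView("p-2", "Table")),
                outbox::add);
        System.out.println("Resent " + resender.resendAll() + " products");
    }
}
```

Because the message is built from the view model at send time, it is always in the current format, and with fewer than a thousand products a single loop should be cheap enough.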

If you can find a way around the reversed event order requirement, I think you can benefit greatly from the start(), shutDown() and resetTokens() operations on the TrackingEventProcessor.

With those, you can pause the TEP immediately after it has started by calling the shutDown() operation.
Once the request to send the latest products has arrived, you could reset the processor with the resetTokens() operation (note that this also accepts a Tracking Token to specify the point you want it to reset to) and then start it up again with the start() function. Then, once it is done, you can simply shut it down again so that it is not used in the regular process.
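
The sequence of calls described above could be sketched like this. To keep the example compilable without Axon, `ReplayableProcessor` is a hypothetical stand-in interface exposing only the three operations mentioned; in a real application the TrackingEventProcessor itself would take its place. `ResendTrigger` and its method names are also made up for illustration, and how "done" is detected is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical stand-in for the three TrackingEventProcessor operations
 * discussed in the thread, so the sketch does not depend on Axon.
 */
interface ReplayableProcessor {
    void start();
    void shutDown();
    void resetTokens();
}

/** Drives a normally parked processor through one replay on request. */
final class ResendTrigger {
    private final ReplayableProcessor processor;

    ResendTrigger(ReplayableProcessor processor) {
        this.processor = processor;
    }

    /** Call once after application startup so the processor stays idle. */
    void parkAtStartup() {
        processor.shutDown();
    }

    /** Call from the REST endpoint when a resend is requested. */
    void triggerResend() {
        // Assumption: the processor has to be stopped before its tokens are reset.
        processor.shutDown();
        processor.resetTokens();
        processor.start();
    }

    /** Call when the replay has processed all events. */
    void onReplayFinished() {
        processor.shutDown();
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        ReplayableProcessor recording = new ReplayableProcessor() {
            public void start() { calls.add("start"); }
            public void shutDown() { calls.add("shutDown"); }
            public void resetTokens() { calls.add("resetTokens"); }
        };
        ResendTrigger trigger = new ResendTrigger(recording);
        trigger.parkAtStartup();
        trigger.triggerResend();
        trigger.onReplayFinished();
        System.out.println(calls);
    }
}
```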

So, if the reverse-event-order is something you can work around, I’d say option 3 using the suggested methods makes the most sense.
Otherwise, I’d first check out using option 1 in a PoC to see what it feels/looks like.
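
One possible way around the reversed order, offered here as an assumption rather than something proposed in the thread: replay the events in their normal order and let later events overwrite earlier ones per product, then send out whatever is left once the replay is done. The sketch below uses hypothetical names (`ProductStateChanged`, `LatestStateCollector`) and assumes each event carries the full new state of the product.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical event; assumed to carry the full new state of the product. */
final class ProductStateChanged {
    final String productId;
    final String state;

    ProductStateChanged(String productId, String state) {
        this.productId = productId;
        this.state = state;
    }
}

/** Collects the latest state per product while events are replayed oldest-first. */
final class LatestStateCollector {
    private final Map<String, String> latestByProductId = new LinkedHashMap<>();

    /** Would be called for every replayed event, in normal (forward) order. */
    void on(ProductStateChanged event) {
        // A later event for the same product replaces the earlier state.
        latestByProductId.put(event.productId, event.state);
    }

    /** Returns the collected states and empties the collector for the next request. */
    Map<String, String> drain() {
        Map<String, String> result = new LinkedHashMap<>(latestByProductId);
        latestByProductId.clear();
        return result;
    }

    public static void main(String[] args) {
        LatestStateCollector collector = new LatestStateCollector();
        collector.on(new ProductStateChanged("p-1", "draft"));
        collector.on(new ProductStateChanged("p-2", "published"));
        collector.on(new ProductStateChanged("p-1", "published"));
        System.out.println(collector.drain());
    }
}
```

With fewer than a thousand products the map stays small, and in effect it is a dedicated query model built on demand.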

That’s my 2 cents.

Cheers,
Steven

Thanks Steven for your reply. We will indeed go for option 1 and might create a specific query model for this purpose.

Regards,

Frank