Usage of QueryUpdateEmitter inside transactions

Hi folks,

I encountered an interesting problem during the implementation of a query model using a relational database as a storage. I used Spring-Data for persisting the projection and had to implement the support for Subscription Queries inside the projection. The implementation followed the documentation of using the QueryUpdateEmitter to notify the subscriptions about the changes received by the event handlers.

That is in general:

  @EventHandler
  override fun on(event: DataEntryUpdatedEvent, metaData: MetaData) {
    val savedEntity = dataEntryRepository.findByIdOrNull(event.entryId)
    val entity = if (savedEntity == null || savedEntity.lastModifiedDate < event.updateModification.time.toInstant()) {
      dataEntryRepository.save(
        event.toEntity()
      )
    } else {
      savedEntity
    }
   queryUpdateEmitter.emit(..., ..., entity.toUpdateMessage())
  }

The problem here is that the statement dataEntryRepository.save() is not instantly saving the entry, but just marks it to be saved… The actual save operation happens on transaction commit. The control of the transaction is performed outside of the EventHandler code (I believe in case of a tracking event processor, the processor opens a Unit of Work, starts a transaction and commits the transaction after all event handlers are processed).

As an effect, this means that the Java call of emitting an event notifies the query requesters of the subscription queries immediately - BEFORE the actual commit. I believe this is really evil, since the commit may fail (creating an inconsistency) or even if it doesn’t fail, it breaks the contract of subscription query to get the result either from the initial result or from the update (I reasoned about this and there is a time frame between the point in time the change is send in the update message and the point in time the result is flushed and would be available via initial result).

I observed this problem in the application we were building and found a workaround by registering a Spring TransactionSynchronization to avoid a (too early) send of the update messages.

Question

Do you think this problem is just in my configuration / usage, or does the problem exists in every transactional query model supporting subscription queries?

Proposal:

I believe I could supply a more general solution to this problem by making the QueryUpdateEmitter aware of the transaction or even Unit Of Work surrounding it. Are you interested in such an “extension” of the framework? It could be then performed in a way that you can configure the behavior of the query update emitter.

Here is the link to the issue of the project: Make sure the event updates are fired TX-aware · Issue #424 · holunda-io/camunda-bpm-taskpool · GitHub

Cheers,

Simon

1 Like

Hi Simon,

I’m not sure how you observed this issue. The SimpleQueryUpdateEmitter (currently the only implementation in use) will delay an emit() to the AFTER_COMMIT phase. This phase is only executed after a commit has been successfully processed. When rolling back, the emitted messages are discarded.

The only situation where this would not happen, is when there is no active Unit of Work. In that case, they emit immediately.

The relevant code can be found in org.axonframework.queryhandling.SimpleQueryUpdateEmitter#emit:

    @Override
    public <U> void emit(Predicate<SubscriptionQueryMessage<?, ?, U>> filter,
                         SubscriptionQueryUpdateMessage<U> update) {
        runOnAfterCommitOrNow(() -> doEmit(filter, intercept(update)));
    }
1 Like

Thank you Allard,
this is very interesting.

So I sum-up and re-phrase the question.

My assumption is wrong, you are Unit-of-work aware and there is no reason for any patch…

In the same time I needed to create a Spring-based TransactionSynchronization in order to make sure that the update happens correctly. I would like to understand the coupling between Spring Tx (used in Spring-Data JPA) and UoW then. I believe it is very hard do describe and debug…

In my project, I managed to create a failing test (simulating concurrent transactional modifications) showing the behavior, which I observed in the customer project.

I propose / would be happy to have a session, where I can demonstrate you what we are doing and what was failing… In my project I first used a in-memory projection but had to switch to a JPA projection because of the replay time on restart… So the only thing I changed was the implementation behind the projection API by exchanging the in-memory ConcurrentHashMap by CrudRepository.

By the way, I believe the problem with the subscription / data visibility is not timing, but transaction isolation. The problem could be observed constantly, not Heisenberg-ish sometimes…

Cheers,

Simon

Allard,

I executed some experiments. As a result:

  1. If I use TransactionlSynchronization and the projection is declaring propagation REQUIRES_NEW, the subscription query receives two messages the initial result of the query from DB and the update message from the event update during subscription.

  2. If I use TransactionlSynchronization and the projection is NOT declaring propagation, the subscription query receives ONE message - the initial result of the query from DB only. The second message is not delivered… In addition, I see the error in log reporting that the lock could not be set on a projection table.

I think it would be helpful to understand how the Spring transaction and UoW work together. So if there is the next “Ask me everything question, don’t be suprised I ask exactly this…”

Cheers and have a nice weekend,

Simon

Hi Simon,

essentially, the Unit of Work and a Transaction have nothing to do with each other. Though still, there is some interaction happening, indirectly.

The Unit of Work is Axon’s mechanism to ensure that the right actions are performed in the right lifecycle phase of processing a message. Some actions are performed before invoking the message’s handler, some after, and some depend on the outcome of that handler (i.e. exceptions vs regular result).

Some components in Axon will allow you to configure a TransactionManager. Essentially, what that will do, is start a transaction, and register a lifecycle handler on the COMMIT and ROLLBACK phases of the Unit of Work, to commit and roll back the transaction that was just created, respectively.

The QueryUpdateEmitter uses the same Unit of Work to register the publication of an emit() action to only happen in the AFTER_COMMIT phase. This essentially means that if a commit fails, or if a handler throws an exception even before a commit can be attempted, the emit() will be discarded.

Why the TransactionSynchronisation has an influence on this, is unclear to me. It most likely has nothing to do with the QueryUpdateEmitter itself. Are you doing any transaction-specific actions in the query update handler? If that’s the case, you may indeed need to manage transactions explicity. Since emit() is postponed till after commit, there is no active transaction at that point.

Also, normally, calls to QueryUpdateEmitter.emit() happen from an Event Handler method as part of the updating of a certain projection. Updating the projection means updating responses to queries, which in turn warrants a Query Update. In that case, you’d already have a transaction active to perform the update on the projection. I’m not sure why a (new) transaction would be needed to emit these updates.

Hi Allard,

I examined the code again and should confirm the statement of Sherlock Holmes: “When you have eliminated the impossible, whatever remains, however improbable, must be the truth”…

My problem had nothing to do with Tx at all, but the error in queryEmitter predicate caused the update message to be NOT delivered to the correct subscription. Essentially, the in-memory view was fast enough to deliver the result in the initial result (and not in updates), but the updates were also blocked there…

Now after implementing 20 unit testing making sure my predicates are correct, I fixed the initial issue, removed the TransactionSynchronization and everything seems to be fine…

By the way - testing this in an ITest is a mess. I need several threads (for updates and subscription) and now I invoke EventStore#publish to deliver messages in one thread and join the subscription in another…

Thank you and sorry for bothering,

Simon

I’m glad I was able to help you find the issue.

I guess you’ll have to find another question to ask during the next “ask me anything” :wink: .

2 Likes