Replaying events to AsynchronousCluster

Hello,

Just wondering if anyone has experience replaying events to AsynchronousCluster? It all seems to work "fine", but since our event handling is generally slower than reading from the event store (which is the whole reason to make it asynchronous), the replay will get ahead of the cluster and happily read the entire database of events into RAM and crash before the cluster catches up. Or at least, that's what it looks like to me because I can't see any kind of throttle or bounded BlockingQueue being configurable in AsynchronousCluster.

Maybe I'm misunderstanding AsynchronousCluster, but I'm also wondering: why does it use a custom scheduler instead of some kind of Disruptor/ring buffer (which would be bounded on the input side)? And is there a way to control the "batch size" for the async transactions?

As a workaround, I was thinking of customizing ReplayingCluster and/or EventStoreManagement to subscribe to the cluster itself, periodically publish a fake "flush" event, and block until it sees that event come through the cluster. I haven't tried this yet -- I was hoping for an opinion before customizing too much.
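For the curious, here's roughly what I had in mind. This is only a sketch and untested; FlushMarker and FlushingReplayHelper are names I just made up, though the Axon 2 calls used (Cluster.publish, GenericEventMessage.asEventMessage, EventListener.handle) are the real API:

import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

import org.axonframework.domain.EventMessage
import org.axonframework.domain.GenericEventMessage
import org.axonframework.eventhandling.Cluster
import org.axonframework.eventhandling.EventListener

/** Marker payload published only so we can observe it coming out the other side. */
class FlushMarker {}

class FlushingReplayHelper implements EventListener {

    private volatile CountDownLatch latch

    /** Publish a marker and block until the cluster has handled it. */
    void flush(Cluster cluster) {
        latch = new CountDownLatch(1)
        cluster.publish(GenericEventMessage.asEventMessage(new FlushMarker()))
        if (!latch.await(5, TimeUnit.MINUTES)) {
            throw new IllegalStateException('Cluster did not drain in time')
        }
    }

    @Override
    void handle(EventMessage event) {
        if (event.payload instanceof FlushMarker) {
            latch?.countDown()
        }
    }
}

The helper would be subscribed to the cluster up front (cluster.subscribe(helper)). One caveat: depending on the sequencing policy, the marker might not wait for events still queued for other sequences, so this would only approximate a real flush.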

We've already customized EventStoreManagement to use multiple threads for reading, since DB2 scales almost linearly for reads from DOMAIN_EVENT_ENTRY. We can easily read 15k-20k events/second on our test hardware, versus about 1,000/s for a single read thread -- reading is almost entirely limited by I/O and batch size/latency. We were hoping to treat this as a kind of streaming fork/join: we "join" before publishing to the cluster (or rather, before calling the visitor), because we didn't want to reimplement all of the other nice features of AsynchronousCluster and ReplayingCluster, including SequentialPerAggregatePolicy, ReplayAware, exception handling/retry, and backlogging. The other option for us, obviously, is to build a fully custom multithreaded replay stack. Our goal is to be able to replay millions of events in a reasonable amount of time. We don't have that many events in production yet, but we're exploring the limits before we do.
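To give a sense of the shape (all names below are mine, and readChunk is just a stub for the real JDBC query, so treat this as a sketch rather than a drop-in): we fork one bounded range query per chunk and join the results back in submission order before calling the visitor.

import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future

import org.axonframework.domain.DomainEventMessage
import org.axonframework.eventstore.management.EventVisitor

class ParallelEventReader {

    private final int threads = 8
    private final ExecutorService pool = Executors.newFixedThreadPool(threads)

    void visitEvents(long maxSequence, long chunkSize, EventVisitor visitor) {
        Deque<Future<List<DomainEventMessage>>> pending = new ArrayDeque()
        for (long start = 0; start < maxSequence; start += chunkSize) {
            long min = start
            long max = Math.min(start + chunkSize, maxSequence)
            // Fork: each chunk becomes a bounded range query on its own thread.
            pending << pool.submit({ readChunk(min, max) } as Callable)
            // Join in submission order, so the visitor sees events in sequence.
            while (pending.size() >= threads) {
                drainOne(pending, visitor)
            }
        }
        while (!pending.isEmpty()) {
            drainOne(pending, visitor)
        }
    }

    private void drainOne(Deque<Future<List<DomainEventMessage>>> pending, EventVisitor visitor) {
        pending.poll().get().each { visitor.doWithEvent(it) }
    }

    private List<DomainEventMessage> readChunk(long min, long max) {
        // Stub: the real thing runs a bounded JDBC query against DOMAIN_EVENT_ENTRY.
        []
    }
}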

Much thanks,
-Peter

Hi Peter,

interesting one. I guess I never had to replay to an asynchronous cluster before. Usually, I use async clusters for things like email sending and 3rd-party integration. Or there is a message bus (AMQP) that does the async parts, so that the cluster itself can be sync.

I guess the only thing you could do to (quickly) get around this is extend ReplayingCluster and make sure it throttles: either use a Thread.sleep (ugly, sorry) to insert some delays, or monitor the queue size and only add items when the queue is smaller than a specific threshold.
Could you create an issue for this in the tracker? The next version of Axon should be able to throttle ‘automatically’.
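For what it's worth, the Thread.sleep variant could be as small as a decorating EventVisitor (just a sketch, names made up, pausing every N events to let the cluster drain):

import org.axonframework.domain.DomainEventMessage
import org.axonframework.eventstore.management.EventVisitor

/** Crude fixed-delay throttle around the replay visitor. */
class ThrottlingVisitor implements EventVisitor {

    private final EventVisitor delegate
    private final int pauseEveryN = 1000
    private long seen = 0

    ThrottlingVisitor(EventVisitor delegate) {
        this.delegate = delegate
    }

    @Override
    void doWithEvent(DomainEventMessage event) {
        delegate.doWithEvent(event)
        if (++seen % pauseEveryN == 0) {
            Thread.sleep(100)  // give the async cluster time to catch up
        }
    }
}

You'd wrap the visitor wherever visitEvents gets called; since you already have a custom EventStoreManagement, that seems like the natural seam.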

I am currently working on a different Event Store implementation, which makes replaying much, much easier and faster. I am working on the details and expect to be able to send some information around during the next week.

Cheers,

Allard

OK!

I’ll post an update with my throttling solution later.

Actually, I may be able to contribute a patch to ReplayingCluster. Since EventProcessingMonitors were added in 2.1, it seems like ReplayingCluster should be able to wait for batches of events to be processed entirely on its own, and that improvement would work for everybody, async or not.
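Roughly, the counting side would look like this (a sketch against the 2.1 EventProcessingMonitor callbacks; the class name and the published() hook are mine, and the real patch would wire this into ReplayingCluster's publish path):

import java.util.concurrent.atomic.AtomicLong

import org.axonframework.domain.EventMessage
import org.axonframework.eventhandling.EventProcessingMonitor

class OutstandingEventCounter implements EventProcessingMonitor {

    private final AtomicLong outstanding = new AtomicLong()

    /** Called for each batch handed to the cluster during replay. */
    void published(int count) {
        outstanding.addAndGet(count)
    }

    long current() {
        outstanding.get()
    }

    @Override
    void onEventProcessingCompleted(List<? extends EventMessage> eventMessages) {
        outstanding.addAndGet(-eventMessages.size())
    }

    @Override
    void onEventProcessingFailed(List<? extends EventMessage> eventMessages, Throwable cause) {
        // Failed events are done too, as far as the backlog is concerned.
        outstanding.addAndGet(-eventMessages.size())
    }
}

If I'm reading 2.1 right, the monitor gets registered via the cluster's subscribeEventProcessingMonitor(...), and the replay loop can then wait until current() drops below a threshold.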

As long as we’re talking about refactoring the EventStore and replay performance, I’d like to mention that my team added a new SEQUENCE column and an insert trigger to our DOMAIN_EVENT_ENTRY table so that our custom EventStoreManagement can query batches of events in order (a rough sketch of the schema and query follows the list below). Two issues with querying by timestamp:

  1. ORDER BY TIMESTAMP + maxResults leads to ridiculously bad performance (at least on DB2). On a table with just a few thousand events, we were reading ~70/s – with 99% overhead, since each batch query scanned and sorted the entire table before returning the first N results. Performance would have been unspeakable with millions of events – we didn’t even try. After switching to a query more like :min < SEQUENCE < :max (using min/max bounds instead of Query.setMaxResults), we got 1,000 events/s and scaled from there with more threads, handling millions of events without blinking.
  2. We have multiple servers inserting events, and clock synchronization was a serious problem – events ordered by so-called “timestamp” weren’t necessarily ordered by sequence number within an aggregate. (Yes, this isn’t as much of a problem for most folks, as they’d only have a single server processing each aggregate.)
    So I hope your new solution will not query by timestamp!
    Our fork/join multithreaded EventStoreManagement, with which we got 15k/s+, also takes advantage of the SEQUENCE column to pre-query known chunks.
    (Our EventStoreManagement uses JDBC instead of JPA so we can query the new column.)
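Here's roughly what the sequence setup and the batch query look like for us (DB2-flavored; the trigger and sequence names are from our setup and may differ from yours, and the reader below uses groovy.sql.Sql over plain JDBC):

// Schema sketch (DB2-flavored DDL, shown as a comment):
//   ALTER TABLE DOMAIN_EVENT_ENTRY ADD COLUMN SEQUENCE BIGINT
//   CREATE SEQUENCE EVENT_SEQ AS BIGINT
//   CREATE TRIGGER EVENT_SEQ_TRG NO CASCADE BEFORE INSERT ON DOMAIN_EVENT_ENTRY
//     REFERENCING NEW AS N FOR EACH ROW
//     SET N.SEQUENCE = NEXT VALUE FOR EVENT_SEQ
import groovy.sql.Sql

/** Each reader thread issues bounded range queries instead of ORDER BY TIMESTAMP + setMaxResults. */
List readChunk(Sql sql, long min, long max) {
    sql.rows('''SELECT * FROM DOMAIN_EVENT_ENTRY
                WHERE SEQUENCE >= ? AND SEQUENCE < ?
                ORDER BY SEQUENCE''', [min, max])
}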

-Peter

Hi Peter,

it was exactly my idea to use the EventProcessingMonitors for this. All you’d have to do is count the number of unprocessed events.

I have tested event replaying on MySQL, and performance there is much better without the extra column. I didn’t test it on other database systems. Weird that there is such a big impact on DB2. MySQL gave me about 2,000 events per second on a table of 30 million events.

The event store implementation I am working on uses insert-time ordering (the order you get them out is exactly the order you put them in), which is similar to the behavior you get with a sequence column in a database, except that there are no indexes involved, which makes it much faster and means it is not impacted by the actual size of the event store.

Switching to JDBC for this seems a good thing. In JPA, if you have a db-generated identifier, there is an extra call to the database to fetch it, so that it can be set on the field in the object; insert performance became much higher without that field. With plain JDBC, you don’t have that issue. I wonder if it is possible in JPQL to order by a column for which there is no mapping.

Cheers,

Allard

I’ve built a custom solution now that uses a semaphore to count outstanding events and block when there are too many. It also uses the semaphore to flush before/after replay. This lives in a wrapper around ReplayingCluster (it also wraps the IncomingMessageHandler to get the necessary hooks). I’m thinking this could easily be refactored into a replacement for (or patch to) ReplayingCluster. But now I’m also wondering whether this throttling/flushing functionality should go into AsynchronousCluster instead, since, replay or not, it never seems like a good idea to let too big a buffer pile up. What do you think?

Hi Peter,

it sounds like a really good solution. A semaphore would be a good way to ensure a “backlog” doesn’t get too big. I also agree with your suggestion to implement it in the AsynchronousCluster. There, we could add a configuration property that defines the maximum ‘backlog’ size. The semaphore guards that limit by setting the number of available permits to the backlog size: the EventProcessingMonitor mechanism releases permits, while the publish method requests them (one for each event to be published).
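In other words, something like this (just a sketch; the class name and the beforePublish hook are illustrative, not an actual Axon API):

import java.util.concurrent.Semaphore

import org.axonframework.domain.EventMessage
import org.axonframework.eventhandling.EventProcessingMonitor

/** Bounds the cluster's backlog: publish acquires permits, the monitor releases them. */
class BacklogGuard implements EventProcessingMonitor {

    private final int maxBacklog
    private final Semaphore permits

    BacklogGuard(int maxBacklog) {
        this.maxBacklog = maxBacklog
        this.permits = new Semaphore(maxBacklog)
    }

    /** Called from publish: blocks while the backlog is at its limit. */
    void beforePublish(int eventCount) {
        permits.acquire(eventCount)
    }

    @Override
    void onEventProcessingCompleted(List<? extends EventMessage> eventMessages) {
        permits.release(eventMessages.size())
    }

    @Override
    void onEventProcessingFailed(List<? extends EventMessage> eventMessages, Throwable cause) {
        // Failed events leave the backlog as well.
        permits.release(eventMessages.size())
    }

    /** Blocks until everything published so far has been processed: the "flush". */
    void awaitDrained() {
        permits.acquire(maxBacklog)
        permits.release(maxBacklog)
    }
}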

Cheers,

Allard

Hi Peter,

I don't know whether it will work for you, but Google Guava has a nice class called RateLimiter:
http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/util/concurrent/RateLimiter.html

Cheers,
Domenico

That was the approach I took. The algorithm is as dumb as hell, but it’s functional. Written in Groovy:

import groovy.util.logging.Slf4j

import java.lang.reflect.Field
import java.util.concurrent.Executor

import org.axonframework.domain.EventMessage
import org.axonframework.eventhandling.MultiplexingEventProcessingMonitor
import org.axonframework.eventhandling.async.AsynchronousCluster
import org.axonframework.eventhandling.async.ErrorHandler
import org.axonframework.eventhandling.async.SequencingPolicy
import org.axonframework.unitofwork.TransactionManager
import org.axonframework.unitofwork.UnitOfWorkFactory

import com.google.common.util.concurrent.RateLimiter

@Slf4j
class RateLimitedCluster extends AsynchronousCluster {

    /** Cached reference to AsynchronousCluster's private 'currentSchedulers' map. */
    Map superSchedulers

    /** Publication rate limit in events/second; adjusted dynamically in schedule(). */
    private limiter = RateLimiter.create(5000)

    /** If more than this many schedulers are pending, slow down. */
    private UPPER = 2000

    /** If fewer than this many schedulers are pending, speed up. */
    private LOWER = 10

    /** Only inspect the scheduler map every 1000 events. Assumes a single publishing thread. */
    private checkCounter = 0

    public RateLimitedCluster(String identifier, Executor executor,
            SequencingPolicy<? super EventMessage<?>> sequencingPolicy) {
        super(identifier, executor, sequencingPolicy)
    }

    public RateLimitedCluster(String identifier, Executor executor,
            TransactionManager transactionManager,
            SequencingPolicy<? super EventMessage<?>> sequencingPolicy) {
        super(identifier, executor, transactionManager, sequencingPolicy)
    }

    public RateLimitedCluster(String identifier, Executor executor,
            TransactionManager transactionManager,
            SequencingPolicy<? super EventMessage<?>> sequencingPolicy,
            ErrorHandler errorHandler) {
        super(identifier, executor, transactionManager, sequencingPolicy, errorHandler)
    }

    public RateLimitedCluster(String identifier, Executor executor,
            UnitOfWorkFactory unitOfWorkFactory,
            SequencingPolicy<? super EventMessage<?>> sequencingPolicy,
            ErrorHandler errorHandler) {
        super(identifier, executor, unitOfWorkFactory, sequencingPolicy, errorHandler)
    }

    @Override
    protected void schedule(EventMessage<?> task, MultiplexingEventProcessingMonitor eventProcessingMonitor) {
        // Every 1000 events, look at the backlog and adjust the rate.
        if (++checkCounter % 1000 == 0) {
            def currentSize = currentSchedulers.size()
            if (currentSize > UPPER) {
                // Too big a backlog: cut the rate in proportion to the overshoot.
                Double newRate = limiter.rate * (1 - ((currentSize - UPPER) / (2 * (currentSize + UPPER))))
                log.debug "Reducing $name to $newRate, current size $currentSize"
                limiter.rate = newRate
            } else if (currentSize < LOWER && limiter.rate < 100000) {
                // Backlog nearly drained: raise the rate, capped at 100k/s.
                Double newRate = limiter.rate * (1 + ((UPPER - currentSize) / UPPER))
                log.debug "Increasing $name to $newRate, current size $currentSize"
                limiter.rate = newRate
            }
        }
        // Block until the limiter hands out a permit, then schedule as usual.
        limiter.acquire()
        super.schedule(task, eventProcessingMonitor)
    }

    /** The scheduler map is private in AsynchronousCluster, so grab it once via reflection. */
    private Map getCurrentSchedulers() {
        if (superSchedulers == null) {
            Field f = AsynchronousCluster.class.getDeclaredField('currentSchedulers')
            f.accessible = true
            superSchedulers = f.get(this)
            f.accessible = false
        }
        superSchedulers
    }
}

Andy