Using axonframework / event sourcing for offline synchronisation

Hi,

We are currently planning to use axonframework/event sourcing in a project. One project feature will be the ability to do offline editing (via mobile app) and synchronisation, so I wonder if the framework could help me to implement this.

I could imagine that the app uses the same backend code that the central service uses. During synchronisation, all events from the central server are loaded into the app’s event store. If one edits things offline, all those events are marked as “offline events” and stored in the app’s event store. The next time the user synchronizes, all “offline events” are applied to the server’s event store (merging/solving conflicts). Then the app’s event store is replaced by the events from the server.

Do you think this could be a usable approach? Did anyone already do this? Is there maybe an example to do such?

Kind regards,
Michael Wyraz.

Hi Michael,

Your question is very interesting.

Assuming multiple devices can modify the same state in a multi-master scenario, I do not think event sourcing is the right tool for the job in this instance. By definition, an aggregate is responsible for its integrity. Considering the decentralized nature of your application, multiple “disconnected” aggregates which are supposed to represent the same aggregate could (and will!) produce conflicting event streams.

The event stream is THE source of truth. Conflict resolution has to happen before events are committed. I think the only way to satisfactorily solve this issue is to store a pipeline of commands that can then be submitted to a centralized server, which would then handle conflict resolution.

Merging event streams created asynchronously works if the following conditions are present:

  1. At any one time, (a) only one device/client can modify an aggregate and this needs to be enforced extremely aggressively or (b) you are willing to create a conflict resolution layer which rewrites the incoming stream and appends it to the event store once it has been processed for conflicts. Similarly, an integration layer will have to be written for the client, with the possibility to disregard the read model (up to the branching point) and replay the event stream from the branching point.

The (a) solution is very difficult to implement and supposes that your problem domain makes it possible to completely segregate ALL the aggregates that are modified by the mobile client, which means that your problem domain has no cooperative component. Even if that is the case, I know of no easy mechanism to enforce this, unless you control the device hardware, and even that does not completely eliminate the risk of having two devices modify the same aggregate separately. It should also be noted that one of the main reasons to use CQRS is that it makes complex interactions between contexts easier (through sagas, for example). If your problem domain has no cooperative component, CQRS may not be the right tool in the first place.

The (b) solution is also very difficult to implement. It supposes you are willing to do away with one of the core principles of DDD (and hence, CQRS), which is that the transactional boundary is the aggregate, and that the aggregate root is responsible for ensuring consistency within this boundary. It also violates one of the most interesting properties of the event stream, which is that it is append-only. It also supposes you write an integration layer and design your read model to be rewindable up to a given point in time, or that you accept completely deleting the read model when necessary. This is not impossible, but certainly not trivial at all. It also assumes that all the side effects of events are reversible. This is certain to cause multiple problems if you deal with hard-to-reverse side effects, such as financial transactions or really anything that has to deal with external services.

  2. The integrity of the data on the mobile device can be protected to the level your requirements demand. Either you are ready to accept that someone may tamper with the event stream and send you crap, or you are ready to write a verification layer to guard against that, in which case events will need to be “unapplied” on the mobile device.

  3. Your server code and object model are forever backward compatible, considering old versions of the client will always be running and nothing can force an offline device to update its software version and stop producing new events until the update occurs (using pre-set mandatory time

  4. You are willing to accept that old code can and will, forever, execute and create events without possibly respecting newly established contracts OR you are willing to accept that certain events may be rejected, in which case they will need to be reverted on the client. You will need to create handlers for every event that results in a modification of the read model on the client to revert the changes (i.e. compensating actions) made on the read side.

  5. Other stuff that I have not thought about.

I could go on and on about all the necessary assumptions, trade-offs and headaches you will have to endure to solve your offline problem by merging event streams. Globally, it doesn’t seem like a sound approach.

Maybe a command queue that is purged once the client goes back online is more appropriate?
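
To make the command queue idea a bit more concrete, here is a minimal sketch in plain Java. It is only an illustration under assumptions: `ServerLink` and `OfflineCommandQueue` are made-up names, not Axon classes, and the real transport and persistence are left out. Commands are queued while offline and sent in their original order on reconnect; a command leaves the queue only after the server has answered.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for whatever transport submits a command to the server.
interface ServerLink {
    // Returns true if the server accepted the command, false if it rejected it.
    boolean send(Object command);
}

// Client-side queue: commands pile up while offline and are drained on reconnect.
final class OfflineCommandQueue {
    private final Deque<Object> pending = new ArrayDeque<>();

    void enqueue(Object command) {
        pending.addLast(command);
    }

    int size() {
        return pending.size();
    }

    // Sends queued commands in their original order. A command is removed only
    // after send() has returned, so if send() throws (connection lost again),
    // that command and everything behind it stay queued for the next attempt.
    // Commands the server rejected are returned so the caller can show them.
    List<Object> drainTo(ServerLink server) {
        List<Object> rejected = new ArrayList<>();
        while (!pending.isEmpty()) {
            Object command = pending.peekFirst();
            if (!server.send(command)) {
                rejected.add(command);
            }
            pending.removeFirst();
        }
        return rejected;
    }
}
```

The server stays the only place where events are committed; the client merely replays its intent.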

Hi Nick,

thank you a lot for that fast and long answer. I agree with you that trying to merge two event streams will cause some big trouble in case of conflicts.
How about that command queue? Would this mean that the client stores commands rather than events and replays them to the server during sync? How could such a system detect conflicts (e.g. two persons edit the same field offline)?

Is it possible with axonframework to record a stream of commands rather than events? Should I choose the same design for my central server or can I mix “command sourcing” on the client with “event sourcing” on the server?

Kind regards,
Michael.

Hi Michael,

I do believe CQRS is a very appropriate principle for these “occasionally connected clients”. In fact, Event Sourcing will help you here when merging activity between a synchronizing client and the server.

It’s important to realize that the server’s Event Stream always holds the truth. A client can synchronize a certain part of that stream (typically a few aggregates) so that it can work offline. While working offline, it uses a local query model and a portion of the server’s event stream (or a single snapshot event). When executing commands, these commands are stored locally, then executed and resulting events are stored and processed.

When the client reconnects, it should synchronize with the server. It really depends on your problem domain on how much trouble you can encounter here. Typically, synchronization will involve conflicts that may require user interaction.
During synchronization, you will need to compare the locally applied events with the events applied on the server. If there are no events on the server, that’s easy. Simply send all the stored commands to the server, and you’re done.
Let’s assume there are changes. What you would then do, is compare the locally applied events with the events on the server. If there are conflicts, you can ask the user how to resolve them. They are resolved by changing (or adding) the commands staged on the client. When all issues are resolved, the commands are sent to the server for processing. In the end, as Nicholas pointed out, the server is responsible for maintaining aggregate state. So don’t append events directly to the event store.

I haven’t built an occasionally connected system myself, yet, but have heard of others who have. If I recall correctly, Greg Young has also done this in one of his projects. You might want to ask him (or the ddd/cqrs mailing list where he is active) for some ideas.

Cheers,

Allard

Hi Allard,

I think we can solve the conflicts easily. The client will have a restricted subset of commands, basically adding new aggregates or adding information to aggregates. In cases where the client may edit existing properties, whoever synchronizes last will simply overwrite the current state. Maybe a message would be generated so that a manual review is possible.
But I wonder if it is even possible to detect such conflicts. The command would be “UpdateTitleCommand” and would set a new title. It would get executed and the aggregate will have a new title then, regardless of whether the title was changed on the server before or not.

One question I have to deal with is the generation of aggregate identifiers. The main identifier could be some kind of UUID so that there are no conflicts. But the aggregates will also have a human-readable identifier (5 digit sequential number). I am thinking of having a service that generates such numbers, which will create a new number when run on the server and will return NULL on the client, so that this identifier is only created when the command is executed on the server.

Hi Michael,

detecting conflicts isn’t too hard. If you see a “TitleModifiedEvent” on the server, and one generated on the client as well, you know there is a conflict. Unless they have both chosen the same title ;-).
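
A rough sketch of that check in plain Java, under stated assumptions: `RecordedEvent` is a deliberately simplified, made-up event shape (aggregate id, event type, new value), and both lists contain only the events recorded since the last synchronization. A local event counts as a conflict when the server has an event of the same type for the same aggregate with a different value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Simplified, hypothetical event shape: which aggregate, what kind of change, new value.
final class RecordedEvent {
    final String aggregateId;
    final String type;
    final String value;

    RecordedEvent(String aggregateId, String type, String value) {
        this.aggregateId = aggregateId;
        this.type = type;
        this.value = value;
    }
}

final class ConflictDetector {
    // Returns the local events that clash with a server event on the same
    // aggregate and of the same type but carrying a different value.
    static List<RecordedEvent> findConflicts(List<RecordedEvent> local, List<RecordedEvent> server) {
        // Latest server value per (aggregate, event type).
        Map<String, String> serverValues = new HashMap<>();
        for (RecordedEvent e : server) {
            serverValues.put(e.aggregateId + "|" + e.type, e.value);
        }
        List<RecordedEvent> conflicts = new ArrayList<>();
        for (RecordedEvent e : local) {
            String key = e.aggregateId + "|" + e.type;
            // Same change made on both sides with the same value is not a conflict.
            if (serverValues.containsKey(key) && !Objects.equals(serverValues.get(key), e.value)) {
                conflicts.add(e);
            }
        }
        return conflicts;
    }
}
```

Whether a detected conflict is shown to the user or resolved by "last one wins" is a separate decision that depends on the domain.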

As for the aggregate identifiers, I think using UUIDs is the safest way to go. For the sequential numbers, generate those on the server (using a saga-like construction, for example). This means a disconnected client would only have sequence numbers for aggregates once they have synchronized.
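
As an illustration only, server-side numbering could look like the following plain Java sketch. `ReadableNumberAssigner` is a made-up name, the in-memory map stands in for real persistence, and the 5-digit format is taken from Michael's description. An aggregate created offline has no number (`null`) until the server has assigned one, and asking twice for the same aggregate returns the same number.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;

// Runs on the server only: hands out 5-digit sequential numbers per aggregate.
final class ReadableNumberAssigner {
    private final Map<UUID, String> assigned = new HashMap<>();
    private int next = 1;

    // Returns the number already assigned to this aggregate, or allocates the next one.
    synchronized String numberFor(UUID aggregateId) {
        String existing = assigned.get(aggregateId);
        if (existing != null) {
            return existing;
        }
        if (next > 99999) {
            throw new IllegalStateException("5-digit number range exhausted");
        }
        String number = String.format(Locale.ROOT, "%05d", next++);
        assigned.put(aggregateId, number);
        return number;
    }

    // What a not-yet-synchronized aggregate looks like: null until a number exists.
    synchronized String lookup(UUID aggregateId) {
        return assigned.get(aggregateId);
    }
}
```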

Cheers,

Allard

Hi Michael,

I wrote an inspection system that has been in production for four years and uses occasionally connected laptops. It is still using Axon 1.4, so it doesn’t have a lot of the features in Axon 2 that would make this job easier. These are the process steps that I took:

  • all inspections belonging to the user have the events copied from the server event store to a database on the laptop,
  • any inspection not belonging to the user can be checked out and those events are stored locally,
  • the events are replayed locally to build the views
  • a creation or update to an inspection is done locally (and this is where the code forks depending on if the user is online or offline)

Offline Mode:

  • commands are applied as usual with Axon to the local event store,
  • commands are serialized and stored in the database,
  • commands contain the aggregate version sequence,

When the client returns to online mode, the code continues as follows:

  • inspections belonging to the user/checked out are copied to the local database,
  • all persisted commands are replayed locally to resolve any conflicts,
  • changes to commands to resolve conflicts replace the commands that have already been persisted,
  • commands are sent to the server to be played

Online Mode:

  • commands are sent to the server,
  • server persists events
  • server sends the updated event stream to the client to be stored locally

Both the client and server code share the aggregate API and core. They both run Axon and store events. I guess the best real-world example of this would be Git. You resolve your conflicts locally before you push to a central location.
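
The role of the aggregate version stored with each command can be sketched as follows. This is not Randy's code, just a plain Java illustration under assumptions: `StoredCommand` and `StaleCommandFinder` are made-up names, and the first offline command for an aggregate is assumed to carry the version the client last saw from the server. If the server has moved past that version in the meantime, all stored commands for that aggregate need to be reviewed before they are sent.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A command persisted while offline, together with the aggregate version it was issued against.
final class StoredCommand {
    final String aggregateId;
    final long expectedVersion;
    final String description;

    StoredCommand(String aggregateId, long expectedVersion, String description) {
        this.aggregateId = aggregateId;
        this.expectedVersion = expectedVersion;
        this.description = description;
    }
}

final class StaleCommandFinder {
    // stored: offline commands in the order they were issued.
    // serverVersions: current version per aggregate after refreshing from the server.
    static List<StoredCommand> findStale(List<StoredCommand> stored, Map<String, Long> serverVersions) {
        Map<String, Boolean> staleByAggregate = new HashMap<>();
        List<StoredCommand> stale = new ArrayList<>();
        for (StoredCommand command : stored) {
            Boolean isStale = staleByAggregate.get(command.aggregateId);
            if (isStale == null) {
                // First offline command for this aggregate decides: its expected
                // version is the last version the client received from the server.
                Long current = serverVersions.get(command.aggregateId);
                // Aggregates unknown to the server were created offline and cannot be stale.
                isStale = current != null && current.longValue() != command.expectedVersion;
                staleByAggregate.put(command.aggregateId, isStale);
            }
            if (isStale) {
                stale.add(command);
            }
        }
        return stale;
    }
}
```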

When the officer is offline and creates a new inspection, that inspection may be printed and handed out to their client. It requires a human-readable number that can be referenced in payment, court, etc. The number is the initiating officer’s badge number plus the date and time, e.g. 566-20140414-0843. The officer’s badge number ensures uniqueness between all officers. The argument of “we know how many inspections were written in a given year by the last inspection number” was resolved by having a counter displayed on the dashboard.
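
A number in that format can be produced entirely on the offline device. The following is a small sketch using `java.time`; the class and method names are made up for illustration. Note that with minute resolution the number is only unique as long as one officer does not create two inspections within the same minute.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

final class InspectionNumbers {
    // Date and time down to the minute, e.g. 20140414-0843.
    private static final DateTimeFormatter STAMP = DateTimeFormatter.ofPattern("yyyyMMdd-HHmm");

    // Badge number plus creation timestamp; needs no server round trip.
    static String create(int badgeNumber, LocalDateTime createdAt) {
        return badgeNumber + "-" + createdAt.format(STAMP);
    }
}
```

For example, `InspectionNumbers.create(566, LocalDateTime.of(2014, 4, 14, 8, 43))` yields `566-20140414-0843`.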

I hope this helps.

Randy.

Hi Randy,

thank you a lot. This gives me a good idea how such an application could work.
How did you do command serialization? Could you integrate this with axonframework or did you do this outside of the framework (e.g. within each command handler method)?

I will try to build a minimal prototype of my application these days to see if I can get it to work.

Michael.

Hi,

our application is now working and I’m implementing synchronization. I’m going to do it the following way:

  • all events are copied from the server to the client on first synchronisation

  • when the client is offline, all commands are applied locally and stored

  • when going online again, all commands are sent to the server, then all events from the server are copied to the client (replacing existing events there).

What is a good place to catch all commands to store them? I’m going to use a CommandHandlerInterceptor here.

What would be a good approach to copy/replace events and start a replay? I can do it directly in the database but maybe it can be done directly using the repositories?

Kind regards,
Michael.

Hi Michael,

you could use another table for the locally applied events, so that you can easily remove them. You may also need to rebuild the query models locally.

Just wondering, are you copying the whole application state locally, or are you selecting specific aggregates (and related read models) to synchronize? If you do so, theoretically, you only need to sync the read models and snapshots of the current aggregate state locally. You may not need any history of your aggregates.

Cheers,

Allard

Sounds like a good idea. Do you have any hint how I could achieve that the events are stored in different tables depending on their source? Probably I’d have to modify EventSourcingRepository in some way.

My requirement just says that the current state of some aggregates has to be on the client and that all changes from the client need to be synchronized to the server. To keep it simple, I’m going to copy the whole event stream from server to client with the opportunity to optimize it later in the way you described.

Actually on the client the same (web-) application will run as on the server. There is a configuration setting telling it that it runs on the client which changes some things:

  • only commands for certain aggregates are accepted (using an interceptor). The GUI will reflect these restrictions
  • all accepted commands will be serialized to an extra database table
  • synchronisation will be enabled

At the moment I try to send the server’s events to the client and have the following problem. I visit all events on the server and send them as Event-Stream to the client. There I’d like to add it to the event store. My problem is that saving events to the event store requires a “type” parameter which is not available in the EventVisitor. Do you have an idea how to solve this? Otherwise I would force the client and the server to use JpaEventStore and directly access/copy the stored entities.

Kind regards,
Michael.

I wrote the following code to synchronize all server events to the client:

     public void synchronizeWith(ISyncServer syncServer) throws Exception
     {
         final EntityManager entityManager=axonEntityManagerProvider.getEntityManager();
         TransactionStatus tx=axonTransactionManager.startTransaction();

         try
         {
             // Wipe the local event store: all domain events and all snapshots.
             entityManager.createQuery("delete from org.axonframework.eventstore.jpa.DomainEventEntry").executeUpdate();
             entityManager.createQuery("delete from org.axonframework.eventstore.jpa.SnapshotEventEntry").executeUpdate();

             // Copy every stored event entry from the server into the local database.
             syncServer.fetchEvents(new IEntityVisitor<DomainEventEntry>() {
                 @Override
                 public boolean handleEntity(DomainEventEntry entity)
                 {
                     entityManager.persist(entity);
                     return true;
                 }
             });

             entityManager.flush();
             axonTransactionManager.commitTransaction(tx);
             // Committed: clear the reference so the finally block does not roll back.
             tx=null;
         }
         finally
         {
             // tx is still set only if something failed before the commit.
             if (tx!=null) axonTransactionManager.rollbackTransaction(tx);
         }
     }

This simply deletes all entities and copies all stored entities to the client's database. Should I copy the Snapshots from the server as well or can I make the client somehow calculate new snapshots after this action?

My next step is to clear the read model and trigger a cluster replay.

See reply inline.

> Hi Allard,
>
>> you could use another table for the locally applied events, so that you
>> can easily remove them. You may also need to rebuild the query models
>> locally.
>
> Sounds like a good idea. Do you have any hint how I could achieve that the
> events are stored in different tables depending on their source? Probably
> I'd have to modify EventSourcingRepository in some way.

If you use the JpaEventStore, you can extend the DefaultEventEntryStore
and implement your own methods for createDomainEventEntry,
createSnapshotEventEntry, domainEventEntryEntityName,
snapshotEventEntryEntityName. The first two methods will need to return a
subclass of DomainEventEntry/SnapshotEventEntry with the proper JPA @Entity
annotations.
However, if you only store the aggregate snapshot when synchronizing, you
don't need to do anything with separate tables. All you would then need to
do is clear the DomainEventEntry tables and download new snapshots. Then,
your event store is up to date again.
I would sync the query model in the same way. Simply download new/modified
query table entries from the server on sync.

>> Just wondering, are you copying the whole application state locally, or
>> are you selecting specific aggregates (and related read models) to
>> synchronize? If you do so, theoretically, you only need to sync the read
>> models and snapshots of the current aggregate state locally. You may not
>> need any history of your aggregates.
>
> My requirement just says that the current state of some aggregates has to
> be on the client and that all changes from the client need to be
> synchronized to the server. To keep it simple, I'm going to copy the whole
> event stream from server to client with the opportunity to optimize it
> later in the way you described.

As described above, you could store snapshots only. In that case, you
would query for each aggregate identifier and its maximum sequence number.
If that sequence number is higher than the information you have stored
locally, you use the snapshotter to create a snapshot, and store that in
the local Snapshot table. In future versions, you could limit this to the
aggregates that the client "subscribed" to.
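
The comparison step described here can be sketched in plain Java as below. This is an illustration only: `SnapshotSyncPlanner` is a made-up name, and the two maps are assumed to have been filled from the server's and the client's stores beforehand (aggregate identifier mapped to highest sequence number). Creating and storing the actual snapshot is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class SnapshotSyncPlanner {
    // Returns the identifiers of all aggregates for which the server holds a
    // higher sequence number than the client, or which the client lacks entirely.
    static List<String> aggregatesToRefresh(Map<String, Long> serverMaxSequence,
                                            Map<String, Long> localSequence) {
        List<String> outdated = new ArrayList<>();
        for (Map.Entry<String, Long> entry : serverMaxSequence.entrySet()) {
            Long local = localSequence.get(entry.getKey());
            if (local == null || entry.getValue() > local) {
                outdated.add(entry.getKey());
            }
        }
        // Sorted only to make the result deterministic.
        Collections.sort(outdated);
        return outdated;
    }
}
```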

> Actually on the client the same (web-) application will run as on the
> server. There is a configuration setting telling it that it runs on the
> client which changes some things:
> - only commands for certain aggregates are accepted (using an
> interceptor). The GUI will reflect these restrictions
> - all accepted commands will be serialized to an extra database table
> - synchronisation will be enabled
>
> At the moment I try to send the server's events to the client and have the
> following problem. I visit all events on the server and send them as
> Event-Stream to the client. There I'd like to add it to the event store. My
> problem is that saving events to the event store requires a "type"
> parameter which is not available in the EventVisitor. Do you have an idea
> how to solve this?
> Otherwise I would force the client and the server to use JpaEventStore and
> directly access/copy the stored entities.

Valid point. The API currently doesn't allow you to get the type
parameter. The only way is to access the store directly, unfortunately. I
think I will need to add the type as a parameter to the visitor in future
versions.

Hi Allard,

thank you a lot for the answers. Yesterday I finished my first implementation that passes all my unit tests:

1. An application property "appMode" which can be "Client" or "Server"

2. A "SyncClientCommandInterceptor".
If appMode is Client, it ensures that all commands that are applied implement the "IClientCommand" interface. If so, these commands are serialized using XStream and stored in a database table (and of course always passed to the interceptorChain).

3. An "ISyncServer" interface + "SyncServer" implementation
It exposes the method fetchEvents(IEntityVisitor<DomainEventEntry> eventVisitor). In this method it iterates directly over the stored events (no snapshots and no filter for the first implementation).

It also exposes the transactional method applyClientCommands(InputStream packedCommandStream, String checksum) which applies a stream of commands to the CommandBus

4. A "SyncClient" implementation that simply exposes the method synchronizeWith(ISyncServer syncServer)
In this method it first reads all serialized commands and sends them as stream to the sync server. After this is finished without errors, it deletes all local data (stored events, snapshots, read model) and reads the complete event stream from the server. This is done within a transaction.
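
To give an idea of what the interceptor from step 2 does, here is a simplified, framework-free sketch. It is not my actual code and does not use Axon's interfaces: `Chain` is a made-up stand-in for the interceptor chain, the in-memory list stands in for the database table, Java serialization stands in for XStream, and `AddNoteCommand` is a hypothetical example command. In this sketch a command is only recorded after the rest of the chain has handled it successfully.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Marker for commands a client may execute (interface name taken from the post).
interface IClientCommand extends Serializable {}

// Hypothetical stand-in for the remaining interceptors and the command handler.
interface Chain {
    Object proceed(Object command);
}

// Hypothetical example command that is allowed on a client.
final class AddNoteCommand implements IClientCommand {
    private static final long serialVersionUID = 1L;
    final String text;

    AddNoteCommand(String text) {
        this.text = text;
    }
}

final class ClientCommandRecorder {
    private final boolean clientMode;
    // Stands in for the extra database table holding serialized commands.
    private final List<byte[]> storedCommands = new ArrayList<>();

    ClientCommandRecorder(boolean clientMode) {
        this.clientMode = clientMode;
    }

    Object handle(Object command, Chain chain) {
        if (clientMode && !(command instanceof IClientCommand)) {
            throw new IllegalArgumentException(
                    "Command not allowed on a client: " + command.getClass().getName());
        }
        Object result = chain.proceed(command);
        if (clientMode) {
            // Record only commands that were handled without an exception.
            storedCommands.add(serialize((IClientCommand) command));
        }
        return result;
    }

    int storedCount() {
        return storedCommands.size();
    }

    private static byte[] serialize(Serializable command) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(command);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```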

This is the simplest implementation that works. Now I'm going to implement some HTTP transport including authentication stuff, do some integration tests and write more complex unit tests. When all works, I will start to optimize the sync a bit more.

If you are interested, I can share some code with you. The synchronisation code is almost application independent (except the stuff that deletes the read model, but this could be done via callback), so it could potentially become part of axonframework.

Kind regards,
Michael.

Hi Michael,

Your solution sounds pretty good. I'd love to see some code, and if there is something to include in Axon itself, that would be even better.
Just wondering, do you do some conflict checking/resolution in your synchronization? Or is that one of the v2 features?

Cheers,

Allard

Hi Allard,

I just stumbled upon your conversation about offline mobile clients. I currently have the same requirements and wanted to ask whether there is any news about integrating this functionality into the Axon framework.

Best regards,
Andreas

Hi Andreas,

the framework itself doesn’t contain any explicit building blocks for this purpose. However, the architectural principle of CQRS makes it not-too-hard to implement this in a way that doesn’t affect your backend architecture too much. Since clients use many different types of technologies, it is virtually impossible to create a generic component to support this directly.

Cheers,

Allard

Hi Allard,

thanks for your fast reply. I see your point.

Greets,
Andreas

Hi Michael,

thanks for the detailed description of your approach. I am sure this will be really helpful!

best regards,
Andreas