Re-create aggregate instance from events

Using the following code we can get a list of all the events pertaining to a particular aggregate instance.

eventStore.readEvents("targetAggregateIdentifier").asStream().map(Message::getPayload)
.collect(Collectors.toList());

But, is there a way to create an instance (pojo value object) from those events? I.e., is there a way to re-play these events and retrieve the current state of the aggregate, rather than having to persist a DTO/View entity and using @EventHandlers.

Hi Ryan,
I’m not sure if I understood correctly your question : you have defined an @Aggregate and you would like to reply events on this one: am I right ?
I think you already had the chance to look at our ref-guide : this page https://docs.axoniq.io/reference-guide/axon-framework/axon-framework-commands/modeling/aggregate is specific on the topic.
Aggregate state is rehydrated from events and snapshots: every events that hit a specific aggregate entity that match with the defined @AggregateIdentifier , it is then processed by the specific method annotated with @EventSourcingHandler. These methods will be in charge to modify the Aggregate state.
You can for example download and run our giftcard-demo https://github.com/AxonIQ/giftcard-demo/ and try to Issue one card with a value, then Reedem a value from the card. Run it in debug mode and look at the output log. In my case I first issued a card with value 10, then I Redeemed value 1 three time. Below the output log for the last iteration.

DEBUG 14508 --- [mandProcessor-1] i.axoniq.demo.giftcard.command.GiftCard  : Empty constructor invoked
DEBUG 14508 --- [mandProcessor-1] i.axoniq.demo.giftcard.command.GiftCard  : applying IssuedEvt(id=0024B18C-68, amount=10)
DEBUG 14508 --- [mandProcessor-1] i.axoniq.demo.giftcard.command.GiftCard  : new remaining value: 10
DEBUG 14508 --- [mandProcessor-1] i.axoniq.demo.giftcard.command.GiftCard  : applying RedeemedEvt(id=0024B18C-68, amount=1)
DEBUG 14508 --- [mandProcessor-1] i.axoniq.demo.giftcard.command.GiftCard  : new remaining value: 9
DEBUG 14508 --- [mandProcessor-1] i.axoniq.demo.giftcard.command.GiftCard  : handling RedeemCmd(id=0024B18C-68, amount=1)
DEBUG 14508 --- [mandProcessor-1] i.axoniq.demo.giftcard.command.GiftCard  : applying RedeemedEvt(id=0024B18C-68, amount=1)
DEBUG 14508 --- [mandProcessor-1] i.axoniq.demo.giftcard.command.GiftCard  : new remaining value: 8

My Aggregate was rehydrated from Events and all the event that belongs to this aggregate entity are taken into account and applied.

1 Like

I was more focusing on the projection model rather than the command model. I understand what you’re saying, but my question was specifically how to re-create an “aggregate” (rather I should be saying a view) when a query request is made. This is all so that we don’t have to create duplicate DTO classes in our read model. That’s the common pattern you see most people doing. You have your @Aggregate in the command model and then a AggregateDTO on the view model. I was hoping not to have to maintain two sets of POJOs. Obviously, one way is simply use your @Aggregate as a DTO, which I’ve seen some do. E.g.

@Aggregate
@Entity
class MyAggregate {}

We’ve decided to have a separate set of DTOs that we generate via custom swagger-codgen extensions (for JPA annotation) based off a API spec that aligns with our aggregate models.

Hi, if we are talking about projection/view, so the term @Aggregate is not pretty accurate, and could potentially lead into confusion.
You can then have two different approaches: the first one is to implement the concept of Stored-State Aggregate (you can read more in the ref-guide https://docs.axoniq.io/reference-guide/axon-framework/axon-framework-commands/modeling/state-stored-aggregates)
Second approach is to define a @Component, that will handle all the @EventHandler of his interest and update the view model using a JpaRepository. You can find an example of this into https://github.com/AxonIQ/hotel-demo/blob/master/booking/src/main/java/io/axoniq/demo/hotel/booking/query/AccountHandler.java

A clear drawbacks of the first approach is that data are stored in the projection with the form of the aggregate, as-is you will define it. Any change to your aggregate will be reflected in the query/view side.
Introducing a clear separation of concern between command and query, as the second approach suggest, will give you a lot of degrees of freedom into your query side: it is not rare to define more than one projection that is sourced from events that belongs to one aggregate.

You can then define a @ProcessingGroup : this will give you the possibility to introduce a method into your projection annotated with @ResetHandler that will be in charge to delete all the data from the repository. This method can be triggered calling the method resetTokens() on the specific EventProcessor . More info in the ref-guide https://docs.axoniq.io/reference-guide/axon-framework/events/event-processors#replaying-events

Sayed so : I suggest to introduce an actuator endpoint into your application that can be safely called to trigger a reset on a certain eventProcessor
Below you can find an example of code.

import org.axonframework.config.Configuration;
import org.axonframework.eventhandling.TrackingEventProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.endpoint.annotation.DeleteOperation;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import java.lang.invoke.MethodHandles;

@Component
@Endpoint(id="scheduletokenreset")
public class TriggerResetEventEndpoint {

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    private final Configuration configuration;

    public TriggerResetEventEndpoint(Configuration configuration) {
        this.configuration = configuration;
    }

    @DeleteOperation()
    public void triggerResetEventEndpointFor(String processingGroup) {
        Assert.hasLength(processingGroup, "Processing Group is mandatory and can't be empty!");

        logger.info("Trigger ResetTriggeredEvent for processingGroup {}", processingGroup);

        configuration.eventProcessingConfiguration()
                     .eventProcessorByProcessingGroup(processingGroup,
                                                      TrackingEventProcessor.class)
                     .ifPresent(trackingEventProcessor -> {
                         trackingEventProcessor.shutDown();
                         trackingEventProcessor.resetTokens();
                         trackingEventProcessor.start();
                     });
    }

}

Another simpler approach is to set to 0 the value of token into the token_entry table created on your client side for a certain processor_name : please note that this should be done after all client application are shut down. You will also need to take care to drop manually data created into your projection triggered by the specific processor.
Although this is simpler contains a lot of pitfall and I strongly suggest to do not use this approach in a production environment.

2 Likes

Thank you Corrado. Your detailed response has elucidated this issue for me.

1 Like