GenericJPARepository and EventStore at the same time

Hi all,

is it possible to have at the time Aggregrate persited with a GenericJPARepository
and events registered in the table domain_event_entry ?

Thanks

Hi,

in Axon 3, yes you can. If you use a GenericJpaRepository, your aggregate will be stored and loaded from a dedicated table. If you also use an Event Store (instead of just an Event Bus), you will have events stored as well. The Event Store won’t be used to load the aggregate, but will still contain all events.

Cheers,

Allard

Hi Allard,

thanks for your reply.

Maybe i’ve have a configuration mistake somewhere

  • i’ve an aggregate with @Entity and @Aggregate
  • I’ve créated a bean for my repository
  • i’ve created a bean for storing event with Jpa

and when i generate 3 commands on my aggregate :

  • my Aggregate is uptodate
  • I’ve 3 lines in the DOMAIN_EVENT_ENTRY table
  • but my colunm AGGREGATE_IDENTIFIER has different value for each line !!

This is my axon configuration class :

`

@Configuration
public class AxonConfiguration
{
@Bean
public EntityManagerProvider entityManagerProvider()
{
return new ContainerManagedEntityManagerProvider();
}

@Bean
public Repository publicationRepository(EntityManagerProvider entityManagerProvider, EventBus eventBus)
{
return new GenericJpaRepository<>(entityManagerProvider, Publication.class, eventBus);
}

@Bean
public EventStorageEngine eventStorageEngine(EntityManagerProvider entityManagerProvider, TransactionManager transactionManager)
{
return new JpaEventStorageEngine(entityManagerProvider, transactionManager);
}

@Bean
public Serializer serializer()
{
return new JacksonSerializer();
}
}

`

and this is my Aggregate

`

@Aggregate
@Entity
@NoArgsConstructor
public class Publication
{
@AggregateIdentifier
private PublicationID publicationID;

@Id
// For Hibernate to handle PublicationID who is a ValueObject
protected String getPublicationID()
{
return publicationID.toString();
}
protected void setPublicationID(String uuid)
{
this.publicationID = new PublicationID(uuid);
}

@Getter @Setter @Column
private String libelle;

@Getter @Setter @Column
private String referenceSelsia;

@Getter @Setter @Column
private String demandeur;

@CommandHandler
public Publication(CreatePublicationCommand command)
{
apply(new PublicationCreatedEvent(command.getPublicationID(), “666”, command.getLibelle(), command.getDemandeur()));
}

@CommandHandler
public void handle(ActivatePublicationCommand command)
{
apply(new PublicationActivatedEvent(command.getPublicationID()));
}

@CommandHandler
public void handle(DeactivatePublicationCommand command)
{
apply(new PublicationDeactivatedEvent(command.getPublicationID()));
}

@CommandHandler
public void handle(DeletePublicationCommand command)
{
markDeleted();
apply(new PublicationDeletedEvent(command.getPublicationID()));
}

@EventHandler
public void on(PublicationCreatedEvent event)
{
this.publicationID = event.getPublicationID();
this.referenceSelsia = event.getReferenceSelsia();
this.libelle = event.getLibelle();
this.demandeur = event.getDemandeur();
}
}

`

Cheers

to complete my previous post :

if i comment the line with @Entity

and the block with the bean definition of the repository,
the attribut aggregate_identifier, type, sequence_number of table DOMAIN_EVENT_ENTRY are correctly filled.

Thanks for help.

Which commands do you send, and which values do you see in your tables?

Hi Allard,

i send commands that are in my domain via the CommandGateway

`

@Autowired
private CommandGateway commandGateway;

PublicationID publicationID = new PublicationID();
commandGateway.send(new CreatePublicationCommand(publicationID, “Libelle”, “@daco”));
commandGateway.send(new ActivatePublicationCommand(publicationID));
commandGateway.send(new DeactivatePublicationCommand(publicationID, “trop cher”));

publicationID = new PublicationID();
commandGateway.send(new CreatePublicationCommand(publicationID, “Hey”, “@me”));

`

Exemple of one command :

`

@Value
public class CreatePublicationCommand
{
@NotNull
private PublicationID publicationID;

@NotNull
private String libelle;

@NotNull
private String demandeur;
}

`

and an event

`

@Getter
@AllArgsConstructor
public class PublicationCreatedEvent extends BopEvent
{
private PublicationID publicationID;

private String referenceSelsia;

private String libelle;

private String demandeur;
}

`

and in my table domain_events_entry

SELECT EVENT_IDENTIFIER , PAYLOAD_TYPE ,AGGREGATE_IDENTIFIER , SEQUENCE_NUMBER,TYPE FROM DOMAIN_EVENT_ENTRY;

EVENT_IDENTIFIER PAYLOAD_TYPE AGGREGATE_IDENTIFIER SEQUENCE_NUMBER TYPE
4463a2a6-e998-4794-b434-7718f9ad2ff0 fr.planetvo.poc.cqrs.coreapi.publication.PublicationCreatedEvent 4463a2a6-e998-4794-b434-7718f9ad2ff0 0 null
95ab1fd2-19eb-40b1-9d0f-9c4b2f92dbbd fr.planetvo.poc.cqrs.coreapi.publication.PublicationActivatedEvent 95ab1fd2-19eb-40b1-9d0f-9c4b2f92dbbd 0 null
0aa6b3fc-8465-429d-9ae5-6df50c716cc1 fr.planetvo.poc.cqrs.coreapi.publication.PublicationDeactivatedEvent 0aa6b3fc-8465-429d-9ae5-6df50c716cc1 0 null
eec79aa9-72fb-44e5-b2c8-cead89992351 fr.planetvo.poc.cqrs.coreapi.publication.PublicationCreatedEvent eec79aa9-72fb-44e5-b2c8-cead89992351 0 null

As you see my aggregate_identifier has the same value as event_identifier.

If i remove @Entity and the bean definition for the GenericRepositroy, i got cohérent values

SELECT EVENT_IDENTIFIER , PAYLOAD_TYPE ,AGGREGATE_IDENTIFIER , SEQUENCE_NUMBER,TYPE FROM DOMAIN_EVENT_ENTRY;

EVENT_IDENTIFIER PAYLOAD_TYPE AGGREGATE_IDENTIFIER SEQUENCE_NUMBER TYPE
6599f0be-9ced-4f37-8584-3e61a9968e40 fr.planetvo.poc.cqrs.coreapi.publication.PublicationCreatedEvent 6bba0db5-8f98-4563-b472-2d1231f0149e 0 Publication
65c1ac0c-6737-470f-8af8-19d15fe9a4fa fr.planetvo.poc.cqrs.coreapi.publication.PublicationActivatedEvent 6bba0db5-8f98-4563-b472-2d1231f0149e 1 Publication
8e10da8f-981c-4fd9-8560-38bc36b840c0 fr.planetvo.poc.cqrs.coreapi.publication.PublicationDeactivatedEvent 6bba0db5-8f98-4563-b472-2d1231f0149e 2 Publication
3b73e2c2-c354-4e78-83e3-9f0349372b5c fr.planetvo.poc.cqrs.coreapi.publication.PublicationCreatedEvent 0c400e41-cbd4-4f80-a919-a66b8eb2e19d 0 Publication

Thanks for your help

Hi Philippe,

now I see what you mean. I thought the identifier of your aggregate in the repository was different each time.
Actually, because your aggregate isn’t event sourced, the events it emits are just EventMessages, not DomainEventMessage. The reason is that Axon doesn’t keep track of a sequence number, which it would need when publishing DomainEventMessages. For tracking processors to allow to process events, they need to be stored in the event store, along with any other events (DomainEvent or not). The aggregate identifier field is given a value identical to the event identifier, and sequence number 0.

Hope this clarifies it.
Cheers,

Allard

Hi Allard,

thanks for your reply, but i’m not sure to understand :slight_smile:

Do you mean that i need to put a @EventSourcingHandler on a method to handle my PublicationCreatedEvent ?
Because i’ve tried, and i have always the same problem.

In your fist reply, yo tell me that it was possible to use JpaRepository and EventStore

in Axon 3, yes you can. If you use a GenericJpaRepository, your aggregate will be stored and loaded from a dedicated table. If you also use an Event Store (instead of just an Event Bus), you will have events stored as well. The Event Store won’t be used to load the aggregate, but will still contain all events.

But does your answer implied that events are saved, but i can’t use my event store to retreive history ?

Maybe i’ve a mistake in my configuration

Thanks

PS > Sorry for my bad english.

Hi Philippe,

@EventHandler and @EventSourcingHandler are technically the same. They’re just stereotypes for your method to be able to better express the intent.

What I was trying to say, is that there’s actually nothing wrong with your setup. The fact that the aggregate identifier on the events in your event store isn’t set to the identifier of the aggregate that published them, isn’t an issue at all. You application will just work as expected.

The only thing you now can’t do (easily) is retrieve all events generated by a specific aggregate. But the history is still all there, as your events are stored in either case.

Cheers,

Allard

Hi Philippe and Allard,

Have you tried to use that solution with an AynchonousCommandBus?
I am see race conditions when I try.
In my scenario, I handle commands in an separate class with @CommandHandler annotated methods.
I send a first command resulting in the creation of an aggregate and the publication of a single event.
The event is handled by an external listener that dispatches another command that should load the aggregate from the GenericJpaRepository and call a method on it.
The problem is that I am getting an exception stating the aggregate does not exist when the second command handler attempts to load it.

Based on what I am seeing in the log, is seems that events are published on the event bus as soon as they are written in the event store without waiting for the entity to be written in the GenericJpaRepository.
This causes the second thread to receive the second command before the GenericJpaRepository commit has happened, causing the failure.

Am I missing something in what it happening and do you thing that it is possible to correct this behavior?

Thanks.

Hi,

you’re probably using a SubscribingEventProcessor (default). Events are published before the transaction is committed. So when you publish commands from an event handling component, it is possible that the command reaches the handler before the transaction of the event handler is committed. In that case, you will get a “not found”.

We’ve been looking at ways to postpone the actual sending of commands to the bus until the outer transaction was committed. The issue with this, however, is that senders that will wait for a response will effectively block forever. The current API doesn’t allow for a way to prevent that.

In your case, if you don’t expect to wait for a result of the command (which is recommended and in this case even impossible), then you can dispatch the command after the commit, like this:

CurrentUnitOfWork.get().afterCommit(u -> commandBus.dispatch(command));

Cheers,

Allard

Hi Allard,
I’ll try your suggestion.
Thank you.

Hi allard

I cannot get your answer with this config i have the same issue, At domain_event_Entry one aggregate identifier, and on the delivery table, another one.

Using postgres

package com.bob.core.domain.model.delivery

import com.bob.core.application.services.axon.DomainCreateDeliveryCommand
import com.bob.core.domain.model.order.OrderId
import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
import org.axonframework.commandhandling.CommandHandler
import org.axonframework.commandhandling.model.AggregateIdentifier
import org.axonframework.commandhandling.model.AggregateLifecycle
import org.axonframework.spring.stereotype.Aggregate
import org.springframework.hateoas.Identifiable

import javax.persistence.*

@Entity
@Aggregate
@EqualsAndHashCode
@ToString
class Delivery  implements Identifiable<Long>{

    @EmbeddedId
    @AggregateIdentifier
    DeliveryId deliveryId
    @Embedded
    OrderId orderId
    @Enumerated(EnumType.STRING)
    DeliveryStatus deliveryStatus = DeliveryStatus.WAITING_FOR_DELIVERY
    @ElementCollection
    @CollectionTable(name="BAGS", joinColumns=@JoinColumn(name="DELIVERY_ID"))
    Set<Bag> bags = []
    @Enumerated(EnumType.STRING)
    Carrier carrier
    @Embedded
    CarrierDeliveryId carrierDelivery
    @Enumerated(EnumType.STRING)
    DeliveryType deliveryType

    Delivery() {
    }

    @CommandHandler
    Delivery(DomainCreateDeliveryCommand deliveryCommand) {
        this.deliveryId = new DeliveryId(bobId:  deliveryCommand.deliveryId)
        this.carrier = deliveryCommand.carrier
        AggregateLifecycle.apply(new DeliveryCreatedEvent(deliveryId: this.deliveryId))
    }

    Long getId(){
        deliveryId.bobId
    }

    void addBags(Set<Bag> moreBags) {
        bags.addAll(moreBags)
    }

    void decreaseBags(Integer quantity) {
        if ((deliveryStatus.equals(DeliveryStatus.WAITING_FOR_DELIVERY) || deliveryStatus.equals(DeliveryStatus.DROPPED_OFF)) && bags.size() >
                quantity) {
            quantity.times { it ->
                bags.minus(bags.getAt(bags.size() - it))
            }
        } else {
            throw new CannotDecreaseBagsException()
        }

    }

    Set<Bag> obtainBags() {
        bags
    }

    void assignToMRW() {
        carrier = Carrier.MRW
    }

    void assignToSEUR() {
        carrier = Carrier.SEUR
    }

    Carrier carrier() {
        carrier
    }

    void cancelDelivery() {
        deliveryStatus = DeliveryStatus.CANCELLED
    }

    void userPicksUpDelivery() {
        deliveryStatus = DeliveryStatus.DELIVERED
    }

    void carrieIsOnTheWay() {
        deliveryStatus = DeliveryStatus.ON_DELIVERY
    }

    void carrierPicksUpTheDelivery() {
        deliveryStatus = DeliveryStatus.CARRIER_PICKED_UP
    }

    void userDropsOffTheDelivery() {
        deliveryStatus = DeliveryStatus.DROPPED_OFF
    }

    void carrierDropsOffTheDelivery() {
        deliveryStatus = DeliveryStatus.CARRIER_DROPPED_OFF
    }

    void carrierDeliversTheDelivery() {
        deliveryStatus = DeliveryStatus.DELIVERED
    }

}

This is my config. I needed to put AggregateAnnotationCommandHandler
to store the aggregate. Canyou help me?

package com.bob.core

import com.bob.core.domain.model.delivery.Delivery
import org.axonframework.commandhandling.AggregateAnnotationCommandHandler
import org.axonframework.commandhandling.model.GenericJpaRepository
import org.axonframework.commandhandling.model.Repository
import org.axonframework.common.jpa.ContainerManagedEntityManagerProvider
import org.axonframework.common.jpa.EntityManagerProvider
import org.axonframework.common.transaction.TransactionManager
import org.axonframework.eventhandling.EventBus
import org.axonframework.eventsourcing.AggregateFactory
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore
import org.axonframework.eventsourcing.eventstore.EventStorageEngine
import org.axonframework.eventsourcing.eventstore.EventStore
import org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine
import org.axonframework.serialization.json.JacksonSerializer
import org.axonframework.spring.eventsourcing.SpringAggregateSnapshotterFactoryBean
import org.axonframework.spring.eventsourcing.SpringPrototypeAggregateFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.cache.annotation.EnableCaching
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Scope

import javax.persistence.EntityManager
import javax.persistence.PersistenceContext
import javax.sql.DataSource

@Configuration
@EnableCaching
class AxonConfiguration {

    @Autowired
    private DataSource dataSource

    @PersistenceContext
    private EntityManager entityManager;

    @Bean
    JacksonSerializer jacksonSerializer() {
        new JacksonSerializer()
    }

    @Bean
    EntityManagerProvider entityManagerProvider() {
        return new ContainerManagedEntityManagerProvider();
    }

    @Bean
    EventStore eventStorageEngine(EntityManagerProvider entityManagerProvider, TransactionManager transactionManager) {

        JpaEventStorageEngine storageEngine = new JpaEventStorageEngine(jacksonSerializer(), null, dataSource, entityManagerProvider,
                transactionManager)

        EventStore eventStore = new EmbeddedEventStore(storageEngine)
    }

    @Bean
    public SpringAggregateSnapshotterFactoryBean springAggregateSnapshotterFactoryBean() {
        return new SpringAggregateSnapshotterFactoryBean()
    }

    @Bean
    @Scope("prototype")
    public AggregateFactory<Delivery> deliveryAggregateFactory() {
        AggregateFactory<Delivery> aggregateFactory = new SpringPrototypeAggregateFactory<Delivery>()
        aggregateFactory.setPrototypeBeanName("delivery")
        return aggregateFactory;
    }

    @Bean
    @Scope("prototype")
    Delivery recommendation() {
        new Delivery()
    }

    @Bean
    public Repository<Delivery> deliveryRepository(EntityManagerProvider entityManagerProvider, EventBus eventBus) {
        new GenericJpaRepository<>(entityManagerProvider, Delivery, eventBus)
    }

    @Bean
    AggregateAnnotationCommandHandler<Delivery> toCommandHandler(GenericJpaRepository repo) {

        new AggregateAnnotationCommandHandler<Delivery>(Delivery.class, repo)
    }

}

Sorry real config is this. We need to know what aggregate generated which events

package com.bob.core

import com.bob.core.domain.model.delivery.Delivery
import org.axonframework.commandhandling.AggregateAnnotationCommandHandler
import org.axonframework.commandhandling.model.GenericJpaRepository
import org.axonframework.commandhandling.model.Repository
import org.axonframework.common.jpa.ContainerManagedEntityManagerProvider
import org.axonframework.common.jpa.EntityManagerProvider
import org.axonframework.common.transaction.TransactionManager
import org.axonframework.eventhandling.EventBus
import org.axonframework.eventsourcing.AggregateFactory
import org.axonframework.eventsourcing.EventCountSnapshotTriggerDefinition
import org.axonframework.eventsourcing.EventSourcingRepository
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore
import org.axonframework.eventsourcing.eventstore.EventStorageEngine
import org.axonframework.eventsourcing.eventstore.EventStore
import org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine
import org.axonframework.serialization.json.JacksonSerializer
import org.axonframework.spring.eventsourcing.SpringAggregateSnapshotter
import org.axonframework.spring.eventsourcing.SpringAggregateSnapshotterFactoryBean
import org.axonframework.spring.eventsourcing.SpringPrototypeAggregateFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.cache.annotation.EnableCaching
import org.springframework.context.ApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Scope

import javax.persistence.EntityManager
import javax.persistence.PersistenceContext
import javax.sql.DataSource

@Configuration
@EnableCaching
class AxonConfiguration {

    @Autowired
    private DataSource dataSource

    @PersistenceContext
    private EntityManager entityManager;

    @Bean
    JacksonSerializer jacksonSerializer() {
        new JacksonSerializer()
    }

    @Bean
    EntityManagerProvider entityManagerProvider() {
        return new ContainerManagedEntityManagerProvider();
    }

    @Bean
    EventStore eventStore(EntityManagerProvider entityManagerProvider, TransactionManager transactionManager) {

        JpaEventStorageEngine storageEngine = new JpaEventStorageEngine(jacksonSerializer(), null, dataSource, entityManagerProvider,
                transactionManager)

        EventStore eventStore = new EmbeddedEventStore(storageEngine)
    }

    @Bean
    public SpringAggregateSnapshotterFactoryBean springAggregateSnapshotterFactoryBean() {
        return new SpringAggregateSnapshotterFactoryBean()
    }

    @Bean
    @Scope("prototype")
    public AggregateFactory<Delivery> deliveryAggregateFactory() {
        AggregateFactory<Delivery> aggregateFactory = new SpringPrototypeAggregateFactory<Delivery>()
        aggregateFactory.setPrototypeBeanName("delivery")
        return aggregateFactory;
    }

    @Bean
    @Scope("prototype")
    Delivery recommendation() {
        new Delivery()
    }

    @Bean
    public EventSourcingRepository<Delivery> recommendationRepository(EventStore eventStore, SpringAggregateSnapshotterFactoryBean
            springAggregateSnapshotterFactoryBean, ApplicationContext applicationContext) {

        SpringAggregateSnapshotter snapshotter = springAggregateSnapshotterFactoryBean.getObject()
        snapshotter.setApplicationContext(applicationContext)
        EventCountSnapshotTriggerDefinition snapshotTriggerDefinition = new EventCountSnapshotTriggerDefinition(
                snapshotter,
                2);
        EventSourcingRepository repository = new EventSourcingRepository(Delivery.class, eventStore, snapshotTriggerDefinition)

        return repository
    }

    @Bean
    public Repository<Delivery> deliveryRepository(EntityManagerProvider entityManagerProvider, EventBus eventBus) {
        new GenericJpaRepository<>(entityManagerProvider, Delivery, eventBus)
    }

    @Bean
    AggregateAnnotationCommandHandler<Delivery> toCommandHandler(GenericJpaRepository repo) {

        new AggregateAnnotationCommandHandler<Delivery>(Delivery.class, repo)
    }

}

Hi,

the aggregateIdentifier used in the Event Store table itself, is only actually the aggregateIdentifier if that aggregate is Event Sourced. If the aggregate is stored using ORM, then the aggregate identifier is actually set to the message id. This is a workaround to allow for a uniqueness check on the combination of aggregateIdentifier and sequenceNumber. Since there is not concept of “sequenceNumber” in a state-stored aggregate, this is the only way to guarantee uniqueness.

If you need the aggregate identifier, make sure it’s attached either in the payload (preferred) or meta data of your event.

Cheers,

Allard

That means it cannot be eventsourced and with genericJparepositry at the sametime, syncing identifier?

It seem to me a good way to have 'transparent CQRS, state and eventsourced automatically together via configuration without listening domain event and storing state/read model afterwards in a custom way

Hi Fernando/Allard,

This means we can’t store snapshot also because each generated event have distinct aggregate identifier with same sequence no. zero.
its not going inside the following if condition and not storing snapshot.

AbstractSnapshotter

Hi all,

snapshotting is only useful when using the Event Sourcing repository. If you use another repository, snapshotting will never be triggered. It wouldn’t make sense, as the aggregate is loaded from its state anyway. Snapshots are only meant as a temporary replacement for a series of event to optimize the loading time of a specific aggregate.

Cheers,

Allard

In summary, to have both, and synced we need to make eventsourcing, and projections via event handling

Enviado con Mailtrack

That is indeed the essence of CQRS with Event Sourcing. The command model’s state is stored as a sequence of events, while the query model is persisted as state, in such a way that it is efficient to query.