Axon JpaEventStore with PostgreSQL configuration (help please)

Hi there,

I have a sample application in place that utilizes an EmbeddedEventStore, as follows:

static CommandBus commandBus = new SimpleCommandBus();

CommandGateway commandGateway = new DefaultCommandGateway(commandBus);

static EventStore eventStore = new EmbeddedEventStore(new InMemoryEventStorageEngine());

public static void main(String[] args) {

SpringApplication.run(DemandBiddingServiceApplication.class, args);

logger.info(“Spring boot application started!”);

EventSourcingRepository repository =

new EventSourcingRepository<>(BidAggregate.class, eventStore);

AggregateAnnotationCommandHandler messagesAggregateAggregateAnnotationCommandHandler =

new AggregateAnnotationCommandHandler(BidAggregate.class, repository);

messagesAggregateAggregateAnnotationCommandHandler.subscribe(commandBus);

final AnnotationEventListenerAdapter annotationEventListenerAdapter =

new AnnotationEventListenerAdapter(new BidsEventHandler());

eventStore.subscribe(eventMessages -> eventMessages.forEach(e -> {

try {

annotationEventListenerAdapter.handle(e);

} catch (Exception e1) {

throw new RuntimeException(e1);

}

}

));

I would like the same functionality, only with a JpaEventStore configured for PostgreSQL. I believe I may also require a JpaRepository, rather than an EventSourcingRepository… I do not want to use the in memory store. Would you be able to show me how I can modify the selection above to achieve this? I have not been able to find any good examples online. I also read that the database is automatically populated, and I don’t need to generate any tables – is this the case?

I have made the following changes to my application.properties :

Spring DATASOURCE (DataSourceAutoConfiguration & DataSourceProperties)

spring.datasource.url=jdbc:postgresql://localhost:5432/bidding_event_store

spring.datasource.username=user

spring.datasource.password=pass

The SQL dialect makes Hibernate generate better SQL for the chosen database

##spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.PostgreSQLDialect

Hibernate ddl auto (create, create-drop, validate, update)

spring.jpa.generate-ddl=true

spring.jpa.hibernate.ddl-auto=update

#spring.jpa.hibernate.ddl-auto=validate

I am using the latest version of axon, 3.3.2

Hi,

if you use Spring Boot, I very heavily recommend you to include the axon-spring-boot-starter. If you have JPA on your classpath, it will automatically default to the JPA implementations of the things you require. There is no need to manually configure all these components.
Where customization is required, you can either define your own configured component as an @Bean, or autowire the Configurer in any method in your configuration class.

Make sure your BidsEventHandler is a Spring @Component, and has @EventHandler annotated methods. That all you need to do.

Check out the sample application we have: https://github.com/AxonIQ/giftcard-demo-series

Cheers,

Allard

Is the event store automatically configured?

I’m getting this error: Default configuration requires the use of event sourcing Either configure an Event Store to use, or configure a specific repository implementation for class com.ge.energy.markets.bids.demand.eventsourcing.aggregates.BidAggregate

I got past that issue, as I had a bean initialized for an EventBus (which I read is essentially an event store).

Now I’m having a problem with: org.axonframework.commandhandling.NoHandlerForCommandException: No handler was subscribed to command

For more context…Here is my application class – note that I have some things commented out as I’ve been trying different things:

@SpringBootApplication

@ComponentScan(basePackages = {“com.ge.energy.bids.demand.eventsourcing”})

@EntityScan(basePackages = {“com.ge.energy.bids.demand.eventsourcing”,

“org.axonframework.eventsourcing.eventstore.jpa”,

“org.axonframework.eventhandling.saga.repository.jpa”,

“org.axonframework.eventhandling.tokenstore.jpa”})

@EnableJpaRepositories(basePackages = {“com.ge.energy.bids.demand.eventsourcing”})

public class DemandBiddingServiceApplication {

private static final Logger logger = LoggerFactory.getLogger(DemandBiddingServiceApplication.class);

static SimpleCommandBus commandBus = new SimpleCommandBus();

CommandGateway commandGateway = new DefaultCommandGateway(commandBus);

public static void main(String[] args) {

SpringApplication.run(DemandBiddingServiceApplication.class, args);

logger.info(“Spring boot application started!”);

}

// @Bean

// Repository bidRepository(EntityManagerProvider entityManagerProvider, EventBus eventBus)

// {

// Repository todoRepository = new GenericJpaRepository(

// entityManagerProvider,

// BidRepository.class,

// eventBus

// );

// return todoRepository;

// }

@Bean

@Primary

public Serializer serializer() {

return new JacksonSerializer();

}

// @Bean

// public EventStorageEngine eventStorageEngine() throws Exception {

// return new JpaEventStorageEngine(entityManagerProvider(), springTransactionManager());

// }

//

// @Bean

// public EntityManagerProvider entityManagerProvider() {

// return new ContainerManagedEntityManagerProvider();

// }

//

// @Bean

// public SpringTransactionManager springTransactionManager() throws Exception {

// return new SpringTransactionManager(transactionManager());

// }

//

// @Bean

// public PlatformTransactionManager transactionManager() throws PropertyVetoException {

// return new JpaTransactionManager(entityManagerFactory);

// }

//

// @Bean

// public TransactionManagerFactoryBean transactionManagerFactoryBean() throws PropertyVetoException {

// TransactionManagerFactoryBean factoryBean = new TransactionManagerFactoryBean();

// factoryBean.setTransactionManager(transactionManager());

//

// return factoryBean;

// }

Here is my aggregate class:

@Aggregate

public class BidAggregate {

@AggregateIdentifier

private String mrid;

private String name;

private long fixedMw;

public BidAggregate()

{

System.out.println("--------------------------------------------------");

System.out.println(“Loaded Aggregate”);

System.out.println("--------------------------------------------------");

}

@CommandHandler

public BidAggregate(CreateBidCommand command)

{

apply(new BidCreatedEvent(command.getMrid(), command.getName(), command.getFixedMw()));

}

@EventHandler

public void on(BidCreatedEvent event)

{

this.mrid = event.getMrid();

this.name = event.getName();

this.fixedMw = event.getFixedMw();

}

@CommandHandler

public BidAggregate(AlterBidCommand command)

{

apply(new BidAlteredEvent(command.getMrid(), command.getName(), command.getFixedMw()));

}

@EventHandler

public void on(BidAlteredEvent event)

{

this.mrid = event.getMrid();

this.name = event.getName();

this.fixedMw = event.getFixedMw();

}

Here are my command classes:

@Value

public class AlterBidCommand {

@TargetAggregateIdentifier

private final String mrid;

private final String name;

private final long fixedMw;

public AlterBidCommand(String mrid, String name, long fixedMw) {

this.mrid = mrid;

this.name = name;

this.fixedMw = fixedMw;

}

public String getMrid() {

return mrid;

}

public String getName() {

return name;

}

public long getFixedMw()

{

return fixedMw;

}

@Value

public class CreateBidCommand {

@TargetAggregateIdentifier

private final String mrid;

private final String name;

private final long fixedMw;

public CreateBidCommand(String mrid, String name, long fixedMw) {

this.mrid = mrid;

this.name = name;

this.fixedMw = fixedMw;

}

public String getMrid() {

return mrid;

}

public String getName() {

return name;

}

public long getFixedMw()

{

return fixedMw;

}

Event classes:

@Value

public class BidAlteredEvent {

private String mrid;

private String name;

private long fixedMw;

public BidAlteredEvent(String mrid, String name, long fixedMw) {

this.mrid = mrid;

this.name = name;

this.fixedMw = fixedMw;

}

public String getMrid() {

return mrid;

}

public String getName() {

return name;

}

public long getFixedMw() {

return fixedMw;

}

@Value

public class BidCreatedEvent {

private String mrid;

private String name;

private long fixedMw;

public BidCreatedEvent(String mrid, String name, long fixedMw) {

this.mrid = mrid;

this.name = name;

this.fixedMw = fixedMw;

}

public String getMrid() {

return mrid;

}

public String getName() {

return name;

}

public long getFixedMw() {

return fixedMw;

}

I am trying to get a events to appear through my JPA connection. I can connect to my database and I do see the tables generated properly once the application kicks off. It just always complains about the handlers.

Finally my event handler class:

@Component

public class BidsEventHandler {

@EventHandler

public void handle(BidCreatedEvent event) {

System.out.println(“Bid Created: " + event.getMrid() + " (” + event.getMrid() + “)”);

}

@EventHandler

public void handle(BidAlteredEvent event) {

System.out.println(“Bid Altered: " + event.getMrid() + " (” + event.getMrid() + “)”);

}

Any help would be greatly appreciated, I can tell I am close – but I’m clearly missing something inherent here.

If you have the spring-boot-starter-data-jpa dependency (by Spring) in your application, it will be automatically configured, indeed.
Make sure you don’t specify any EventStorageEngines or EventStore instances in your configurations, otherwise Axon will use that one instead.

Any idea why I’m getting an error related to org.axonframework.commandhandling.NoHandlerForCommandException: No handler was subscribed to command ?

My command handlers are within my Aggregate class, that I included the details to in a previous post here. The aggregate is in a subdirectory of where the application class is stored (saw in a separate post that could be an issue).

I’ve seen some chatter about needing a AggregateAnnotationCommandHandler, but I don’t see it in your example projects.

Are your aggregates annotated with @Aggregate?

Yes, here is the aggregate:

@Aggregate

public class BidAggregate {

@AggregateIdentifier

private String mrid;

private String name;

private long fixedMw;

public BidAggregate()

{

System.out.println("--------------------------------------------------");

System.out.println(“Loaded Aggregate”);

System.out.println("--------------------------------------------------");

}

@CommandHandler

public BidAggregate(CreateBidCommand command)

{

apply(new BidCreatedEvent(command.getMrid(), command.getName(), command.getFixedMw()));

}

@EventHandler

public void on(BidCreatedEvent event)

{

this.mrid = event.getMrid();

this.name = event.getName();

this.fixedMw = event.getFixedMw();

}

@CommandHandler

public BidAggregate(AlterBidCommand command)

{

apply(new BidAlteredEvent(command.getMrid(), command.getName(), command.getFixedMw()));

}

@EventHandler

public void on(BidAlteredEvent event)

{

this.mrid = event.getMrid();

this.name = event.getName();

this.fixedMw = event.getFixedMw();

}

}

Do I need any beans defined within my application or configuration class? I currently don’t have any beans configured, based on your previous suggestion that I would not need to configure any of those items

Hi,

no, you don’t actually have to configure anything at all. If you have the Spring boot starter for Axon in place, all you need is the @Aggregate and some @EventHandlers.

I see you have @ComponentScan on your SpringBoot application, as well as some other annotations. Is the aggregate in the package defined in the @ComponentScan annotation?
Note that all these annotations are optional and should only be provided if you want to scan objects outside of the package your Application class is in.

Cheers,

Allard

Hi Allard. I figured out my issue. I was configuring a separate command bus from that which had already been auto configured by Spring. That was causing no handlers to be connected to the bus as it had never been assigned to that bus. Rather, I autowired the preconfigured bus to pass through to my controller where I then issued commands that were processed into events.

Thanks!

Allard, I do have one quick question, where does the actual payload get stored within the event store?

I’m seeing the following fields :

Also, how can i get the sequence number to stay the same? It always seems to give me a new aggregate.

My aggregate identifier is set as such:

@AggregateIdentifier

private String mrid;

Yet, I always seem to get a new aggregate, even when passing through the same mrid…

Hi,

I see you’re using Postgres. The payload is stored in the payload column, but Postgres stores blobs separately and references them using an OID.
For production situations, we recommend tuning the database to store data inline up to a decent amount of data. This is not default behavior in Postgres, because blob objects are generally expensive to modify. In an event store, however, these objects are never modified, making it cheaper to inline them.
Have a look at TOAST in the postgres documentation.

Cheers,

Allard

Well…actually – when passing through the same mrid I get an error:

Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint “uk8s1f994p4la2ipb13me2xqm1w”
Detail: Key (aggregate_identifier, sequence_number)=(9cc0890b-7403-4fdf-8f8f-8ce81af423c2, 0) already exists.

You AlterBidCommand is also a constructor command handler. This will cause Axon to attempt to create a new instance, thus inserting sequence number 0. However, that entry already exists.
You probably want to change that method into a “normal” method:
public void handle(AlterBidCommand cmd);

Cheers,

Allard

Let me clarify… As I see what I said above made no sense…

When I pass through the same mrid, which is my aggregate identifier, I get the following error:

Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint “uk8s1f994p4la2ipb13me2xqm1w”
Detail: Key (aggregate_identifier, sequence_number)=(9cc0890b-7403-4fdf-8f8f-8ce81af423c2, 0) already exists.

I want to see the sequence number increment when a new event has been performed on a specific bid (which is how I would expect the event sourcing to work, anyways)

I am still unsure as to where the actual payload data is being stored as I can’t seem to find it anywhere.

Check out this question on stackoverflow:
https://dba.stackexchange.com/questions/174029/how-to-show-large-objects-lob-contents

Thanks Allard – I tried that, but I’m seeing an issue related to serialization now when I utilize the handle function for the alter method:

Command resulted in exception: com.ge.energy.markets.bids.demand.eventsourcing.commands.AlterBidCommand
org.axonframework.messaging.annotation.MessageHandlerInvocationException: Error handling event of type [class com.ge.energy.markets.bids.demand.eventsourcing.events.BidCreatedEvent] in aggregate

Caused by: org.axonframework.serialization.SerializationException: Error while deserializing object
at org.axonframework.serialization.json.JacksonSerializer.deserialize(JacksonSerializer.java:214)

Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of com.ge.energy.markets.bids.demand.eventsourcing.events.BidCreatedEvent (no Creators, like default construct, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
at [Source: (byte[])"{“mrid”:“a22a250d-6755-4ea7-b91d-3438165e2faf”,“name”:null,“fixedMw”:22}"; line: 1, column: 2]

Cannot construct instance of com.ge.energy.markets.bids.demand.eventsourcing.events.BidCreatedEvent (no Creators, like default construct, exist): cannot deserialize from Object value (no delegate- or property-based Creator)

That’s a Jackson error. Apparantly, your class isn’t compatible with what Jackson expects. Make sure your events have the proper Jackson annotations.

Allard, do you know if there are any examples which utilize Jackson serialization within them, rather than the default Xstream – as a reference?

Thanks!