How do you publish an event intended only for @EventHandler-annotated methods (not part of an aggregate)?

Hi all,

We have sensor data coming in to MQTT, which we’re processing with a message processor that has a reference to an EventGateway. It parses the MQTT message into an instance foo of our own message class, then we call EventGateway#publish(foo). We then get an exception, because it appears the framework is looking for an aggregate id while trying to store the event in the event store (see below).

How do you publish a GenericEventMessage that only gets routed to @EventHandler-annotated methods that are not defined on @Aggregate?

Here’s the exception:

java.sql.BatchUpdateException: Batch entry 0 INSERT INTO DomainEventEntry (eventIdentifier, aggregateIdentifier, sequenceNumber, type, timeStamp, payloadType, payloadRevision, payload, metaData) VALUES ('80f51f3c-f70d-441b-b42b-5ee947f7f6e1','80f51f3c-f70d-441b-b42b-5ee947f7f6e1',0,NULL,'2022-06-21T19:11:02.452Z','app.site.messaging.PastEventMessage',NULL,?,?) was aborted: ERROR: null value in column "type" of relation "domainevententry" violates not-null constraint
  Detail: Failing row contains (6738, 80f51f3c-f70d-441b-b42b-5ee947f7f6e1, 0, null, 80f51f3c-f70d-441b-b42b-5ee947f7f6e1, \x3c6d6574612d646174612f3e, \x3c6170702e736974652e6d6573736167696e672e506173744576656e744d65..., null, app.site.messaging.PastEventMessage, 2022-06-21T19:11:02.452Z).  Call getNextException to see other errors in the batch
1 Like

Events don’t rely on any explicit set id in order to be handled by the framework. Could there be something wrong with the event definition? Perhaps you could share an example of your event class and the way you publish it on the event gateway.

Normally, an event is just a POJO without anything special related to @AggregateIdentifier or @TargetAggregateIdentifier like commands do.

The exception message states the type should not be NULL which it is. Hence it violates a constraint. Do you use Axon Server, or do you use a custom database as the event store?

2 Likes

We are using postgres as our event store, not Axon Server. I downloaded and ran the Gift Card demo against AxonServer, and everything works correctly: a plain, ol’ event class gets propogated correctly to listeners of that message type. However, in our application, we are getting an error due to a constraint in the sql schema that says that in table DomainEventEntry, type cannot be null:

create table if not exists domainevententry
(
    globalindex         serial8
    constraint domainevententry_pk
    primary key,
    aggregateidentifier varchar(255) not null,
    sequencenumber      numeric(19)  not null,
    type                varchar(255) not null,
    eventidentifier     varchar(255) not null,
    metadata            bytea,
    payload             bytea        not null,
    payloadrevision     varchar(255),
    payloadtype         varchar(255) not null,
    timestamp           varchar(255) not null
    );

The funny thing is, it’s the Axon framework itself that’s nullifying the type of my event instance. The method appendEvents on line 98 of JdbcEventStorageEngineStatements.java calls asDomainEventMessage:

    public static PreparedStatement appendEvents(Connection connection,
                                                 EventSchema schema,
                                                 Class<?> dataType,
                                                 List<? extends EventMessage<?>> events,
                                                 Serializer serializer,
                                                 TimestampWriter timestampWriter)
            throws SQLException {
        final String sql = "INSERT INTO " + schema.domainEventTable() + " (" + schema.domainEventFields() + ") "
                + "VALUES (?,?,?,?,?,?,?,?,?)";
        PreparedStatement statement = connection.prepareStatement(sql);
        for (EventMessage<?> eventMessage : events) {
            DomainEventMessage<?> event = asDomainEventMessage(eventMessage);
            SerializedObject<?> payload = event.serializePayload(serializer, dataType);
            SerializedObject<?> metaData = event.serializeMetaData(serializer, dataType);
            statement.setString(1, event.getIdentifier());
            statement.setString(2, event.getAggregateIdentifier());
            statement.setLong(3, event.getSequenceNumber());
            statement.setString(4, event.getType());
            timestampWriter.writeTimestamp(statement, 5, event.getTimestamp());
            statement.setString(6, payload.getType().getName());
            statement.setString(7, payload.getType().getRevision());
            statement.setObject(8, payload.getData());
            statement.setObject(9, metaData.getData());
            statement.addBatch();
        }
        return statement;
    }

asDomainEventMessage nullifies the type if the given instance is not instanceof DomainEventMessage, on line 141:

    private static <T> DomainEventMessage<T> asDomainEventMessage(EventMessage<T> event) {
        return event instanceof DomainEventMessage<?>
                ? (DomainEventMessage<T>) event
                : new GenericDomainEventMessage<>(null, event.getIdentifier(), 0L, event, event::getTimestamp); // THIS IS WHERE AXON NULLIFIES THE TYPE (first argument)
    }

Given all this, it appears as though Axon Server can handle null types in this situation, but when using Postgres, type cannot be null. I’ve tried to relax the not null constraint in the sql schema, but that causes me to get into a repeating error situation with messages like the following:

2022-06-23 10:08:14.514  INFO 24908 --- [.site.events]-0] o.a.e.TrackingEventProcessor             : Fetching Segments for Processor 'app.site.events' still failing: Could not load segments for processor [app.site.events]. Preparing for retry in 4s
{"@timestamp":"2022-06-23T10:08:14.514-05:00","@version":"1","message":"Fetching Segments for Processor 'app.site.events' still failing: Could not load segments for processor [app.site.events]. Preparing for retry in 4s","logger_name":"org.axonframework.eventhandling.TrackingEventProcessor","thread_name":"EventProcessor[app.site.events]-0","level":"INFO","level_value":20000}

My next step is to alter the Gift Card demo app to use Postgres to see if I can reproduce the error there.

Any advice given prior to my finishing that effort would be appreciated. FYI, here’s my working scenario source, added to the Gift Card demo app:

package io.axoniq.demo.giftcard.aggbypassevent

import org.axonframework.eventhandling.EventHandler
import org.axonframework.eventhandling.gateway.EventGateway
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.RestController

class AggregateBypassingEvent() {
    var s: String? = null

    constructor(s: String) : this() {
        this.s = s
    }

    override fun toString() = "AggregateBypassingEvent:$s"
}

@RestController
class AggregateBypassingEventController(
    @Autowired val evgw: EventGateway
) {
    @GetMapping("/foo")
    fun go(@RequestParam s: String) {
        val ev = AggregateBypassingEvent(s)
        evgw.publish(ev)
        println("!!!!! published:$ev")
    }
}

@Component
class AggregateBypassingEventHandler {
    @EventHandler
    fun on(ev: AggregateBypassingEvent) {
        println("!!!!! got:$ev")
    }
}

Thanks,
Matthew

Ok, after switching our project to using JPA for event storage, I realized that I think we were simply missing the table definition for TokenEntry objects.

Is there a standard place where the SQL database-specific schemas for commonly used SQL databases are kept? I can’t recall where I got our Postgres ones for DomainEventEntry etc came from. Besides, we’ll need the token entry & saga store schemas, unless we just rely on Hibernate-via-JPA to generate the schemas.

Perhaps the Axon schema generator tool can be of any use related to your last response.

When I saw no commits for 5 years, I was worried, then I bailed when I saw the message

This project started to tackle a particular presales question from an individual prospect and is currently a program which is configured by modifying its source. We’ll further develop this into a full command line utility to create DDL scripts for Axon Framework JPA entities.

:blush:

You can use the schema I have for postgres here. But I am using JSON for the payload columns.

Maybe this can also help you?