We are using Postgres as our event store, not Axon Server. I downloaded and ran the Gift Card demo against Axon Server, and everything works correctly: a plain old event class gets propagated correctly to listeners of that message type. However, in our application, we get an error caused by a constraint in the SQL schema which says that, in table DomainEventEntry, type cannot be null:
create table if not exists domainevententry
(
    globalindex serial8
        constraint domainevententry_pk
            primary key,
    aggregateidentifier varchar(255) not null,
    sequencenumber numeric(19) not null,
    type varchar(255) not null,
    eventidentifier varchar(255) not null,
    metadata bytea,
    payload bytea not null,
    payloadrevision varchar(255),
    payloadtype varchar(255) not null,
    timestamp varchar(255) not null
);
The funny thing is, it’s the Axon framework itself that’s nullifying the type of my event instance. The method appendEvents on line 98 of JdbcEventStorageEngineStatements.java calls asDomainEventMessage:
public static PreparedStatement appendEvents(Connection connection,
                                             EventSchema schema,
                                             Class<?> dataType,
                                             List<? extends EventMessage<?>> events,
                                             Serializer serializer,
                                             TimestampWriter timestampWriter)
        throws SQLException {
    final String sql = "INSERT INTO " + schema.domainEventTable() + " (" + schema.domainEventFields() + ") "
            + "VALUES (?,?,?,?,?,?,?,?,?)";
    PreparedStatement statement = connection.prepareStatement(sql);
    for (EventMessage<?> eventMessage : events) {
        DomainEventMessage<?> event = asDomainEventMessage(eventMessage);
        SerializedObject<?> payload = event.serializePayload(serializer, dataType);
        SerializedObject<?> metaData = event.serializeMetaData(serializer, dataType);
        statement.setString(1, event.getIdentifier());
        statement.setString(2, event.getAggregateIdentifier());
        statement.setLong(3, event.getSequenceNumber());
        statement.setString(4, event.getType());
        timestampWriter.writeTimestamp(statement, 5, event.getTimestamp());
        statement.setString(6, payload.getType().getName());
        statement.setString(7, payload.getType().getRevision());
        statement.setObject(8, payload.getData());
        statement.setObject(9, metaData.getData());
        statement.addBatch();
    }
    return statement;
}
asDomainEventMessage, on line 141, nullifies the type if the given instance is not an instance of DomainEventMessage:
private static <T> DomainEventMessage<T> asDomainEventMessage(EventMessage<T> event) {
    return event instanceof DomainEventMessage<?>
            ? (DomainEventMessage<T>) event
            : new GenericDomainEventMessage<>(null, event.getIdentifier(), 0L, event, event::getTimestamp); // THIS IS WHERE AXON NULLIFIES THE TYPE (first argument)
}
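To check that I am reading this correctly, here is a minimal, self-contained sketch of the behavior as I understand it. Everything in it (NullTypeSketch, PlainEvent, DomainEvent, satisfiesNotNullTypeColumn) is a simplified stand-in I made up for illustration, not an Axon class. It only mirrors the shape of the conversion quoted above: domain events pass through unchanged, and anything else is wrapped with a null type, the event identifier as the aggregate identifier, and sequence number 0.

```java
// Simplified stand-ins for illustration only; these are NOT the Axon Framework classes.
public class NullTypeSketch {

    // Stand-in for any event message.
    interface Event {
        String identifier();
    }

    // Stand-in for an event published outside an aggregate (e.g. through a gateway).
    static final class PlainEvent implements Event {
        final String identifier;
        final Object payload;

        PlainEvent(String identifier, Object payload) {
            this.identifier = identifier;
            this.payload = payload;
        }

        @Override
        public String identifier() {
            return identifier;
        }
    }

    // Stand-in for an aggregate-sourced event, which also carries aggregate details.
    static final class DomainEvent implements Event {
        final String type;                // aggregate type; null for wrapped plain events
        final String aggregateIdentifier;
        final long sequenceNumber;
        final String identifier;

        DomainEvent(String type, String aggregateIdentifier, long sequenceNumber, String identifier) {
            this.type = type;
            this.aggregateIdentifier = aggregateIdentifier;
            this.sequenceNumber = sequenceNumber;
            this.identifier = identifier;
        }

        @Override
        public String identifier() {
            return identifier;
        }
    }

    // Same shape as the quoted conversion: pass domain events through,
    // wrap everything else with a null type and sequence number 0.
    static DomainEvent asDomainEvent(Event event) {
        if (event instanceof DomainEvent) {
            return (DomainEvent) event;
        }
        return new DomainEvent(null, event.identifier(), 0L, event.identifier());
    }

    // Hypothetical check standing in for the "type varchar(255) not null" column.
    static boolean satisfiesNotNullTypeColumn(DomainEvent event) {
        return event.type != null;
    }

    public static void main(String[] args) {
        DomainEvent fromAggregate = new DomainEvent("GiftCard", "card-1", 0L, "evt-1");
        DomainEvent fromGateway = asDomainEvent(new PlainEvent("evt-2", "bypass"));

        System.out.println(satisfiesNotNullTypeColumn(asDomainEvent(fromAggregate))); // true
        System.out.println(satisfiesNotNullTypeColumn(fromGateway));                  // false
    }
}
```

If this model is accurate, any event that does not originate from an aggregate would reach the insert statement with a null type, which is exactly what the not null constraint rejects.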
Given all this, it appears as though Axon Server can handle null types in this situation, but when using Postgres, type cannot be null. I’ve tried to relax the not null constraint in the SQL schema, but that causes me to get into a repeating error situation with messages like the following:
2022-06-23 10:08:14.514 INFO 24908 --- [.site.events]-0] o.a.e.TrackingEventProcessor : Fetching Segments for Processor 'app.site.events' still failing: Could not load segments for processor [app.site.events]. Preparing for retry in 4s
{"@timestamp":"2022-06-23T10:08:14.514-05:00","@version":"1","message":"Fetching Segments for Processor 'app.site.events' still failing: Could not load segments for processor [app.site.events]. Preparing for retry in 4s","logger_name":"org.axonframework.eventhandling.TrackingEventProcessor","thread_name":"EventProcessor[app.site.events]-0","level":"INFO","level_value":20000}
My next step is to alter the Gift Card demo app to use Postgres to see if I can reproduce the error there.
Any advice given prior to my finishing that effort would be appreciated. FYI, here’s my working scenario source, added to the Gift Card demo app:
package io.axoniq.demo.giftcard.aggbypassevent

import org.axonframework.eventhandling.EventHandler
import org.axonframework.eventhandling.gateway.EventGateway
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.RestController

class AggregateBypassingEvent() {
    var s: String? = null

    constructor(s: String) : this() {
        this.s = s
    }

    override fun toString() = "AggregateBypassingEvent:$s"
}

@RestController
class AggregateBypassingEventController(
    @Autowired val evgw: EventGateway
) {
    @GetMapping("/foo")
    fun go(@RequestParam s: String) {
        val ev = AggregateBypassingEvent(s)
        evgw.publish(ev)
        println("!!!!! published:$ev")
    }
}

@Component
class AggregateBypassingEventHandler {
    @EventHandler
    fun on(ev: AggregateBypassingEvent) {
        println("!!!!! got:$ev")
    }
}
Thanks,
Matthew