Testing Message Handlers with Test Containers

Hello Community!

I have seen questions around How do I test Axon @EventHandler and the persistent answer has been to mock the Component/Class with the event/message handler and directly pass the message instance to the mock as evidenced here Testing an @EventHandler

This has been the only viable solution until testcontainers came along. Now, we can write & run unitigration tests against real systems and dependencies, and have greater confidence in the software we ship, succinctly. Anyways, aren’t mocks sweet lies that we tell ourselves - debatable.

As I will demonstrate, I am capable of spinning up an axon server test container instance and dispatch events via the EventGateway, with a slight hiccup…

 // Container Declaration
class TestContainersInitializer : ApplicationContextInitializer<ConfigurableApplicationContext> {
        ...
        private val axon = AxonServerContainer.builder()
            .dockerImageVersion(AXON_IMAGE_VERSION)
            .enableDevMode()
            .build()
        ...
 // Start container, and inject dynamic properties
override fun initialize(applicationContext: ConfigurableApplicationContext) {
        ... 
        axon.start()
        ... 

        TestPropertySourceUtils.addInlinedPropertiesToEnvironment(
            applicationContext,
            ...
            "axon.axonserver.servers=${axon.url}",
            "axon.serializer.general=jackson",
            "axon.serializer.events=jackson",
            "axon.serializer.messages=jackson"
            ...
        )

Now, when I emit an event, I expect that my system handles it successfully as declared by the test…

@SpringBootTest
@ContextConfiguration(initializers = [TestContainersInitializer::class])
class PointReEventProcessorTest : BehaviorSpec() {
        ...
        Given("A Student") {

            val testStudent = studentService.addStudent(student)

            When("A Student Rewarded Event is Published") {

                val event = StudentRewarded(
                    studentId = testStudent.studentId,
                    rewardType = KPS.PointRewardType.DAILY_LOGIN_BONUS,
                    amount = 2.toBigInteger(),
                )

                eventPublisher.publish(event)

                Then("The Student Should Have Been Awarded with 2 Points") {

                    val rewardedStudent = studentService.getStudent(studentId = testStudent.studentId, response = Student::class.java)

                    rewardedStudent.asClue {
                        rewardedStudent.balance shouldBe BigInteger.TWO
                    }

                }
            }
        }
        ...
}

The message handler is as defined as such…

    @EventHandler
    @DisallowReplay
    fun handle(event: StudentRewarded) {
        val student = service.getStudent(studentId = event.studentId, response = Student::class.java)
        student.reward(rewardId = UUID.randomUUID(), rewardType = event.rewardType, amount = event.amount)
        service.updateStudent(student)
    }

and the event is defined as such …

data class StudentRewarded(
    val studentId: UUID,
    val rewardType: PointRewardType,
    val amount: BigInteger
)

When the test is executed, a runtime jackson deserialization exception is thrown…

Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `KPS.StudentRewarded` (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
 at [Source: (byte[])"{"studentId":"997de0e0-72b8-4b5c-aa04-5a01766af03d","rewardType":"DAILY_LOGIN_BONUS","amount":2}"; line: 1, column: 2]
...

io.kotest.assertions.AssertionFailedError: ...
Expected : 2
Actual   : 0

As you can see, through the axon server, the test is capable of publishing and capturing events. Now, after combing through the codebase, it’s difficult to pinpoint the source of the exception as I think this configuration should suffice…

@Configuration
class AxonConfig {

    @Bean
    @Primary
    fun serializer(): Serializer {
        return JacksonSerializer
            .builder()
            .lenientDeserialization()
            .build()
    }
}

Could there be something I’m missing out? Your help is warmly welcome.

Thanks and Kind Regards.

UPDATE…

The issue was indeed in the configuration in that I had to register the kotlin module to the serializer’s Object Mapper…

    @Bean
    @Primary
    fun serializer(): Serializer {

        val objectMapper = ObjectMapper()

        objectMapper.registerKotlinModule()

        return JacksonSerializer
            .builder()
            .lenientDeserialization()
            .objectMapper(objectMapper)
            .build()
    }

Now, this opened up an interesting problem where the THEN assertions are executed before the event is eventually handled, leading to test failures.

My current solution is to force a Thread.sleep() of 1 second to allow changes in my read model to be propagated eventually.

So the test ends up looking like this…

        Given("A Student") {

            val testStudent = studentService.addStudent(student)

            When("A Student Rewarded Event is Published") {

                val event = StudentRewarded(
                    studentId = testStudent.studentId,
                    rewardType = DAILY_LOGIN_BONUS,
                    amount = TWO,
                )

                eventPublisher.publish(event)

                Thread.sleep(1000)

                Then("The Student Should Have Been Awarded with 2 Points") {

                    val rewardedStudent = studentService
                        .getStudent(studentId = testStudent.studentId, response = Student::class.java)

                    rewardedStudent.asClue {
                        rewardedStudent.balance shouldBe TWO
                    }

                }
            }
        }

This is not the most efficient solution but it gets the job done and demonstrates that you can effectively test your axon message handler, transaction and possibly aggregates with test containers.

Aren’t unitigrations beautiful?

UPDATE…

While this approach is feasible with Event Handlers without any cascading side effects, such as Command/Event Publishing, I have experienced halting challenges mainly in the way the Axon Framework expresses CQRS and DDD - the Unit Of Work (UoW)

See, having succeeded in testing a terminal event handler in this manner, I was inclined to take up a more interesting event handler.

The event handler naturally consumes its events from an aggregate, processes it (updates the query model) which triggers a Spring Event that then publishes events and commands as demonstrated…

// Event Processing Service
    @EventHandler
    @DisallowReplay
    fun handle(event: StudentEventRecorded) {
        runBlocking {
            val student = service.getStudent(studentId = event.studentId, response = Student::class.java)
            student.recordEvent(eventId = UUID.randomUUID(), eventType = event.eventType)
            service.updateStudent(student)
        }
    }


    @TransactionalEventListener(phase = AFTER_COMMIT)
    fun handle(event: StudentEventCommitted) {
            runBlocking {

                val recordedEvent =
                    eventService.getStudentEvent(eventId = event.eventId, response = StudentEventSummary::class.java)

                when (recordedEvent.eventType) {

                    LOGGED_IN ->
                        if (eventService.`Count Student Events of this Type Created Between Then And Now`(studentId = event.studentId, eventType = LOGGED_IN, then = recordedEvent.createdAt.minusDays(1)) == 0L)
                            eventPublisher.publish(StudentRewarded(studentId = event.studentId, rewardType = DAILY_LOGIN_BONUS, amount = ONE))
...

The event publishing operation fails spectacularly given that the UoW that handled the StudentEventRecorded Event is complete. You would then require to construct a new UoW instance…

        DefaultUnitOfWork.startAndGet(null).execute {
            runBlocking {
...

to publish the event and handle any side effects. This is undesirable, in many ways, mainly because the event should be published through your aggregate execution.

Sending commands is feasible, in that the framework will handle it in a new UoW…

...
    @TransactionalEventListener(phase = AFTER_COMMIT)
    fun handle(event: StudentEventCommitted) {

        DefaultUnitOfWork.startAndGet(null).execute {
            runBlocking {

                val recordedEvent =
                    eventService.getStudentEvent(eventId = event.eventId, response = StudentEventSummary::class.java)

                when (recordedEvent.eventType) {

                    LOGGED_IN ->
                        if (eventService.`Count Student Events of this Type Created Between Then And Now`(studentId = event.studentId, eventType = LOGGED_IN, then = recordedEvent.createdAt.minusDays(1)) == 0L)
                            commander.send<Void>(RewardStudent(studentId = event.studentId, rewardType = DAILY_LOGIN_BONUS, amount = ONE)).join()
                        else { }
...

Until you realize that your aggregate instance does not exist and, as a consequence, can not be event sourced so as to handle the command.

The natural reaction, in this test approach, would be to emit the “seeding” command, during the test set up. The test ends up looking like so…

        Given("A Student") {

            val studentId = UUID.randomUUID()

            commander.sendAndWait<Void>(OnboardStudent(studentId = studentId, emailAddress = student.emailAddress))


            When("When a StudentEventRecorded Event is Published..."){

                commander.sendAndWait<Void>(RecordStudentEvent(studentId = studentId, eventType = LOGGED_IN))

                Then("The Student Should Have Been Awarded with 1 Point"){

                    val rewardedStudent = studentService
                        .getStudent(studentId = studentId, response = Student::class.java)

                    rewardedStudent.asClue {
                        rewardedStudent.balance shouldBe ONE
                    }

                }

                    Then("The Student Should Have Been Awarded with 1 Point"){

                        val rewardedStudent = studentService
                            .getStudent(studentId = studentId, response = Student::class.java)

                        rewardedStudent.asClue {
                            rewardedStudent.balance shouldBe ONE
                        }

                    }
            } 

This Test should pass, I mean, it works in a full spring context.

Yes, it, does, the first command is sent but its state transition and aggregate are not persisted in the event store.

When the aggregate attempts to execute the second command it throws an interesting exception…

2024-01-28T16:17:30.823+03:00  WARN 44904 --- [...] [mandProcessor-1] o.a.m.command.AbstractRepository         : Exception occurred while trying to load a aggregate with identifier [9f846106-c072-4872-ab9f-1f24bf0572f5].

org.axonframework.modelling.command.AggregateNotFoundException: The aggregate was not found in the event store
...

2024-01-28T16:17:30.843+03:00  WARN 44904 --- [...] [mandProcessor-1] o.a.a.c.command.CommandSerializer        : Serializing exception [class org.axonframework.modelling.command.AggregateNotFoundException] without details.

org.axonframework.modelling.command.AggregateNotFoundException: The aggregate was not found in the event store
	at org.axonframework.eventsourcing.EventSourcingRepository.doLoadWithLock(EventSourcingRepository.java:133) ~[axon-eventsourcing-4.9.1.jar:4.9.1]
...
An exception was thrown by the remote message handling component: The aggregate was not found in the event store
AxonServerNonTransientRemoteCommandHandlingException{message=An exception was thrown by the remote message handling component: The aggregate was not found in the event store, errorCode='AXONIQ-4005', server='44904@kahiga-mac.local'}

I’m curious if there is a way to get around this problem. Solutions are welcome.

UPDATE…

Well the solution to the last problem is embarrassingly simple. There was an issue applying my initial aggregate event.

The intellij IDE allows you to call AggrgateLifecycle.apply() as apply(), only that kotlin also has the apply operator that I erroneously used.

Everything works as expected - the integration test passes, by sending events and verifying system changes as events course through.

This has been a pleasant adventure, I have been the community and contributor at the same time, how’s that?

I hope this helps you immensely, thank you for reading through my nonsense :blush:

1 Like