Setting up a saga to listen with Spring Cloud Stream

Hello,

There are two services:

Service 1 - Publishes “PlatformCreatedEvent” to an RMQ exchange.
Service 2 - Consumes this message to trigger a saga.

Here is the problem: the saga is not triggered because Axon is not aware of the event received in Service 2. I don’t know how to configure Axon to work with Spring Cloud Stream and RMQ.

Since I can see the message being received in Service 2 in the handlePlatformCreated function, I am leaving out the configuration of Service 1.

SERVICE 2 Snippets:

class OnboardingSaga {
    private val businessId = ""

    @Autowired
    @Transient
    lateinit var commandGateway: CommandGateway

    @StartSaga
    @SagaEventHandler(associationProperty = "id", keyName = "businessId")
    fun handle(event: PlatformCreatedEvent) {
        println("I am not being triggered!")
    }
}

@Configuration
class AxonConfig {

    @Bean
    fun sagaConfiguration(): SagaConfiguration<*> {
        return SagaConfiguration.trackingSagaManager(OnboardingSaga::class.java)
    }

}

@Component
interface Service2Channels {

    @Input(ApplicationExchanges.PLATFORM_CREATED)
    fun platformCreated(): SubscribableChannel
}

@EnableBinding(Service2Channels::class)
class PlatformEventConsumer {

    @StreamListener(ApplicationExchanges.PLATFORM_CREATED)
    fun handlePlatformCreated(event: PlatformCreatedEvent) {
        println(">>>>> I am being triggered! YAY!!!")
    }
}

Thanks

Hi Altug,

The problem is with your AxonConfig class. The sagaConfiguration bean must either follow the naming convention, or you must explicitly specify the configuration’s bean name in the @Saga annotation.

The default bean name to use for a Saga class called MySaga is mySagaConfiguration.
Also, the default source for a Tracking Processor is the Event Store. If you want to use another source, you must specify that in the SagaConfiguration.
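
To illustrate the naming convention only, here is a minimal Kotlin sketch that derives the expected bean name from a saga class name. The helper defaultSagaConfigurationBeanName is hypothetical and not part of Axon's API; it simply mirrors the rule described above (lower-case the first letter of the class name and append Configuration).

```kotlin
// Hypothetical helper, not part of Axon: mirrors the naming convention described above.
fun defaultSagaConfigurationBeanName(sagaSimpleName: String): String =
    sagaSimpleName.replaceFirstChar { it.lowercaseChar() } + "Configuration"

fun main() {
    println(defaultSagaConfigurationBeanName("MySaga"))         // mySagaConfiguration
    println(defaultSagaConfigurationBeanName("OnboardingSaga")) // onboardingSagaConfiguration
}
```

So for a saga class called OnboardingSaga, the configuration bean would be expected under the name onboardingSagaConfiguration rather than sagaConfiguration.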

Hope this helps.
Cheers,

Allard

Hi Allard,

Thank you for your response. I’ll try this, but I have a quick follow-up question on a tangent. In the Axon saga samples I see this code (Kotlin in this case):

@RabbitListener(queues=["queue name"])

Why do we set queues here rather than an exchange? In RMQ (AMQP) we talk to exchanges, not queues.

Thanks

Hi Allard,

I realized I missed adding the AMQP config snippet in Service 2:

@Configuration
class AmqpConfig {
    @Bean
    fun springAMQPMessageSource(serializer: Serializer): SpringAMQPMessageSource {
        return object : SpringAMQPMessageSource(serializer) {
//            @RabbitListener(queues = [(ApplicationExchanges.PLATFORM_CREATED)])
            @StreamListener(ApplicationExchanges.PLATFORM_CREATED)
            override fun onMessage(message: Message, channel: Channel) {
                super.onMessage(message, channel)
            }
        }
    }
}

I updated the AxonConfig as you suggested.

@Configuration
class AxonConfig {

    @Bean
    fun onboardingSagaConfiguration(): SagaConfiguration<*> {
        return SagaConfiguration.trackingSagaManager(OnboardingSaga::class.java)
    }

}

I see that the message is received, but I get a serialization exception for the received event:

data class PlatformCreatedEvent(
        var id: PlatformId,
        var name: String = "",
        var awsAccessKey: String = "",
        var awsSecretKey: String = ""
)

data class PlatformId(var id: String = IdentifierFactory.getInstance().generateIdentifier()) {
    override fun toString(): String {
        return id
    }

    fun identifier(): String {
        return id
    }
}

The exception is:

2018-03-05 20:42:31.704 WARN 49059 --- [kGqR36QeUMKjw-1] ingErrorHandler$DefaultExceptionStrategy : Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'{"id":{"id":"74b91da7-41e7-4f53-97f7-4ebec8b27e67"},"name":"homer","awsAccessKey":"string","awsSecretKey":"string"}' MessageProperties [headers={contentType=text/plain, originalContentType=application/json;charset=UTF-8}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=wd-platform-out-platform-created, receivedRoutingKey=wd-platform-out-platform-created, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-9Y6XUpLYSwN0hd2GjlDexw, consumerQueue=wd-platform-out-platform-created.anonymous.v0UkXL4fRkGqR36QeUMKjw])

The issue seems to be with PlatformId (the id property of PlatformCreatedEvent):

Body:'{"id":{"id":"74b91da7-41e7-4f53-97f7-4ebec8b27e67"},"name":"homer","awsAccessKey":"string","awsSecretKey":"string"}'

The id is serialized as a nested object, "id": {"id": uuid}, instead of a plain value, "id": uuid.

The Spring Cloud Stream content type is set to JSON on both Service 1 and Service 2:

spring.cloud.stream.default.contentType=application/json

Any ideas why the serialization fails?

Thank you

Hi Altug,

You might want to try the @JsonValue @JsonProperty annotations on the getter of the id field in your PlatformId class.

I believe that should make it so that your id will not be wrapped in another id{...} field.

Anyhow, my hunch is that the solution lies somewhere in the Jackson annotations rather than in Axon Framework.
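
As a rough sketch of that idea, assuming plain Jackson (jackson-databind) without the Kotlin module: the PlatformId below is a simplified stand-in for the class in the question (the IdentifierFactory default is left out so the example is self-contained), and the @JsonCreator on the constructor is an addition of mine so that the flattened value can be read back. Whether this resolves the conversion error in the log is not confirmed.

```kotlin
import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.annotation.JsonValue
import com.fasterxml.jackson.databind.ObjectMapper

// Simplified stand-in for PlatformId from the question (assumption: no default id).
// DELEGATING creator: build the object directly from a single JSON string value.
data class PlatformId @JsonCreator(mode = JsonCreator.Mode.DELEGATING) constructor(
    // @JsonValue on the getter: serialize the whole object as this one value.
    @get:JsonValue val id: String
) {
    override fun toString(): String = id
}

fun main() {
    val mapper = ObjectMapper()
    val original = PlatformId("74b91da7-41e7-4f53-97f7-4ebec8b27e67")

    // Serialized as a plain JSON string, not as an object wrapping an "id" field.
    val json = mapper.writeValueAsString(original)
    println(json)

    // Round trip back into a PlatformId via the delegating creator.
    val restored = mapper.readValue(json, PlatformId::class.java)
    println(restored == original)
}
```

With this shape, an event holding a PlatformId would carry "id": "74b9..." rather than a nested object; both services would need to use the same mapping.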

Cheers,

Steven