Does anyone have a working Kafka example?

Can anyone help me? Kafka is not receiving messages.

Does the following example work on your machine?

https://github.com/marinkobabic/axon-kafka-example

I implemented this example and it works. But when I try to add a saga to the same example, the SagaEventHandler does not receive the event that was published by the consumer.

Following is my application.yml

`

axon:
  eventhandling:
    processors:
      "[WalletProcessor]":
        source: kafkaMessageSource
        mode: TRACKING
  kafka:
    client-id: myproducer
    default-topic: testqueue
    producer:
      retries: 5
      bootstrap-servers:
        - 127.0.0.1:9092
      transaction-id-prefix: clxtrx
    consumer:
      group-id: axon1
      bootstrap-servers:
        - 127.0.0.1:9092
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/producerdemo?createDatabaseIfNotExist=true&useSSL=false
    username: root
    password: root
  jpa:
    database-platform: org.hibernate.dialect.MySQL5Dialect
    hibernate.ddl-auto: update
server:
  port: 9000

`

MyConfig class

`

/*
 * © 2018 CREALOGIX. All rights reserved.
 */
package com.axonkafka;

import org.axonframework.boot.autoconfig.AxonAutoConfiguration;
import org.axonframework.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.serialization.Serializer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/*
 * This configuration is only needed because of the issue
 * https://github.com/AxonFramework/AxonFramework/issues/710
 */
@Configuration
@AutoConfigureAfter(AxonAutoConfiguration.class)
public class MyConfig {

    @ConditionalOnMissingBean
    @Bean
    public KafkaMessageConverter<String, byte[]> kafkaMessageConverter(
            @Qualifier("eventSerializer") Serializer eventSerializer) {
        return new DefaultKafkaMessageConverter(eventSerializer);
    }
}

`

UserSaga Class

`

package com.axonkafka.saga;

import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.eventhandling.saga.EndSaga;
import org.axonframework.eventhandling.saga.SagaEventHandler;
import org.axonframework.eventhandling.saga.SagaLifecycle;
import org.axonframework.eventhandling.saga.StartSaga;
import org.axonframework.spring.stereotype.Saga;
import org.springframework.beans.factory.annotation.Autowired;

import com.axonkafka.command.CreateWalletCommand;
import com.axonkafka.events.UserCreatedEvent;
import com.axonkafka.events.WalletCreatedEvent;

@Saga
public class UserSaga {

    @Autowired
    private transient CommandGateway commandGateway;

    private String userId;
    private String name;
    private String walletId;

    public UserSaga() {

    }

    @StartSaga
    @SagaEventHandler(associationProperty = "userId")
    public void on(UserCreatedEvent event) {
        System.out.println("Saga start");
        this.userId = event.getUserId();
        this.name = event.getName();
        commandGateway.send(new CreateWalletCommand(event.getUserId(), event.getName()));
    }

    @EndSaga
    @SagaEventHandler(associationProperty = "userId")
    public void on(WalletCreatedEvent event) {
        System.out.println("Saga end");
        this.userId = event.getUserId();
        this.walletId = event.getWalletId();
        //SagaLifecycle.end();
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getWalletId() {
        return walletId;
    }

    public void setWalletId(String walletId) {
        this.walletId = walletId;
    }
}

`

Hi,

You have configured a Tracking Processor to consume events from Kafka, but the Saga is not currently part of that processor. With this configuration, the Saga gets the default SubscribingProcessor.

You can create a SagaConfiguration bean (called userSagaConfiguration, or specify the bean name in your @Saga annotation) and configure it to track from the Kafka message source. That should have the Saga read from Kafka as well.
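A minimal sketch of such a bean, under a few assumptions: it targets Axon 3.3 with the axon-kafka module, it assumes `SagaConfiguration.trackingSagaManager(Class, Function)` is available in that version, and it assumes the Kafka auto-configuration exposes a `KafkaMessageSource` bean (the one referenced as `kafkaMessageSource` in the application.yml above). The class name `SagaKafkaConfig` is made up for illustration. Check the signatures against the Axon version you actually use.

`

package com.axonkafka;

import org.axonframework.config.SagaConfiguration;
import org.axonframework.kafka.eventhandling.consumer.KafkaMessageSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.axonkafka.saga.UserSaga;

// Hypothetical configuration class; only the bean name matters to Axon.
@Configuration
public class SagaKafkaConfig {

    // The bean name "userSagaConfiguration" follows the convention
    // <saga bean name> + "Configuration" mentioned above.
    @Bean
    public SagaConfiguration<UserSaga> userSagaConfiguration(KafkaMessageSource kafkaMessageSource) {
        // Assumption: trackingSagaManager accepts a function that supplies
        // the streamable message source the Saga should track.
        return SagaConfiguration.trackingSagaManager(UserSaga.class, config -> kafkaMessageSource);
    }
}

`

If you prefer a different bean name, the @Saga annotation should let you point to it (I believe the attribute is `configurationBean`, but verify that against your version).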

Hope this helps.
Cheers,

Allard