@Gerard thanks for the replay I m using the Spring version 1.5.10.RELEASE. I guess i need to update to version 2 of spring boot to make it work. Below are the configurations. Any suggestion for the changes?
@SuppressWarnings(“rawtypes”)
@Bean
public Map consumerConfigs() {
// SASL related properties
String jaasCfg = String.format(jaasTemplate, userName, password);
KafkaConfigurationBuilder withProperty = KafkaConfigurationBuilder.defaultConsumer()
.bootstrapServers(bootstrapServers).withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(ByteArrayDeserializer.class).group(groupId)
.withProperty(“security.protocol”, securityProtocol) // SASL_SSL for SASL with SSL mode
.withProperty(“sasl.mechanism”, saslMechanism).withProperty(“sasl.jaas.config”, jaasCfg)
// default do not do server hostname validation with server certificates
.withProperty(“ssl.endpoint.identification.algorithm”, “”);
if (securityProtocol.equals("SASL_SSL")) {
withProperty.withProperty("ssl.truststore.location", sslTrustStoreLocation)
.withProperty("ssl.truststore.password", sslTrustStorePassword);
}
return withProperty.build();
}
/**
* defaultKafkaConsumerFactory with consumer config
* @return
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public DefaultKafkaConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory(consumerConfigs());
}
/**
* this methd is used to register subscribing event processor under pkg root.
*/
@Autowired
public void configure(EventProcessingConfiguration config, Configurer configurer, DefaultSubscribableEventSource source) throws Exception {
// final String packageName = AccountEventHandlers.class.getPackage().getName();
config.registerSubscribingEventProcessor(EVENT_PACKAGE_ROOT, c → source);
}
/**
* DefaultSubscribableEventSource
* @return
*/
@Bean
public DefaultSubscribableEventSource source() {
return new DefaultSubscribableEventSource() {
private final List<Consumer<List<? extends EventMessage<?>>>> eventProcessors = new CopyOnWriteArrayList<>();
@Override
public List<Consumer<List<? extends EventMessage<?>>>> getEventProcessors() {
return eventProcessors;
}
};
}
/**
* ConcurrentKafkaListenerContainerFactory with consumerFactory
* @return
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
/** kafka message receiver
* this method is used to receive kafka msgs
*/
@Bean
public KafkaMessageSource receiver(Serializer serializer, DefaultSubscribableEventSource source) {
return new KafkaMessageSource(serializer, source) {
@KafkaListener(topics = "${eventsourcing.topic}", group = "${eventsourcing.group.id}")
public void receive(final ConsumerRecord<String, byte[]> record) {
log.info("Received message of size {}", record.value().length);
super.receive(record.value(), isLogEnabled);
}
};
}
KafkaConfigurationBuilder.java
public abstract class KafkaConfigurationBuilder {
protected final Properties properties = new Properties();
public static ProducerConfiguration defaultProducer() {
return defaultProducer(new Properties());
}
public static ProducerConfiguration defaultProducer(final Properties properties) {
final ProducerConfiguration builder = new ProducerConfiguration();
builder.withKeySerializer(StringSerializer.class);
builder.withValueSerializer(StringSerializer.class);
builder.properties.put("acks", "all");
builder.properties.put("batch.size", 16384);
builder.properties.put("linger.ms", 1);
builder.properties.put("buffer.memory", 33554432);
fill(properties, builder.properties);
return builder;
}
public static ConsumerConfiguration defaultConsumer() {
return defaultConsumer(new Properties());
}
public static ConsumerConfiguration defaultConsumer(final Properties properties) {
final ConsumerConfiguration builder = new ConsumerConfiguration();
builder.withKeyDeserializer(StringDeserializer.class);
builder.withValueDeserializer(StringDeserializer.class);
builder.properties.put("enable.auto.commit", "true");
builder.properties.put("auto.commit.interval.ms", "1000");
fill(properties, builder.properties);
return builder;
}
public KafkaConfigurationBuilder withProperty(final String propertyName, final String propertyValue) {
if (propertyValue != null) {
properties.put(propertyName, propertyValue);
}
return this;
}
public KafkaConfigurationBuilder withSystemProperty(final String propertyName, final String systemPropertyName) {
final String propertyValue = System.getProperty(systemPropertyName);
if (propertyValue != null) {
properties.put(propertyName, propertyValue);
}
return this;
}
public Properties build() {
validate();
return properties;
}
abstract void validate();
public Map<String, Object> asMap() {
final Map<String, Object> result = new HashMap<String, Object>();
properties.keySet().stream().forEach(key -> result.put((String) key, properties.get(key)));
return result;
}
public static class ConsumerConfiguration extends KafkaConfigurationBuilder {
public ConsumerConfiguration withKeyDeserializer(final Class<? extends Deserializer<?>> clazz) {
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, clazz.getName());
return this;
}
public ConsumerConfiguration withValueDeserializer(final Class<? extends Deserializer<?>> clazz) {
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, clazz.getName());
return this;
}
public ConsumerConfiguration bootstrapServers(final String bootstrapServers) {
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return this;
}
public ConsumerConfiguration group(final String group) {
properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
return this;
}
@Override
void validate() {
Assert.notNull(properties.get(ConsumerConfig.GROUP_ID_CONFIG), () -> "Group must be set.");
Assert.notNull(properties.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
() -> "Bootstrap servers must be set.");
}
}
public static class ProducerConfiguration extends KafkaConfigurationBuilder {
public ProducerConfiguration withKeySerializer(final Class<? extends Serializer> clazz) {
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, clazz.getName());
return this;
}
public ProducerConfiguration withValueSerializer(final Class<? extends Serializer> clazz) {
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, clazz.getName());
return this;
}
public ProducerConfiguration bootstrapServers(final String bootstrapServers) {
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return this;
}
@Override
void validate() {
Assert.notNull(properties.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
() -> "Bootstrap servers must be set.");
}
}
private static void fill(final Properties source, final Properties target) {
if (source != null && !source.isEmpty() && target != null) {
source.forEach((key, value) -> target.put(key, value));
}
}
}