Error creating bean with name 'kafkaPublisher' defined in class path resource [org/axonframework/boot/autoconfig/KafkaAutoConfiguration.class]: Unsatisfied dependency expressed through method 'kafkaPublisher' parameter 0

While upgrading from Axon 3.3.6 to 3.4.4, I am getting an error for the Kafka configuration:

org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'kafkaPublisher' defined in class path resource [org/axonframework/boot/autoconfig/KafkaAutoConfiguration.class]: Unsatisfied dependency expressed through method 'kafkaPublisher' parameter 0; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.axonframework.kafka.eventhandling.producer.ProducerFactory<java.lang.String, byte>' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}

Any idea which additional configuration is needed?

Something like this. Please note that the Kafka extension is tested with Axon Framework 4 only. There have also been some changes over time in what is and is not autowired. But I don't really see how changing the Axon version would require changing the configuration for the Kafka extension.

@Gerard thanks for the reply. I'm using Spring Boot version 1.5.10.RELEASE. I guess I need to update to Spring Boot 2 to make it work. Below are the configurations. Any suggestions for changes?

@SuppressWarnings("rawtypes")
@Bean
public Map consumerConfigs() {
	// SASL-related properties
	String jaasCfg = String.format(jaasTemplate, userName, password);
	KafkaConfigurationBuilder builder = KafkaConfigurationBuilder.defaultConsumer()
			.bootstrapServers(bootstrapServers)
			.withKeyDeserializer(StringDeserializer.class)
			.withValueDeserializer(ByteArrayDeserializer.class)
			.group(groupId)
			.withProperty("security.protocol", securityProtocol) // SASL_SSL for SASL with SSL mode
			.withProperty("sasl.mechanism", saslMechanism)
			.withProperty("sasl.jaas.config", jaasCfg)
			// by default, do not validate the server hostname against server certificates
			.withProperty("ssl.endpoint.identification.algorithm", "");

	if (securityProtocol.equals("SASL_SSL")) {
		builder.withProperty("ssl.truststore.location", sslTrustStoreLocation)
				.withProperty("ssl.truststore.password", sslTrustStorePassword);
	}
	return builder.build();
}



/**
 * DefaultKafkaConsumerFactory built from the consumer configuration above.
 * @return the consumer factory
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public DefaultKafkaConsumerFactory consumerFactory() {
	return new DefaultKafkaConsumerFactory(consumerConfigs());
}

/**
 * Registers a subscribing event processor for event handlers under the package root.
 */
@Autowired
public void configure(EventProcessingConfiguration config, Configurer configurer, DefaultSubscribableEventSource source) throws Exception {
	// final String packageName = AccountEventHandlers.class.getPackage().getName();
	config.registerSubscribingEventProcessor(EVENT_PACKAGE_ROOT, c -> source);
}

/**
 * Event source that the subscribing event processors attach to.
 * @return the subscribable event source
 */
@Bean
public DefaultSubscribableEventSource source() {
	return new DefaultSubscribableEventSource() {
		private final List<Consumer<List<? extends EventMessage<?>>>> eventProcessors = new CopyOnWriteArrayList<>();

		@Override
		public List<Consumer<List<? extends EventMessage<?>>>> getEventProcessors() {
			return eventProcessors;
		}
	};
}

/**
 * ConcurrentKafkaListenerContainerFactory backed by the consumer factory above.
 * @return the listener container factory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory() {
	final ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(consumerFactory());
	return factory;
}


/**
 * Kafka message receiver.
 * This bean receives Kafka messages and forwards them to the event source.
 */
@Bean
public KafkaMessageSource receiver(Serializer serializer, DefaultSubscribableEventSource source) {
	return new KafkaMessageSource(serializer, source) {

		@KafkaListener(topics = "${eventsourcing.topic}", group = "${eventsourcing.group.id}")
		public void receive(final ConsumerRecord<String, byte[]> record) {
			log.info("Received message of size {}", record.value().length);
			super.receive(record.value(), isLogEnabled);

		}
	};
}

KafkaConfigurationBuilder.java

public abstract class KafkaConfigurationBuilder {

protected final Properties properties = new Properties();

public static ProducerConfiguration defaultProducer() {
	return defaultProducer(new Properties());
}

public static ProducerConfiguration defaultProducer(final Properties properties) {
	final ProducerConfiguration builder = new ProducerConfiguration();
	builder.withKeySerializer(StringSerializer.class);
	builder.withValueSerializer(StringSerializer.class);

	builder.properties.put("acks", "all");
	builder.properties.put("batch.size", 16384);
	builder.properties.put("linger.ms", 1);
	builder.properties.put("buffer.memory", 33554432);
	fill(properties, builder.properties);

	return builder;
}

public static ConsumerConfiguration defaultConsumer() {
	return defaultConsumer(new Properties());
}

public static ConsumerConfiguration defaultConsumer(final Properties properties) {
	final ConsumerConfiguration builder = new ConsumerConfiguration();
	builder.withKeyDeserializer(StringDeserializer.class);
	builder.withValueDeserializer(StringDeserializer.class);
	builder.properties.put("enable.auto.commit", "true");
	builder.properties.put("auto.commit.interval.ms", "1000");
	fill(properties, builder.properties);

	return builder;
}

public KafkaConfigurationBuilder withProperty(final String propertyName, final String propertyValue) {
	if (propertyValue != null) {
		properties.put(propertyName, propertyValue);
	}
	return this;
}

public KafkaConfigurationBuilder withSystemProperty(final String propertyName, final String systemPropertyName) {
	final String propertyValue = System.getProperty(systemPropertyName);
	if (propertyValue != null) {
		properties.put(propertyName, propertyValue);
	}
	return this;
}

public Properties build() {
	validate();
	return properties;
}

abstract void validate();

public Map<String, Object> asMap() {
	final Map<String, Object> result = new HashMap<String, Object>();
	properties.forEach((key, value) -> result.put((String) key, value));
	return result;
}

public static class ConsumerConfiguration extends KafkaConfigurationBuilder {
	public ConsumerConfiguration withKeyDeserializer(final Class<? extends Deserializer<?>> clazz) {
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, clazz.getName());
		return this;
	}

	public ConsumerConfiguration withValueDeserializer(final Class<? extends Deserializer<?>> clazz) {
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, clazz.getName());
		return this;
	}

	public ConsumerConfiguration bootstrapServers(final String bootstrapServers) {
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		return this;
	}

	public ConsumerConfiguration group(final String group) {
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
		return this;
	}

	@Override
	void validate() {
		Assert.notNull(properties.get(ConsumerConfig.GROUP_ID_CONFIG), () -> "Group must be set.");
		Assert.notNull(properties.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
				() -> "Bootstrap servers must be set.");
	}
}

public static class ProducerConfiguration extends KafkaConfigurationBuilder {

	public ProducerConfiguration withKeySerializer(final Class<? extends Serializer<?>> clazz) {
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, clazz.getName());
		return this;
	}

	public ProducerConfiguration withValueSerializer(final Class<? extends Serializer<?>> clazz) {
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, clazz.getName());
		return this;
	}

	public ProducerConfiguration bootstrapServers(final String bootstrapServers) {
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		return this;
	}

	@Override
	void validate() {
		Assert.notNull(properties.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
				() -> "Bootstrap servers must be set.");
	}
}

private static void fill(final Properties source, final Properties target) {
	if (source != null && !source.isEmpty() && target != null) {
		source.forEach((key, value) -> target.put(key, value));
	}
}

}

Something that looks off is the use of String serializers for the value: the default converter expects a byte[] value, not a String. So I would expect the byte-array serializers to be used for the value.
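To make the point concrete, here is a minimal, self-contained sketch of a producer configuration with a byte-array value serializer. `defaultProducerProps` is a hypothetical stand-in for the `defaultProducer()` builder shown above; only the kafka-clients serializer class names and property keys are real:

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Hypothetical stand-in for KafkaConfigurationBuilder.defaultProducer():
    // the point is that value.serializer should name a byte[] serializer,
    // since Axon's default converter produces byte[] payloads, not Strings.
    static Properties defaultProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // byte[] values instead of String values
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        Properties props = defaultProducerProps("localhost:9092");
        // prints org.apache.kafka.common.serialization.ByteArraySerializer
        System.out.println(props.getProperty("value.serializer"));
    }
}
```

The same swap would apply in the posted builder: pass `ByteArraySerializer.class` to `withValueSerializer(...)` instead of `StringSerializer.class`.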

Are there any particular errors you see?

It's a runtime error, but when I revert to 3.3.6 it works perfectly fine.

Error creating bean with name 'kafkaPublisher' defined in class path resource [org/axonframework/boot/autoconfig/KafkaAutoConfiguration.class]: Unsatisfied dependency expressed through method 'kafkaPublisher' parameter 0

So from the error, I think you need to define the ProducerFactory bean like in the link. But in your case it should be typed as <String, String>, since you use String serializers. Not quite sure why the Axon Framework version would make a difference, to be honest.
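As a rough sketch, the missing bean would look something like the following. This is only the shape of the bean: the `DefaultProducerFactory` construction shown is an assumption, and the exact builder or constructor differs between axon-kafka versions, so check the `ProducerFactory` and `DefaultProducerFactory` types in the version you have on the classpath:

```java
// Sketch only: the producer configuration map comes from the
// KafkaConfigurationBuilder posted above; how it is handed to the
// factory depends on your axon-kafka version.
@Bean
public ProducerFactory<String, String> producerFactory() {
	Map<String, Object> configs = KafkaConfigurationBuilder.defaultProducer()
			.bootstrapServers(bootstrapServers)
			.asMap();
	// Assumed constructor; consult DefaultProducerFactory in your version.
	return new DefaultProducerFactory<>(configs);
}
```

With such a bean present, Axon's `KafkaAutoConfiguration` should be able to satisfy the `kafkaPublisher` dependency instead of failing with `NoSuchBeanDefinitionException`.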