Kafka - Manually acknowledgements

Has anyone found a way to manually acknowledge consumer records? The AsynchFetcher API provides a callback with the consumer record, but I’m assuming the record has been committed already. It there any class I can extend to do what I need to? Also, if I’m wrapping my own ConsumerFactory bean inside Axon’s ConsumerFactory API and I set ‘enable.auto.commit=false’, will Axon respect it?

Hi Ryan,

To be 100% honest with you, this took me some investigation into Kafka specific logic, as my knowledge of Kafka is somewhat basic.
For example, how and when Kafka acknowledges messages is not entirely clear to me, making it potentially difficult to answer your question at all.

Regardless, I think I can give you some guidance on this, and hopefully the other way around as well if I am taking a wrong turn with my answer.
So, Ryan, please correct me if you feel I am taking a wrong angle with my response, ideally with explaining the acknowledgement process.

I believe you are on the right track with the AsynchFetcher and the callback mechanism, however whether the default Kafka Consumers and Producers auto-acknowledge is not clear to me at all.
However, given the property ‘enable.auto.commit’ you state (which I assume originates from Kafka? Would’ve been valuable to specify that in your question), I’d say setting this to ‘false’ will ensure that the messages would no be automatically committed.
To define your own callback, you do not have to extend from any class. You can simply call upon the builder of the AsyncFetcher and provide a callback through the AsyncFetcher.Builder#consumerRecordCallback method. Then I’d assume that Kafka’s API provides you the means to acknowledge a ConsumerRecord manually.

Lastly, Axon does not impose any adjustments to configuration from other pieces of software.
As the ‘enable.auto.commit’ property does not originate from Axon itself, providing your own ConsumerFactory Bean does not change the outcome of setting the property.

Hope this helps Ryan!

Cheers,
Steven

Thanks, for your response. Yes, that's a Kafka property. I'll have to discuss a path forward with my team. Maybe we'll just have a hard requirement of auto committing.

For manually acknowledging the kafka messages you need to do the following:

1> Implement ConsumerSeekAware iterface
2> Make consumer property auto-commit as false
3> Set acknowledgement property as Acknowledgement.MANUAL_IMMEDIATE

Once you got the message after processing it you can manually acknowledge it by acknowledgment.acknowledge()

Hope this help.

This is inline with a new thread I started on the topic https://groups.google.com/forum/#!topic/axonframework/XnJtTtcAlTs

@Sanjeev
I’m not clear on the suggestion that simply turning off auto-commit will be sufficient.

Correct me if I’m wrong but fo the implementation to work successfully the following logic in FetchEventsTask.java

`

while (running.get()) {
    ConsumerRecords<K, V> records = consumer.poll(timeout);
    if (logger.isDebugEnabled()) {
        logger.debug("Fetched {} records", records.count());
    }
    Collection<KafkaEventMessage> messages = new ArrayList<>(records.count());
    List<CallbackEntry<K, V>> callbacks = new ArrayList<>(records.count());
    for (ConsumerRecord<K, V> record : records) {
        converter.readKafkaMessage(record).ifPresent(eventMessage -> {
            KafkaTrackingToken nextToken = currentToken.advancedTo(record.partition(), record.offset());
            if (logger.isDebugEnabled()) {
                logger.debug("Updating token from {} -> {}", currentToken, nextToken);
            }
            currentToken = nextToken;
            messages.add(KafkaEventMessage.from(eventMessage, record, currentToken));
            callbacks.add(new CallbackEntry<>(currentToken, record));
        });
    }
    try {
        buffer.putAll(messages);
        for (CallbackEntry<K, V> c : callbacks) {
            this.callback.apply(c.record, c.token);
        }
    } catch (InterruptedException e) {
        running.set(false);
        if (logger.isDebugEnabled()) {
            logger.debug("Event producer thread was interrupted. Shutting down.", e);
        }
        Thread.currentThread().interrupt();
    }
}

`

should call consumer.commit() after the message is processed if auto-commit is set to false.