How to Edit Committed Offset on MapR Event Store

MapR Event Store for Apache Kafka, a publish-subscribe event streaming system built on top of the MapR Data Platform, uses the Kafka API for application development.

In this blog post, we will discuss committed cursors and how to edit the committed offset. The need to edit a committed offset generally arises when auto commit is enabled.

In the MapR Data Platform, a stream is a first-class entity, and topics are created on a stream. Producers publish messages to a topic, and consumers read messages by subscribing to that topic. By default, each topic has one partition; more partitions can be created if the expected message volume is large and requires concurrent processing by multiple consumers. Consumers can join a consumer group and read messages from their assigned partitions; rebalancing is handled automatically.
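As a quick illustration, a stream and a topic can be created with the maprcli tool; the stream path /sample-stream and topic name orders below are hypothetical:

    # Create a stream (a first-class entity in the MapR filesystem)
    maprcli stream create -path /sample-stream

    # Create a topic with three partitions on that stream
    maprcli stream topic create -path /sample-stream -topic orders -partitions 3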

The committed offset is the position up to which a consumer has confirmed that messages were processed. There is always one cursor per partition per consumer group. Consumers that are part of a consumer group can save the current position of their read cursor either automatically or programmatically. The saved cursor offset, or committed offset, is used to avoid resending the same records to the current consumer, or to a new consumer in the event of a partition rebalance.

There are two ways to commit the offset: Auto Commit and Program Control Commit (Manual).

Auto commit: This is the default way for consumers to commit offsets. If you configure enable.auto.commit=true, then every five seconds the consumer commits the largest offset your client received from poll(). The five-second interval is the default and is controlled by the auto.commit.interval.ms setting.
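As a minimal sketch, the relevant consumer configuration looks like this (the interval shown is the default, spelled out for illustration):

    Properties props = new Properties();
    props.put("enable.auto.commit", "true");       // commit offsets automatically (the default)
    props.put("auto.commit.interval.ms", "5000");  // commit every 5 seconds (the default interval)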

Program Control Commit: The Consumer API allows developers to commit offsets explicitly. The first step is to set enable.auto.commit=false in the consumer configuration. There are then two approaches to manual commits: commitSync() or commitAsync().
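A minimal sketch of a manual commit loop, assuming the consumer is already subscribed; running is a hypothetical loop flag and process() a hypothetical application method:

    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            process(record);        // hypothetical application-specific processing
        }
        consumer.commitSync();      // blocks until the commit succeeds or fails
        // consumer.commitAsync();  // alternative: returns immediately; an optional callback reports the result
    }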

Image taken from Kafka: The Definitive Guide

As shown in the above diagram, if the committed offset is larger than the offset of the last message the client actually processed, all messages between the last processed offset and the committed offset will be missed by the consumer group.

In contrast, if the committed offset is smaller than the offset of the last message the client processed, the messages between the committed offset and the last processed offset will be processed twice.

Problem:

There are situations in which the committed offset is larger than the offset of the last message the client actually processed. This can happen after an application failure, or when a customer asks to replay the consumer from a desired offset.

In the MapR environment, there is no CLI command to edit the cursor directly and replay the stream. You might try the consumer.seek() API in your client program to reprocess messages, but seek alone does not commit the cursor back to the desired offset. Eventually, this can also lead to the error: "ERROR StreamsListener - Cursor updates failed due to checkAndPut condition." For more details on this error, please check this MapR community page.
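For reference, a seek-based attempt looks roughly like the sketch below (desiredOffset is an assumed variable); the repositioned read position lives only in memory until it is explicitly committed:

    TopicPartition tp = new TopicPartition("/stream:orders", 0);
    consumer.assign(Arrays.asList(tp));
    consumer.seek(tp, desiredOffset);  // moves the in-memory read position only
    // Subsequent poll() calls start at desiredOffset, but the saved (committed)
    // cursor is unchanged until commitSync() or commitAsync() is called.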

Solution:

How do you edit the committed offset?

There are two steps to achieve this:

  1. Delete the cursor using the MapR CLI command for the given stream, consumer group, topic, and partition:

     maprcli stream cursor delete -path <Stream Path>
         [ -consumergroup <Consumer Group ID> ]
         [ -topic <Topic Name> ]
         [ -partition <Partition ID> ]

  2. Run the following consumer to reset the committed offset. The program takes the desired committed offset as a parameter, polls and processes messages until the record offset matches the desired offset, and then commits the cursor. (A worked example of both steps follows the program.)
package com.mapr.streams;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

public class ConsumerToSetDesiredOffset {

    /**
     * Polls the given partition and commits the cursor once the desired
     * offset is reached.
     */
    public static void pollAll(String topic, KafkaConsumer<String, String> consumer,
                               long pollInterval, long offset, int partId) {

        // Start at -1 so the first record (offset 0) is not flagged as a gap.
        long oldOffset = -1L;

        // Assign the consumer directly to the target partition (no group rebalancing).
        TopicPartition partition = new TopicPartition(topic, partId);
        consumer.assign(Arrays.asList(partition));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(pollInterval);
            int numRec = records.count();
            System.out.println("numRec: " + numRec);
            if (numRec == 0)
                return; // no more messages; the desired offset was not found
            for (ConsumerRecord<String, String> record : records) {
                long newOffset = record.offset();
                System.out.printf("offset = %d%n", newOffset);
                if (oldOffset + 1 != newOffset)
                    System.out.printf("Not in seq, offset gap at Old = %d, New = %d%n", oldOffset, newOffset);
                oldOffset = newOffset;
                if (newOffset == offset) {
                    // Save the cursor for this consumer group at the desired offset.
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(newOffset)));
                    System.out.println("Committed at desired offset: " + offset);
                    return;
                }
            }
        }
    }

    public static void main(String[] args) {
        long pollInterval = 1000L;
        long offset = 0L;
        int partition = 0;

        if (args.length != 4) {
            System.err.println("USAGE: java -cp mapr-streams-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.mapr.streams.ConsumerToSetDesiredOffset /stream:orders test 0 5000");
            System.exit(1);
        }
        String topic = args[0], groupId = args[1];
        try {
            partition = Integer.parseInt(args[2]);
        } catch (NumberFormatException nfe) {
            System.err.println("The 3rd argument (partition) must be an integer.");
            System.exit(1);
        }
        try {
            offset = Long.parseLong(args[3]);
        } catch (NumberFormatException nfe) {
            System.err.println("The 4th argument (desired offset) must be an integer.");
            System.exit(1);
        }

        System.out.println("Topic to be queried is: " + topic);
        System.out.println("Group ID is " + groupId);

        Properties props = new Properties();
        props.put("bootstrap.servers", "");         // not used by MapR Event Store; the topic path locates the stream
        props.put("auto.offset.reset", "earliest"); // with no committed cursor, start from the beginning
        props.put("enable.auto.commit", "false");   // commit manually once the desired offset is reached
        props.put("max.poll.records", "1");         // fetch one record per poll
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.partition.fetch.bytes", Integer.MAX_VALUE);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        pollAll(topic, consumer, pollInterval, offset, partition);
    }
}
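Putting both steps together, using the hypothetical values from the usage string above (stream /stream, topic orders, consumer group test, partition 0, desired offset 5000):

    # Step 1: delete the saved cursor for the group, topic, and partition
    maprcli stream cursor delete -path /stream -consumergroup test -topic orders -partition 0

    # Step 2: re-consume the partition and commit the cursor at offset 5000
    java -cp mapr-streams-examples-1.0-SNAPSHOT-jar-with-dependencies.jar \
        com.mapr.streams.ConsumerToSetDesiredOffset /stream:orders test 0 5000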

Conclusion: For production-grade applications, it is always recommended to manage the cursor offset programmatically. However, when auto commit is enabled and a developer needs to edit the cursor offset for some reason, this approach is a good solution.


This blog post was published May 09, 2019.