OSDir


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GitHub] fjy closed pull request #6012: [Backport] Fix NPE while handling CheckpointNotice in KafkaSupervisor


fjy closed pull request #6012: [Backport] Fix NPE while handling CheckpointNotice in KafkaSupervisor
URL: https://github.com/apache/incubator-druid/pull/6012
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a pull request from a fork once it has been merged,
the diff is reproduced below for the sake of provenance:

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
index 5d48fe19405..4878da24df0 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
@@ -26,6 +26,7 @@
 import io.druid.segment.indexing.IOConfig;
 import org.joda.time.DateTime;
 
+import javax.annotation.Nullable;
 import java.util.Map;
 
 public class KafkaIOConfig implements IOConfig
@@ -33,6 +34,8 @@
   private static final boolean DEFAULT_USE_TRANSACTION = true;
   private static final boolean DEFAULT_SKIP_OFFSET_GAPS = false;
 
+  @Nullable
+  private final Integer taskGroupId;
   private final String baseSequenceName;
   private final KafkaPartitions startPartitions;
   private final KafkaPartitions endPartitions;
@@ -44,6 +47,7 @@
 
   @JsonCreator
   public KafkaIOConfig(
+      @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // can be null for backward compatibility
       @JsonProperty("baseSequenceName") String baseSequenceName,
       @JsonProperty("startPartitions") KafkaPartitions startPartitions,
       @JsonProperty("endPartitions") KafkaPartitions endPartitions,
@@ -54,6 +58,7 @@ public KafkaIOConfig(
       @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps
   )
   {
+    this.taskGroupId = taskGroupId;
     this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName");
     this.startPartitions = Preconditions.checkNotNull(startPartitions, "startPartitions");
     this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions");
@@ -83,6 +88,13 @@ public KafkaIOConfig(
     }
   }
 
+  @Nullable
+  @JsonProperty
+  public Integer getTaskGroupId()
+  {
+    return taskGroupId;
+  }
+
   @JsonProperty
   public String getBaseSequenceName()
   {
@@ -135,7 +147,8 @@ public boolean isSkipOffsetGaps()
   public String toString()
   {
     return "KafkaIOConfig{" +
-           "baseSequenceName='" + baseSequenceName + '\'' +
+           "taskGroupId=" + taskGroupId +
+           ", baseSequenceName='" + baseSequenceName + '\'' +
            ", startPartitions=" + startPartitions +
            ", endPartitions=" + endPartitions +
            ", consumerProperties=" + consumerProperties +
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 16a67ec6b06..5f039c0336b 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -702,12 +702,13 @@ public void onFailure(Throwable t)
                 sequences
             );
             requestPause();
-            if (!toolbox.getTaskActionClient().submit(new CheckPointDataSourceMetadataAction(
+            final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction(
                 getDataSource(),
-                ioConfig.getBaseSequenceName(),
+                ioConfig.getTaskGroupId(),
                 new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceToCheckpoint.getStartOffsets())),
                 new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets))
-            ))) {
+            );
+            if (!toolbox.getTaskActionClient().submit(checkpointAction)) {
               throw new ISE("Checkpoint request with offsets [%s] failed, dying", nextOffsets);
             }
           }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 454deffe886..cb3352d9402 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -143,8 +143,7 @@
    * time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [taskGroups]
    * map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]).
    */
-  @VisibleForTesting
-  static class TaskGroup
+  private static class TaskGroup
   {
     // This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
     // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
@@ -159,7 +158,7 @@
     DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action
     final TreeMap<Integer, Map<Integer, Long>> sequenceOffsets = new TreeMap<>();
 
-    public TaskGroup(
+    TaskGroup(
         ImmutableMap<Integer, Long> partitionOffsets,
         Optional<DateTime> minimumMessageTime,
         Optional<DateTime> maximumMessageTime
@@ -171,7 +170,7 @@ public TaskGroup(
       this.sequenceOffsets.put(0, partitionOffsets);
     }
 
-    public int addNewCheckpoint(Map<Integer, Long> checkpoint)
+    int addNewCheckpoint(Map<Integer, Long> checkpoint)
     {
       sequenceOffsets.put(sequenceOffsets.lastKey() + 1, checkpoint);
       return sequenceOffsets.lastKey();
@@ -212,9 +211,6 @@ public int addNewCheckpoint(Map<Integer, Long> checkpoint)
   private final ConcurrentHashMap<Integer, ConcurrentHashMap<Integer, Long>> partitionGroups = new ConcurrentHashMap<>();
   // --------------------------------------------------------
 
-  // BaseSequenceName -> TaskGroup
-  private final ConcurrentHashMap<String, TaskGroup> sequenceTaskGroup = new ConcurrentHashMap<>();
-
   private final TaskStorage taskStorage;
   private final TaskMaster taskMaster;
   private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
@@ -494,13 +490,9 @@ public void reset(DataSourceMetadata dataSourceMetadata)
   }
 
   @Override
-  public void checkpoint(
-      String sequenceName,
-      DataSourceMetadata previousCheckpoint,
-      DataSourceMetadata currentCheckpoint
-  )
+  public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckpoint, DataSourceMetadata currentCheckpoint)
   {
-    Preconditions.checkNotNull(sequenceName, "Cannot checkpoint without a sequence name");
+    Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint");
     Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null");
     Preconditions.checkArgument(
         ioConfig.getTopic()
@@ -511,12 +503,14 @@ public void checkpoint(
         ((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()
     );
 
-    log.info("Checkpointing [%s] for sequence [%s]", currentCheckpoint, sequenceName);
-    notices.add(new CheckpointNotice(
-        sequenceName,
-        (KafkaDataSourceMetadata) previousCheckpoint,
-        (KafkaDataSourceMetadata) currentCheckpoint
-    ));
+    log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckpoint, taskGroupId);
+    notices.add(
+        new CheckpointNotice(
+            taskGroupId,
+            (KafkaDataSourceMetadata) previousCheckpoint,
+            (KafkaDataSourceMetadata) currentCheckpoint
+        )
+    );
   }
 
   public void possiblyRegisterListener()
@@ -618,17 +612,17 @@ public void handle()
 
   private class CheckpointNotice implements Notice
   {
-    final String sequenceName;
+    final int taskGroupId;
     final KafkaDataSourceMetadata previousCheckpoint;
     final KafkaDataSourceMetadata currentCheckpoint;
 
     CheckpointNotice(
-        String sequenceName,
+        int taskGroupId,
         KafkaDataSourceMetadata previousCheckpoint,
         KafkaDataSourceMetadata currentCheckpoint
     )
     {
-      this.sequenceName = sequenceName;
+      this.taskGroupId = taskGroupId;
       this.previousCheckpoint = previousCheckpoint;
       this.currentCheckpoint = currentCheckpoint;
     }
@@ -639,17 +633,12 @@ public void handle() throws ExecutionException, InterruptedException, TimeoutExc
       // check for consistency
       // if already received request for this sequenceName and dataSourceMetadata combination then return
 
-      Preconditions.checkNotNull(
-          sequenceTaskGroup.get(sequenceName),
-          "WTH?! cannot find task group for this sequence [%s], sequencesTaskGroup map [%s], taskGroups [%s]",
-          sequenceName,
-          sequenceTaskGroup,
-          taskGroups
-      );
-      final TreeMap<Integer, Map<Integer, Long>> checkpoints = sequenceTaskGroup.get(sequenceName).sequenceOffsets;
+      final TaskGroup taskGroup = taskGroups.get(taskGroupId);
+
+      if (isValidTaskGroup(taskGroup)) {
+        final TreeMap<Integer, Map<Integer, Long>> checkpoints = taskGroup.sequenceOffsets;
 
-      // check validity of previousCheckpoint if it is not null
-      if (previousCheckpoint != null) {
+        // check validity of previousCheckpoint
         int index = checkpoints.size();
         for (int sequenceId : checkpoints.descendingKeySet()) {
           Map<Integer, Long> checkpoint = checkpoints.get(sequenceId);
@@ -666,26 +655,39 @@ public void handle() throws ExecutionException, InterruptedException, TimeoutExc
           log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue());
           return;
         }
-      } else {
-        // There cannot be more than one checkpoint when previous checkpoint is null
-        // as when the task starts they are sent existing checkpoints
-        Preconditions.checkState(
-            checkpoints.size() <= 1,
-            "Got checkpoint request with null as previous check point, however found more than one checkpoints"
+        final int taskGroupId = getTaskGroupIdForPartition(
+            currentCheckpoint.getKafkaPartitions()
+                             .getPartitionOffsetMap()
+                             .keySet()
+                             .iterator()
+                             .next()
         );
-        if (checkpoints.size() == 1) {
-          log.info("Already checkpointed with dataSourceMetadata [%s]", checkpoints.get(0));
-          return;
+        final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
+        taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint);
+        log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
+      }
+    }
+
+    private boolean isValidTaskGroup(@Nullable TaskGroup taskGroup)
+    {
+      if (taskGroup == null) {
+        // taskGroup might be in pendingCompletionTaskGroups or partitionGroups
+        if (pendingCompletionTaskGroups.containsKey(taskGroupId)) {
+          log.warn(
+              "Ignoring checkpoint request because taskGroup[%d] has already stopped indexing and is waiting for "
+              + "publishing segments",
+              taskGroupId
+          );
+          return false;
+        } else if (partitionGroups.containsKey(taskGroupId)) {
+          log.warn("Ignoring checkpoint request because taskGroup[%d] is inactive", taskGroupId);
+          return false;
+        } else {
+          throw new ISE("WTH?! cannot find taskGroup [%s] among all taskGroups [%s]", taskGroupId, taskGroups);
         }
       }
-      final int taskGroupId = getTaskGroupIdForPartition(currentCheckpoint.getKafkaPartitions()
-                                                                          .getPartitionOffsetMap()
-                                                                          .keySet()
-                                                                          .iterator()
-                                                                          .next());
-      final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
-      sequenceTaskGroup.get(sequenceName).addNewCheckpoint(newCheckpoint);
-      log.info("Handled checkpoint notice, new checkpoint is [%s] for sequence [%s]", newCheckpoint, sequenceName);
+
+      return true;
     }
   }
 
@@ -699,7 +701,6 @@ void resetInternal(DataSourceMetadata dataSourceMetadata)
       taskGroups.values().forEach(this::killTasksInGroup);
       taskGroups.clear();
       partitionGroups.clear();
-      sequenceTaskGroup.clear();
     } else if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) {
       throw new IAE("Expected KafkaDataSourceMetadata but found instance of [%s]", dataSourceMetadata.getClass());
     } else {
@@ -759,8 +760,7 @@ void resetInternal(DataSourceMetadata dataSourceMetadata)
           resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet().forEach(partition -> {
             final int groupId = getTaskGroupIdForPartition(partition);
             killTaskGroupForPartitions(ImmutableSet.of(partition));
-            final TaskGroup removedGroup = taskGroups.remove(groupId);
-            sequenceTaskGroup.remove(generateSequenceName(removedGroup));
+            taskGroups.remove(groupId);
             partitionGroups.get(groupId).replaceAll((partitionId, offset) -> NOT_SET);
           });
         } else {
@@ -936,9 +936,10 @@ private void updatePartitionDataFromKafka()
     for (int partition = 0; partition < numPartitions; partition++) {
       int taskGroupId = getTaskGroupIdForPartition(partition);
 
-      partitionGroups.putIfAbsent(taskGroupId, new ConcurrentHashMap<Integer, Long>());
-
-      ConcurrentHashMap<Integer, Long> partitionMap = partitionGroups.get(taskGroupId);
+      ConcurrentHashMap<Integer, Long> partitionMap = partitionGroups.computeIfAbsent(
+          taskGroupId,
+          k -> new ConcurrentHashMap<>()
+      );
 
       // The starting offset for a new partition in [partitionGroups] is initially set to NOT_SET; when a new task group
       // is created and is assigned partitions, if the offset in [partitionGroups] is NOT_SET it will take the starting
@@ -1068,23 +1069,21 @@ public Boolean apply(KafkaIndexTask.Status status)
                             }
                             return false;
                           } else {
-                            final TaskGroup taskGroup = new TaskGroup(
-                                ImmutableMap.copyOf(
-                                    kafkaTask.getIOConfig()
-                                             .getStartPartitions()
-                                             .getPartitionOffsetMap()
-                                ), kafkaTask.getIOConfig().getMinimumMessageTime(),
-                                kafkaTask.getIOConfig().getMaximumMessageTime()
-                            );
-                            if (taskGroups.putIfAbsent(
+                            final TaskGroup taskGroup = taskGroups.computeIfAbsent(
                                 taskGroupId,
-                                taskGroup
-                            ) == null) {
-                              sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(taskGroupId));
-                              log.info("Created new task group [%d]", taskGroupId);
-                            }
+                                k -> {
+                                  log.info("Creating a new task group for taskGroupId[%d]", taskGroupId);
+                                  return new TaskGroup(
+                                      ImmutableMap.copyOf(
+                                          kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap()
+                                      ),
+                                      kafkaTask.getIOConfig().getMinimumMessageTime(),
+                                      kafkaTask.getIOConfig().getMaximumMessageTime()
+                                  );
+                                }
+                            );
                             taskGroupsToVerify.add(taskGroupId);
-                            taskGroups.get(taskGroupId).tasks.putIfAbsent(taskId, new TaskData());
+                            taskGroup.tasks.putIfAbsent(taskId, new TaskData());
                           }
                         }
                         return true;
@@ -1237,7 +1236,6 @@ public void onFailure(Throwable t)
       // killing all tasks or no task left in the group ?
       // clear state about the taskgroup so that get latest offset information is fetched from metadata store
       log.warn("Clearing task group [%d] information as no valid tasks left the group", groupId);
-      sequenceTaskGroup.remove(generateSequenceName(taskGroup));
       taskGroups.remove(groupId);
       partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
     }
@@ -1262,9 +1260,10 @@ private void addDiscoveredTaskToPendingCompletionTaskGroups(
       Map<Integer, Long> startingPartitions
   )
   {
-    pendingCompletionTaskGroups.putIfAbsent(groupId, Lists.<TaskGroup>newCopyOnWriteArrayList());
-
-    CopyOnWriteArrayList<TaskGroup> taskGroupList = pendingCompletionTaskGroups.get(groupId);
+    final CopyOnWriteArrayList<TaskGroup> taskGroupList = pendingCompletionTaskGroups.computeIfAbsent(
+        groupId,
+        k -> new CopyOnWriteArrayList<>()
+    );
     for (TaskGroup taskGroup : taskGroupList) {
       if (taskGroup.partitionOffsets.equals(startingPartitions)) {
         if (taskGroup.tasks.putIfAbsent(taskId, new TaskData()) == null) {
@@ -1392,8 +1391,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
       if (endOffsets != null) {
         // set a timeout and put this group in pendingCompletionTaskGroups so that it can be monitored for completion
         group.completionTimeout = DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
-        pendingCompletionTaskGroups.putIfAbsent(groupId, Lists.<TaskGroup>newCopyOnWriteArrayList());
-        pendingCompletionTaskGroups.get(groupId).add(group);
+        pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(group);
 
         // set endOffsets as the next startOffsets
         for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
@@ -1413,7 +1411,6 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
         partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
       }
 
-      sequenceTaskGroup.remove(generateSequenceName(group));
       // remove this task group from the list of current task groups now that it has been handled
       taskGroups.remove(groupId);
     }
@@ -1437,7 +1434,8 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
           // metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing
           // failed and we need to re-ingest)
           return Futures.transform(
-              stopTasksInGroup(taskGroup), new Function<Object, Map<Integer, Long>>()
+              stopTasksInGroup(taskGroup),
+              new Function<Object, Map<Integer, Long>>()
               {
                 @Nullable
                 @Override
@@ -1606,15 +1604,15 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte
             log.warn("All tasks in group [%d] failed to publish, killing all tasks for these partitions", groupId);
           } else {
             log.makeAlert(
-                "No task in [%s] succeeded before the completion timeout elapsed [%s]!",
+                "No task in [%s] for taskGroup [%d] succeeded before the completion timeout elapsed [%s]!",
                 group.taskIds(),
+                groupId,
                 ioConfig.getCompletionTimeout()
             ).emit();
           }
 
           // reset partitions offsets for this task group so that they will be re-read from metadata storage
           partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
-          sequenceTaskGroup.remove(generateSequenceName(group));
           // kill all the tasks in this pending completion group
           killTasksInGroup(group);
           // set a flag so the other pending completion groups for this set of partitions will also stop
@@ -1674,7 +1672,6 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedExcep
         // be recreated with the next set of offsets
         if (taskData.status.isSuccess()) {
           futures.add(stopTasksInGroup(taskGroup));
-          sequenceTaskGroup.remove(generateSequenceName(taskGroup));
           iTaskGroups.remove();
           break;
         }
@@ -1716,7 +1713,6 @@ void createNewTasks() throws JsonProcessingException
             groupId,
             taskGroup
         );
-        sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(groupId));
       }
     }
 
@@ -1759,6 +1755,7 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc
     DateTime maximumMessageTime = taskGroups.get(groupId).maximumMessageTime.orNull();
 
     KafkaIOConfig kafkaIOConfig = new KafkaIOConfig(
+        groupId,
         sequenceName,
         new KafkaPartitions(ioConfig.getTopic(), startPartitions),
         new KafkaPartitions(ioConfig.getTopic(), endPartitions),
@@ -1924,7 +1921,7 @@ private boolean isTaskCurrent(int taskGroupId, String taskId)
     }
   }
 
-  private ListenableFuture<?> stopTasksInGroup(TaskGroup taskGroup)
+  private ListenableFuture<?> stopTasksInGroup(@Nullable TaskGroup taskGroup)
   {
     if (taskGroup == null) {
       return Futures.immediateFuture(null);
@@ -2196,4 +2193,26 @@ Runnable updateCurrentAndLatestOffsets()
       }
     };
   }
+
+  @VisibleForTesting
+  @Nullable
+  TaskGroup removeTaskGroup(int taskGroupId)
+  {
+    return taskGroups.remove(taskGroupId);
+  }
+
+  @VisibleForTesting
+  void moveTaskGroupToPendingCompletion(int taskGroupId)
+  {
+    final TaskGroup taskGroup = taskGroups.remove(taskGroupId);
+    if (taskGroup != null) {
+      pendingCompletionTaskGroups.computeIfAbsent(taskGroupId, k -> new CopyOnWriteArrayList<>()).add(taskGroup);
+    }
+  }
+
+  @VisibleForTesting
+  int getNoticesQueueSize()
+  {
+    return notices.size();
+  }
 }
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
index 22a792f7e6f..91335ab8cee 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
@@ -50,6 +50,7 @@ public void testSerdeWithDefaults() throws Exception
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -82,6 +83,7 @@ public void testSerdeWithNonDefaults() throws Exception
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -118,6 +120,7 @@ public void testBaseSequenceNameRequired() throws Exception
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
@@ -137,6 +140,7 @@ public void testStartPartitionsRequired() throws Exception
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
@@ -156,6 +160,7 @@ public void testEndPartitionsRequired() throws Exception
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
@@ -175,6 +180,7 @@ public void testConsumerPropertiesRequired() throws Exception
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -194,6 +200,7 @@ public void testStartAndEndTopicMatch() throws Exception
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"other\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
@@ -214,6 +221,7 @@ public void testStartAndEndPartitionSetMatch() throws Exception
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15}},\n"
@@ -234,6 +242,7 @@ public void testEndOffsetGreaterThanStart() throws Exception
   {
     String jsonStr = "{\n"
                      + "  \"type\": \"kafka\",\n"
+                     + "  \"taskGroupId\": 0,\n"
                      + "  \"baseSequenceName\": \"my-sequence-name\",\n"
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":2}},\n"
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index 4232b582217..2a852d4a8ad 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -238,17 +238,17 @@ public KafkaIndexTaskTest(boolean isIncrementalHandoffSupported)
   private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
   {
     return ImmutableList.of(
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2008", "a", "y", 1.0f)),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2009", "b", "y", 1.0f)),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2010", "c", "y", 1.0f)),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2011", "d", "y", 1.0f)),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2011", "e", "y", 1.0f)),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("246140482-04-24T15:36:27.903Z", "x", "z", 1.0f)),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, StringUtils.toUtf8("unparseable")),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, null),
-        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2013", "f", "y", 1.0f)),
-        new ProducerRecord<byte[], byte[]>(topic, 1, null, JB("2012", "g", "y", 1.0f)),
-        new ProducerRecord<byte[], byte[]>(topic, 1, null, JB("2011", "h", "y", 1.0f))
+        new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", 1.0f)),
+        new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", 1.0f)),
+        new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", 1.0f)),
+        new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", 1.0f)),
+        new ProducerRecord<>(topic, 0, null, JB("2011", "e", "y", 1.0f)),
+        new ProducerRecord<>(topic, 0, null, JB("246140482-04-24T15:36:27.903Z", "x", "z", 1.0f)),
+        new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable")),
+        new ProducerRecord<>(topic, 0, null, null),
+        new ProducerRecord<>(topic, 0, null, JB("2013", "f", "y", 1.0f)),
+        new ProducerRecord<>(topic, 1, null, JB("2012", "g", "y", 1.0f)),
+        new ProducerRecord<>(topic, 1, null, JB("2011", "h", "y", 1.0f))
     );
   }
 
@@ -345,6 +345,7 @@ public void testRunAfterDataInserted() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -386,6 +387,7 @@ public void testRunBeforeDataInserted() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -461,6 +463,7 @@ public void testIncrementalHandOff() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             baseSequenceName,
             startPartitions,
             endPartitions,
@@ -482,14 +485,16 @@ public void testIncrementalHandOff() throws Exception
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
 
     Assert.assertEquals(1, checkpointRequestsHash.size());
-    Assert.assertTrue(checkpointRequestsHash.contains(
-        Objects.hash(
-            DATA_SCHEMA.getDataSource(),
-            baseSequenceName,
-            new KafkaDataSourceMetadata(startPartitions),
-            new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets))
+    Assert.assertTrue(
+        checkpointRequestsHash.contains(
+            Objects.hash(
+                DATA_SCHEMA.getDataSource(),
+                0,
+                new KafkaDataSourceMetadata(startPartitions),
+                new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets))
+            )
         )
-    ));
+    );
 
     // Check metrics
     Assert.assertEquals(8, task.getFireDepartmentMetrics().processed());
@@ -528,6 +533,7 @@ public void testRunWithMinimumMessageTime() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -581,6 +587,7 @@ public void testRunWithMaximumMessageTime() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -644,6 +651,7 @@ public void testRunWithTransformSpec() throws Exception
             )
         ),
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -703,6 +711,7 @@ public void testRunOnNothing() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
@@ -743,6 +752,7 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -794,6 +804,7 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -844,6 +855,7 @@ public void testReportParseExceptions() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 7L)),
@@ -876,6 +888,7 @@ public void testRunReplicas() throws Exception
     final KafkaIndexTask task1 = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -889,6 +902,7 @@ public void testRunReplicas() throws Exception
     final KafkaIndexTask task2 = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -942,6 +956,7 @@ public void testRunConflicting() throws Exception
     final KafkaIndexTask task1 = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -955,6 +970,7 @@ public void testRunConflicting() throws Exception
     final KafkaIndexTask task2 = createTask(
         null,
         new KafkaIOConfig(
+            1,
             "sequence1",
             new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 9L)),
@@ -1009,6 +1025,7 @@ public void testRunConflictingWithoutTransactions() throws Exception
     final KafkaIndexTask task1 = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1022,6 +1039,7 @@ public void testRunConflictingWithoutTransactions() throws Exception
     final KafkaIndexTask task2 = createTask(
         null,
         new KafkaIOConfig(
+            1,
             "sequence1",
             new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 9L)),
@@ -1081,6 +1099,7 @@ public void testRunOneTaskTwoPartitions() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L, 1, 0L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 2L)),
@@ -1145,6 +1164,7 @@ public void testRunTwoTasksTwoPartitions() throws Exception
     final KafkaIndexTask task1 = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1158,6 +1178,7 @@ public void testRunTwoTasksTwoPartitions() throws Exception
     final KafkaIndexTask task2 = createTask(
         null,
         new KafkaIOConfig(
+            1,
             "sequence1",
             new KafkaPartitions(topic, ImmutableMap.of(1, 0L)),
             new KafkaPartitions(topic, ImmutableMap.of(1, 1L)),
@@ -1213,6 +1234,7 @@ public void testRestore() throws Exception
     final KafkaIndexTask task1 = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1249,6 +1271,7 @@ public void testRestore() throws Exception
     final KafkaIndexTask task2 = createTask(
         task1.getId(),
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1300,6 +1323,7 @@ public void testRunWithPauseAndResume() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1383,6 +1407,7 @@ public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
@@ -1421,6 +1446,7 @@ public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAva
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 200L)),
             new KafkaPartitions(topic, ImmutableMap.of(0, 500L)),
@@ -1473,6 +1499,7 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
+            0,
             "sequence0",
             // task should ignore these and use sequence info sent in the context
             new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
@@ -1749,18 +1776,20 @@ private void makeToolboxFactory() throws IOException
           @Override
           public boolean checkPointDataSourceMetadata(
               String supervisorId,
-              @Nullable String sequenceName,
+              int taskGroupId,
               @Nullable DataSourceMetadata previousDataSourceMetadata,
               @Nullable DataSourceMetadata currentDataSourceMetadata
           )
           {
             log.info("Adding checkpoint hash to the set");
-            checkpointRequestsHash.add(Objects.hash(
-                supervisorId,
-                sequenceName,
-                previousDataSourceMetadata,
-                currentDataSourceMetadata
-            ));
+            checkpointRequestsHash.add(
+                Objects.hash(
+                    supervisorId,
+                    taskGroupId,
+                    previousDataSourceMetadata,
+                    currentDataSourceMetadata
+                )
+            );
             return true;
           }
         }
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 60a47f0d58c..2add456df8e 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -19,6 +19,7 @@
 
 package io.druid.indexing.kafka.supervisor;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Charsets;
 import com.google.common.base.Optional;
@@ -29,13 +30,11 @@
 import io.druid.data.input.impl.DimensionSchema;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.data.input.impl.JSONParseSpec;
-import io.druid.java.util.common.parsers.JSONPathFieldSpec;
-import io.druid.java.util.common.parsers.JSONPathSpec;
 import io.druid.data.input.impl.StringDimensionSchema;
 import io.druid.data.input.impl.StringInputRowParser;
 import io.druid.data.input.impl.TimestampSpec;
-import io.druid.indexing.common.TaskInfoProvider;
 import io.druid.indexer.TaskLocation;
+import io.druid.indexing.common.TaskInfoProvider;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.task.RealtimeIndexTask;
 import io.druid.indexing.common.task.Task;
@@ -60,6 +59,9 @@
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.granularity.Granularities;
+import io.druid.java.util.common.parsers.JSONPathFieldSpec;
+import io.druid.java.util.common.parsers.JSONPathSpec;
+import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.CountAggregatorFactory;
 import io.druid.segment.TestHelper;
@@ -68,6 +70,7 @@
 import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.segment.realtime.FireDepartment;
 import io.druid.server.metrics.DruidMonitorSchedulerConfig;
+import io.druid.server.metrics.ExceptionCapturingServiceEmitter;
 import io.druid.server.metrics.NoopServiceEmitter;
 import org.apache.curator.test.TestingCluster;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -97,7 +100,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
 
 import static org.easymock.EasyMock.anyBoolean;
 import static org.easymock.EasyMock.anyObject;
@@ -138,6 +143,7 @@
   private KafkaIndexTaskClient taskClient;
   private TaskQueue taskQueue;
   private String topic;
+  private ExceptionCapturingServiceEmitter serviceEmitter;
 
   private static String getTopic()
   {
@@ -204,6 +210,8 @@ public void setupTest() throws Exception
     );
 
     topic = getTopic();
+    serviceEmitter = new ExceptionCapturingServiceEmitter();
+    EmittingLogger.registerEmitter(serviceEmitter);
   }
 
   @After
@@ -544,7 +552,7 @@ public void testKillIncompatibleTasks() throws Exception
     Task id1 = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "index_kafka_testDS__some_other_sequenceName",
+        1,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L)),
         null,
@@ -555,7 +563,7 @@ public void testKillIncompatibleTasks() throws Exception
     Task id2 = createKafkaIndexTask(
         "id2",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 333L)),
         null,
@@ -566,7 +574,7 @@ public void testKillIncompatibleTasks() throws Exception
     Task id3 = createKafkaIndexTask(
         "id3",
         DATASOURCE,
-        "index_kafka_testDS__some_other_sequenceName",
+        1,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 1L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 330L)),
         null,
@@ -577,7 +585,7 @@ public void testKillIncompatibleTasks() throws Exception
     Task id4 = createKafkaIndexTask(
         "id4",
         "other-datasource",
-        "index_kafka_testDS_d927edff33c4b3f",
+        2,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L)),
         null,
@@ -625,7 +633,9 @@ public void testKillIncompatibleTasks() throws Exception
 
     TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
     checkpoints.put(0, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(2);
 
     replayAll();
 
@@ -643,7 +653,7 @@ public void testKillBadPartitionAssignment() throws Exception
     Task id1 = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -652,7 +662,7 @@ public void testKillBadPartitionAssignment() throws Exception
     Task id2 = createKafkaIndexTask(
         "id2",
         DATASOURCE,
-        "sequenceName-1",
+        1,
         new KafkaPartitions("topic", ImmutableMap.of(1, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE)),
         null,
@@ -661,7 +671,7 @@ public void testKillBadPartitionAssignment() throws Exception
     Task id3 = createKafkaIndexTask(
         "id3",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -670,7 +680,7 @@ public void testKillBadPartitionAssignment() throws Exception
     Task id4 = createKafkaIndexTask(
         "id4",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE)),
         null,
@@ -679,7 +689,7 @@ public void testKillBadPartitionAssignment() throws Exception
     Task id5 = createKafkaIndexTask(
         "id5",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -718,8 +728,12 @@ public void testKillBadPartitionAssignment() throws Exception
     checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
     TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
     checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)).times(1);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(1);
 
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
     taskQueue.shutdown("id4");
@@ -756,10 +770,12 @@ public void testRequeueTaskWhenFailed() throws Exception
     checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
     TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
     checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-                                                                                        .anyTimes();
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-                                                                                        .anyTimes();
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .anyTimes();
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .anyTimes();
 
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
     replayAll();
@@ -821,7 +837,7 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception
     Task id1 = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         now,
@@ -848,7 +864,9 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception
 
     TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
     checkpoints.put(0, ImmutableMap.of(0, 0L, 2, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(2);
 
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
     replayAll();
@@ -869,9 +887,12 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception
     reset(taskClient);
 
     // for the newly created replica task
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints))
-                                                                                        .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
 
     expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(captured.getValue())).anyTimes();
     expect(taskStorage.getStatus(iHaveFailed.getId())).andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId())));
@@ -944,10 +965,12 @@ public void testQueueNextTasksOnSuccess() throws Exception
     TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
     checkpoints2.put(0, ImmutableMap.of(1, 0L));
     // there would be 4 tasks, 2 for each task group
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-                                                                                        .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-                                                                                        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(2);
 
     expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
     for (Task task : tasks) {
@@ -1054,10 +1077,12 @@ public void testBeginPublishAndQueueNextTasks() throws Exception
     checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
     TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
     checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-                                                                                        .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-                                                                                        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(2);
 
     replay(taskStorage, taskRunner, taskClient, taskQueue);
 
@@ -1091,7 +1116,7 @@ public void testDiscoverExistingPublishingTask() throws Exception
     Task task = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1185,7 +1210,7 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation()
     Task task = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1277,7 +1302,7 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception
     Task id1 = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1287,7 +1312,7 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception
     Task id2 = createKafkaIndexTask(
         "id2",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 1L, 1, 2L, 2, 3L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1325,7 +1350,9 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception
     // since id1 is publishing, so getCheckpoints wouldn't be called for it
     TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
     checkpoints.put(0, ImmutableMap.of(0, 1L, 1, 2L, 2, 3L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
 
     replayAll();
 
@@ -1401,10 +1428,12 @@ public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception
     checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
     TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
     checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-                                                                                        .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-                                                                                        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(2);
 
     expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
     for (Task task : tasks) {
@@ -1460,10 +1489,12 @@ public void testKillUnresponsiveTasksWhilePausing() throws Exception
     checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
     TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
     checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-                                                                                        .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-                                                                                        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(2);
 
     captured = Capture.newInstance(CaptureType.ALL);
     expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
@@ -1537,10 +1568,12 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception
     checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
     TreeMap<Integer, Map<Integer, Long>> checkpoints2 = new TreeMap<>();
     checkpoints2.put(0, ImmutableMap.of(1, 0L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1))
-                                                                                        .times(2);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2))
-                                                                                        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints1))
+        .times(2);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints2))
+        .times(2);
 
     captured = Capture.newInstance(CaptureType.ALL);
     expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes();
@@ -1619,7 +1652,7 @@ public void testStopGracefully() throws Exception
     Task id1 = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1629,7 +1662,7 @@ public void testStopGracefully() throws Exception
     Task id2 = createKafkaIndexTask(
         "id2",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1639,7 +1672,7 @@ public void testStopGracefully() throws Exception
     Task id3 = createKafkaIndexTask(
         "id3",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1675,8 +1708,12 @@ public void testStopGracefully() throws Exception
     // getCheckpoints will not be called for id1 as it is in publishing state
     TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
     checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
 
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
     replayAll();
@@ -1821,7 +1858,7 @@ public void testResetRunningTasks() throws Exception
     Task id1 = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1831,7 +1868,7 @@ public void testResetRunningTasks() throws Exception
     Task id2 = createKafkaIndexTask(
         "id2",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1841,7 +1878,7 @@ public void testResetRunningTasks() throws Exception
     Task id3 = createKafkaIndexTask(
         "id3",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1876,8 +1913,12 @@ public void testResetRunningTasks() throws Exception
 
     TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
     checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
 
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
     replayAll();
@@ -1905,7 +1946,7 @@ public void testNoDataIngestionTasks() throws Exception
     Task id1 = createKafkaIndexTask(
         "id1",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1915,7 +1956,7 @@ public void testNoDataIngestionTasks() throws Exception
     Task id2 = createKafkaIndexTask(
         "id2",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1925,7 +1966,7 @@ public void testNoDataIngestionTasks() throws Exception
     Task id3 = createKafkaIndexTask(
         "id3",
         DATASOURCE,
-        "sequenceName-0",
+        0,
         new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
         new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
         null,
@@ -1955,9 +1996,15 @@ public void testNoDataIngestionTasks() throws Exception
 
     TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
     checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
-    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
 
     taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
     replayAll();
@@ -1977,6 +2024,172 @@ public void testNoDataIngestionTasks() throws Exception
     verifyAll();
   }
 
+  @Test(timeout = 60_000L)
+  public void testCheckpointForInactiveTaskGroup()
+      throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
+  {
+    supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+    //not adding any events
+    final Task id1 = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id2 = createKafkaIndexTask(
+        "id2",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id3 = createKafkaIndexTask(
+        "id3",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+    expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+    expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+    expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+    expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+    expect(
+        indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null)
+    ).anyTimes();
+    expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+    expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+    expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING));
+
+    final DateTime startTime = DateTimes.nowUtc();
+    expect(taskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime));
+    expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime));
+    expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime));
+
+    final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
+    checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L));
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+    expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(1);
+
+    taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+
+    final Map<Integer, Long> fakeCheckpoints = Collections.emptyMap();
+    supervisor.moveTaskGroupToPendingCompletion(0);
+    supervisor.checkpoint(
+        0,
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, fakeCheckpoints))
+    );
+
+    while (supervisor.getNoticesQueueSize() > 0) {
+      Thread.sleep(100);
+    }
+
+    verifyAll();
+
+    Assert.assertNull(serviceEmitter.getStackTrace());
+    Assert.assertNull(serviceEmitter.getExceptionMessage());
+    Assert.assertNull(serviceEmitter.getExceptionClass());
+  }
+
+  @Test(timeout = 60_000L)
+  public void testCheckpointForUnknownTaskGroup() throws InterruptedException
+  {
+    supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+    //not adding any events
+    final Task id1 = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id2 = createKafkaIndexTask(
+        "id2",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id3 = createKafkaIndexTask(
+        "id3",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+    expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+    expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+    expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+    expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+    expect(
+        indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null)
+    ).anyTimes();
+
+    replayAll();
+
+    supervisor.start();
+
+    supervisor.checkpoint(
+        0,
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap()))
+    );
+
+    while (supervisor.getNoticesQueueSize() > 0) {
+      Thread.sleep(100);
+    }
+
+    verifyAll();
+
+    Assert.assertNotNull(serviceEmitter.getStackTrace());
+    Assert.assertEquals(
+        "WTH?! cannot find taskGroup [0] among all taskGroups [{}]",
+        serviceEmitter.getExceptionMessage()
+    );
+    Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
+  }
+
   private void addSomeEvents(int numEventsPerPartition) throws Exception
   {
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
@@ -2101,7 +2314,7 @@ private static DataSchema getDataSchema(String dataSource)
   private KafkaIndexTask createKafkaIndexTask(
       String id,
       String dataSource,
-      String sequenceName,
+      int taskGroupId,
       KafkaPartitions startPartitions,
       KafkaPartitions endPartitions,
       DateTime minimumMessageTime,
@@ -2114,7 +2327,8 @@ private KafkaIndexTask createKafkaIndexTask(
         getDataSchema(dataSource),
         tuningConfig,
         new KafkaIOConfig(
-            sequenceName,
+            taskGroupId,
+            "sequenceName-" + taskGroupId,
             startPartitions,
             endPartitions,
             ImmutableMap.<String, String>of(),
@@ -2123,7 +2337,7 @@ private KafkaIndexTask createKafkaIndexTask(
             maximumMessageTime,
             false
         ),
-        ImmutableMap.<String, Object>of(),
+        Collections.emptyMap(),
         null,
         null
     );
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
index aca4a1099ae..083ef9e5355 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
@@ -21,6 +21,7 @@
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Preconditions;
 import io.druid.indexing.common.task.Task;
 import io.druid.indexing.overlord.DataSourceMetadata;
 
@@ -29,21 +30,21 @@
 public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
 {
   private final String supervisorId;
-  private final String sequenceName;
+  private final int taskGroupId;
   private final DataSourceMetadata previousCheckPoint;
   private final DataSourceMetadata currentCheckPoint;
 
   public CheckPointDataSourceMetadataAction(
       @JsonProperty("supervisorId") String supervisorId,
-      @JsonProperty("sequenceName") String sequenceName,
+      @JsonProperty("taskGroupId") Integer taskGroupId,
       @JsonProperty("previousCheckPoint") DataSourceMetadata previousCheckPoint,
       @JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint
   )
   {
-    this.supervisorId = supervisorId;
-    this.sequenceName = sequenceName;
-    this.previousCheckPoint = previousCheckPoint;
-    this.currentCheckPoint = currentCheckPoint;
+    this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId");
+    this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId");
+    this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint");
+    this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint");
   }
 
   @JsonProperty
@@ -53,9 +54,9 @@ public String getSupervisorId()
   }
 
   @JsonProperty
-  public String getSequenceName()
+  public int getTaskGroupId()
   {
-    return sequenceName;
+    return taskGroupId;
   }
 
   @JsonProperty
@@ -83,8 +84,12 @@ public Boolean perform(
       Task task, TaskActionToolbox toolbox
   ) throws IOException
   {
-    return toolbox.getSupervisorManager()
-                  .checkPointDataSourceMetadata(supervisorId, sequenceName, previousCheckPoint, currentCheckPoint);
+    return toolbox.getSupervisorManager().checkPointDataSourceMetadata(
+        supervisorId,
+        taskGroupId,
+        previousCheckPoint,
+        currentCheckPoint
+    );
   }
 
   @Override
@@ -98,7 +103,7 @@ public String toString()
   {
     return "CheckPointDataSourceMetadataAction{" +
            "supervisorId='" + supervisorId + '\'' +
-           ", sequenceName='" + sequenceName + '\'' +
+           ", taskGroupId='" + taskGroupId + '\'' +
            ", previousCheckPoint=" + previousCheckPoint +
            ", currentCheckPoint=" + currentCheckPoint +
            '}';
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
index 00f2c92c4b7..31e54cfd521 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -159,9 +159,9 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourc
 
   public boolean checkPointDataSourceMetadata(
       String supervisorId,
-      @Nullable String sequenceName,
-      @Nullable DataSourceMetadata previousDataSourceMetadata,
-      @Nullable DataSourceMetadata currentDataSourceMetadata
+      int taskGroupId,
+      DataSourceMetadata previousDataSourceMetadata,
+      DataSourceMetadata currentDataSourceMetadata
   )
   {
     try {
@@ -172,7 +172,7 @@ public boolean checkPointDataSourceMetadata(
 
       Preconditions.checkNotNull(supervisor, "supervisor could not be found");
 
-      supervisor.lhs.checkpoint(sequenceName, previousDataSourceMetadata, currentDataSourceMetadata);
+      supervisor.lhs.checkpoint(taskGroupId, previousDataSourceMetadata, currentDataSourceMetadata);
       return true;
     }
     catch (Exception e) {
diff --git a/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java b/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java
index 55c679d4991..4d9404bb34c 100644
--- a/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java
+++ b/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java
@@ -35,6 +35,10 @@
  */
 public class EmittingLogger extends Logger
 {
+  public static final String EXCEPTION_TYPE_KEY = "exceptionType";
+  public static final String EXCEPTION_MESSAGE_KEY = "exceptionMessage";
+  public static final String EXCEPTION_STACK_TRACE_KEY = "exceptionStackTrace";
+
   private static volatile ServiceEmitter emitter = null;
 
   private final String className;
diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index 380861d9fb8..0ba0701e82d 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -83,9 +83,9 @@ public void reset(DataSourceMetadata dataSourceMetadata) {}
 
       @Override
       public void checkpoint(
-          @Nullable String sequenceName,
-          @Nullable DataSourceMetadata previousCheckPoint,
-          @Nullable DataSourceMetadata currentCheckPoint
+          int taskGroupId,
+          DataSourceMetadata previousCheckPoint,
+          DataSourceMetadata currentCheckPoint
       )
       {
 
diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
index 681421b4800..58e73ff3df7 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
@@ -21,8 +21,6 @@
 
 import io.druid.indexing.overlord.DataSourceMetadata;
 
-import javax.annotation.Nullable;
-
 public interface Supervisor
 {
   void start();
@@ -45,13 +43,9 @@
    * for example - Kafka Supervisor uses this to merge and handoff segments containing at least the data
    * represented by {@param currentCheckpoint} DataSourceMetadata
    *
-   * @param sequenceName       unique Identifier to figure out for which sequence to do checkpointing
+   * @param taskGroupId        unique identifier of the task group for which to do checkpointing
    * @param previousCheckPoint DataSourceMetadata checkpointed in previous call
    * @param currentCheckPoint  current DataSourceMetadata to be checkpointed
    */
-  void checkpoint(
-      @Nullable String sequenceName,
-      @Nullable DataSourceMetadata previousCheckPoint,
-      @Nullable DataSourceMetadata currentCheckPoint
-  );
+  void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint);
 }
diff --git a/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java b/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java
new file mode 100644
index 00000000000..a145046b416
--- /dev/null
+++ b/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.metrics;
+
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.core.Event;
+import io.druid.java.util.emitter.service.ServiceEmitter;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+public class ExceptionCapturingServiceEmitter extends ServiceEmitter
+{
+  private volatile Class exceptionClass;
+  private volatile String exceptionMessage;
+  private volatile String stackTrace;
+
+  public ExceptionCapturingServiceEmitter()
+  {
+    super("", "", null);
+  }
+
+  @Override
+  public void emit(Event event)
+  {
+    //noinspection unchecked
+    final Map<String, Object> dataMap = (Map<String, Object>) event.toMap().get("data");
+    final Class exceptionClass = (Class) dataMap.get(EmittingLogger.EXCEPTION_TYPE_KEY);
+    if (exceptionClass != null) {
+      final String exceptionMessage = (String) dataMap.get(EmittingLogger.EXCEPTION_MESSAGE_KEY);
+      final String stackTrace = (String) dataMap.get(EmittingLogger.EXCEPTION_STACK_TRACE_KEY);
+      this.exceptionClass = exceptionClass;
+      this.exceptionMessage = exceptionMessage;
+      this.stackTrace = stackTrace;
+    }
+  }
+
+  @Nullable
+  public Class getExceptionClass()
+  {
+    return exceptionClass;
+  }
+
+  @Nullable
+  public String getExceptionMessage()
+  {
+    return exceptionMessage;
+  }
+
+  @Nullable
+  public String getStackTrace()
+  {
+    return stackTrace;
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@xxxxxxxxxxxxxxxx


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@xxxxxxxxxxxxxxxx
For additional commands, e-mail: dev-help@xxxxxxxxxxxxxxxx