[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GitHub] jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle


jihoonson commented on a change in pull request #5492: Native parallel batch indexing without shuffle
URL: https://github.com/apache/incubator-druid/pull/5492#discussion_r201891571
 
 

 ##########
 File path: extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java
 ##########
 @@ -21,80 +21,32 @@
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.indexer.TaskLocation;
-import io.druid.indexer.TaskStatus;
-import io.druid.indexing.common.RetryPolicy;
-import io.druid.indexing.common.RetryPolicyConfig;
-import io.druid.indexing.common.RetryPolicyFactory;
+import io.druid.indexing.common.IndexTaskClient;
 import io.druid.indexing.common.TaskInfoProvider;
-import io.druid.java.util.common.IAE;
-import io.druid.java.util.common.IOE;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.StringUtils;
-import io.druid.java.util.common.concurrent.Execs;
 import io.druid.java.util.common.jackson.JacksonUtils;
 import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.java.util.http.client.HttpClient;
-import io.druid.java.util.http.client.Request;
-import io.druid.java.util.http.client.response.FullResponseHandler;
 import io.druid.java.util.http.client.response.FullResponseHolder;
-import io.druid.segment.realtime.firehose.ChatHandlerResource;
-import org.jboss.netty.channel.ChannelException;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
-import org.joda.time.Period;
 
-import javax.ws.rs.core.MediaType;
+import javax.annotation.Nullable;
 import java.io.IOException;
-import java.net.Socket;
-import java.net.URI;
-import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.Map;
 import java.util.TreeMap;
-import java.util.concurrent.Callable;
 
-public class KafkaIndexTaskClient
+public class KafkaIndexTaskClient extends IndexTaskClient
 {
-  public static class NoTaskLocationException extends RuntimeException
-  {
-    public NoTaskLocationException(String message)
-    {
-      super(message);
-    }
-  }
-
-  public static class TaskNotRunnableException extends RuntimeException
-  {
-    public TaskNotRunnableException(String message)
-    {
-      super(message);
-    }
-  }
-
-  public static final int MAX_RETRY_WAIT_SECONDS = 10;
-
-  private static final int MIN_RETRY_WAIT_SECONDS = 2;
   private static final EmittingLogger log = new EmittingLogger(KafkaIndexTaskClient.class);
-  private static final String BASE_PATH = "/druid/worker/v1/chat";
-  private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;
-  private static final TreeMap EMPTY_TREE_MAP = new TreeMap();
-
-  private final HttpClient httpClient;
-  private final ObjectMapper jsonMapper;
-  private final TaskInfoProvider taskInfoProvider;
-  private final Duration httpTimeout;
-  private final RetryPolicyFactory retryPolicyFactory;
-  private final ListeningExecutorService executorService;
-  private final long numRetries;
+  private static final TreeMap<Integer, Map<Integer, Long>> EMPTY_TREE_MAP = new TreeMap<>();
 
 Review comment:
   Well, we could wrap this with `Collections.unmodifiableSortedMap()`. However, that would require changing the type of every `checkpointOffset` variable in KafkaSupervisor, all of which are currently declared as `TreeMap`. I don't think it's a trivial change.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@xxxxxxxxxxxxxxxx


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@xxxxxxxxxxxxxxxx
For additional commands, e-mail: dev-help@xxxxxxxxxxxxxxxx