osdir.com

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[incubator-druid] branch master updated: Coordinator fix balancer stuck (#5987)


This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 31c2179  Coordinator fix balancer stuck (#5987)
31c2179 is described below

commit 31c2179fe1daa564e918f4ba64937743a45132a1
Author: Clint Wylie <cjwylie@xxxxxxxxx>
AuthorDate: Wed Jul 11 20:19:11 2018 -0700

    Coordinator fix balancer stuck (#5987)
    
    * this will fix it
    
    * filter destinations to not consider servers already serving segment
    
    * fix it
    
    * cleanup
    
    * fix opposite day in ImmutableDruidServer.equals
    
    * simplify
---
 .../java/io/druid/client/ImmutableDruidServer.java |  6 +--
 .../helper/DruidCoordinatorBalancer.java           | 56 ++++++++++++++++------
 2 files changed, 43 insertions(+), 19 deletions(-)

diff --git a/server/src/main/java/io/druid/client/ImmutableDruidServer.java b/server/src/main/java/io/druid/client/ImmutableDruidServer.java
index ef4fc66..22d5e71 100644
--- a/server/src/main/java/io/druid/client/ImmutableDruidServer.java
+++ b/server/src/main/java/io/druid/client/ImmutableDruidServer.java
@@ -142,11 +142,7 @@ public class ImmutableDruidServer
 
     ImmutableDruidServer that = (ImmutableDruidServer) o;
 
-    if (metadata.equals(that.metadata)) {
-      return false;
-    }
-
-    return true;
+    return metadata.equals(that.metadata);
   }
 
   @Override
diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
index 23f8dcb..8547954 100644
--- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
+++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
@@ -93,9 +93,11 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
       CoordinatorStats stats
   )
   {
-    final BalancerStrategy strategy = params.getBalancerStrategy();
-    final int maxSegmentsToMove = params.getCoordinatorDynamicConfig().getMaxSegmentsToMove();
 
+    if (params.getAvailableSegments().size() == 0) {
+      log.info("Metadata segments are not available. Cannot balance.");
+      return;
+    }
     currentlyMovingSegments.computeIfAbsent(tier, t -> new ConcurrentHashMap<>());
 
     if (!currentlyMovingSegments.get(tier).isEmpty()) {
@@ -117,33 +119,59 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
       numSegments += sourceHolder.getServer().getSegments().size();
     }
 
+
     if (numSegments == 0) {
       log.info("No segments found.  Cannot balance.");
       return;
     }
 
+    final BalancerStrategy strategy = params.getBalancerStrategy();
+    final int maxSegmentsToMove = Math.min(params.getCoordinatorDynamicConfig().getMaxSegmentsToMove(), numSegments);
+    final int maxIterations = 2 * maxSegmentsToMove;
     final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue();
     long unmoved = 0L;
-    for (int moved = 0; (moved + unmoved) < maxSegmentsToMove;) {
-      final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(toMoveFrom);
 
-      if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) {
-        final List<ServerHolder> toMoveToWithLoadQueueCapacity =
+    for (int moved = 0, iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) {
+      final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove(toMoveFrom);
+
+      if (segmentToMoveHolder != null && params.getAvailableSegments().contains(segmentToMoveHolder.getSegment())) {
+        final DataSegment segmentToMove = segmentToMoveHolder.getSegment();
+        final ImmutableDruidServer fromServer = segmentToMoveHolder.getFromServer();
+        // we want to leave the server the segment is currently on in the list...
+        // but filter out replicas that are already serving the segment, and servers with a full load queue
+        final List<ServerHolder> toMoveToWithLoadQueueCapacityAndNotServingSegment =
             toMoveTo.stream()
-                    .filter(s -> maxToLoad <= 0 || s.getNumberOfSegmentsInQueue() < maxToLoad)
+                    .filter(s -> s.getServer().equals(fromServer) ||
+                                 (!s.isServingSegment(segmentToMove) &&
+                                  (maxToLoad <= 0 || s.getNumberOfSegmentsInQueue() < maxToLoad)))
                     .collect(Collectors.toList());
 
-        final ServerHolder destinationHolder =
-            strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), toMoveToWithLoadQueueCapacity);
-
-        if (destinationHolder != null) {
-          moveSegment(segmentToMove, destinationHolder.getServer(), params);
-          moved++;
+        if (toMoveToWithLoadQueueCapacityAndNotServingSegment.size() > 0) {
+          final ServerHolder destinationHolder =
+              strategy.findNewSegmentHomeBalancer(segmentToMove, toMoveToWithLoadQueueCapacityAndNotServingSegment);
+
+          if (destinationHolder != null && !destinationHolder.getServer().equals(fromServer)) {
+            moveSegment(segmentToMoveHolder, destinationHolder.getServer(), params);
+            moved++;
+          } else {
+            log.info("Segment [%s] is 'optimally' placed.", segmentToMove.getIdentifier());
+            unmoved++;
+          }
         } else {
-          log.info("Segment [%s] is 'optimally' placed.", segmentToMove.getSegment().getIdentifier());
+          log.info(
+              "No valid movement destinations for segment [%s].",
+              segmentToMove.getIdentifier()
+          );
           unmoved++;
         }
       }
+      if (iter >= maxIterations) {
+        log.info(
+            "Unable to select %d remaining candidate segments out of %d total to balance after %d iterations, ending run.",
+            (maxSegmentsToMove - moved - unmoved), maxSegmentsToMove, iter
+        );
+        break;
+      }
     }
 
     if (unmoved == maxSegmentsToMove) {


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@xxxxxxxxxxxxxxxx
For additional commands, e-mail: dev-help@xxxxxxxxxxxxxxxx