1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs.server.balancer; 19 20 import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; 21 22 import java.io.BufferedInputStream; 23 import java.io.BufferedOutputStream; 24 import java.io.DataInputStream; 25 import java.io.DataOutputStream; 26 import java.io.IOException; 27 import java.io.InputStream; 28 import java.io.OutputStream; 29 import java.net.Socket; 30 import java.util.ArrayList; 31 import java.util.Arrays; 32 import java.util.Collection; 33 import java.util.EnumMap; 34 import java.util.HashMap; 35 import java.util.HashSet; 36 import java.util.Iterator; 37 import java.util.List; 38 import java.util.Map; 39 import java.util.Set; 40 import java.util.concurrent.ExecutionException; 41 import java.util.concurrent.ExecutorService; 42 import java.util.concurrent.Executors; 43 import java.util.concurrent.Future; 44 45 import org.apache.commons.logging.Log; 46 import org.apache.commons.logging.LogFactory; 47 import org.apache.hadoop.classification.InterfaceAudience; 48 import org.apache.hadoop.conf.Configuration; 49 import org.apache.hadoop.fs.CommonConfigurationKeys; 50 import org.apache.hadoop.fs.StorageType; 51 
import org.apache.hadoop.hdfs.DFSConfigKeys; 52 import org.apache.hadoop.hdfs.DFSUtil; 53 import org.apache.hadoop.hdfs.DistributedFileSystem; 54 import org.apache.hadoop.hdfs.protocol.Block; 55 import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 56 import org.apache.hadoop.hdfs.protocol.ExtendedBlock; 57 import org.apache.hadoop.hdfs.protocol.HdfsConstants; 58 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; 59 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; 60 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; 61 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; 62 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; 63 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; 64 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; 65 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; 66 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 67 import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup; 68 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; 69 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; 70 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; 71 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; 72 import org.apache.hadoop.io.IOUtils; 73 import org.apache.hadoop.net.NetUtils; 74 import org.apache.hadoop.net.NetworkTopology; 75 import org.apache.hadoop.security.token.Token; 76 import org.apache.hadoop.util.HostsFileReader; 77 import org.apache.hadoop.util.StringUtils; 78 import org.apache.hadoop.util.Time; 79 80 import com.google.common.annotations.VisibleForTesting; 81 import com.google.common.base.Preconditions; 82 83 /** Dispatching block replica moves between datanodes. 
*/ 84 @InterfaceAudience.Private 85 public class Dispatcher { 86 static final Log LOG = LogFactory.getLog(Dispatcher.class); 87 88 private static final long GB = 1L << 30; // 1GB 89 private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2 * GB; 90 91 private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5; 92 /** 93 * the period of time to delay the usage of a DataNode after hitting 94 * errors when using it for migrating data 95 */ 96 private static long delayAfterErrors = 10 * 1000; 97 98 private final NameNodeConnector nnc; 99 private final SaslDataTransferClient saslClient; 100 101 /** Set of datanodes to be excluded. */ 102 private final Set<String> excludedNodes; 103 /** Restrict to the following nodes. */ 104 private final Set<String> includedNodes; 105 106 private final Collection<Source> sources = new HashSet<Source>(); 107 private final Collection<StorageGroup> targets = new HashSet<StorageGroup>(); 108 109 private final GlobalBlockMap globalBlocks = new GlobalBlockMap(); 110 private final MovedBlocks<StorageGroup> movedBlocks; 111 112 /** Map (datanodeUuid,storageType -> StorageGroup) */ 113 private final StorageGroupMap<StorageGroup> storageGroupMap 114 = new StorageGroupMap<StorageGroup>(); 115 116 private NetworkTopology cluster; 117 118 private final ExecutorService moveExecutor; 119 private final ExecutorService dispatchExecutor; 120 121 /** The maximum number of concurrent blocks moves at a datanode */ 122 private final int maxConcurrentMovesPerNode; 123 124 private static class GlobalBlockMap { 125 private final Map<Block, DBlock> map = new HashMap<Block, DBlock>(); 126 127 /** 128 * Get the block from the map; 129 * if the block is not found, create a new block and put it in the map. 130 */ get(Block b)131 private DBlock get(Block b) { 132 DBlock block = map.get(b); 133 if (block == null) { 134 block = new DBlock(b); 135 map.put(b, block); 136 } 137 return block; 138 } 139 140 /** Remove all blocks except for the moved blocks. 
*/ removeAllButRetain(MovedBlocks<StorageGroup> movedBlocks)141 private void removeAllButRetain(MovedBlocks<StorageGroup> movedBlocks) { 142 for (Iterator<Block> i = map.keySet().iterator(); i.hasNext();) { 143 if (!movedBlocks.contains(i.next())) { 144 i.remove(); 145 } 146 } 147 } 148 } 149 150 public static class StorageGroupMap<G extends StorageGroup> { toKey(String datanodeUuid, StorageType storageType)151 private static String toKey(String datanodeUuid, StorageType storageType) { 152 return datanodeUuid + ":" + storageType; 153 } 154 155 private final Map<String, G> map = new HashMap<String, G>(); 156 get(String datanodeUuid, StorageType storageType)157 public G get(String datanodeUuid, StorageType storageType) { 158 return map.get(toKey(datanodeUuid, storageType)); 159 } 160 put(G g)161 public void put(G g) { 162 final String key = toKey(g.getDatanodeInfo().getDatanodeUuid(), g.storageType); 163 final StorageGroup existing = map.put(key, g); 164 Preconditions.checkState(existing == null); 165 } 166 size()167 int size() { 168 return map.size(); 169 } 170 clear()171 void clear() { 172 map.clear(); 173 } 174 values()175 public Collection<G> values() { 176 return map.values(); 177 } 178 } 179 180 /** This class keeps track of a scheduled block move */ 181 public class PendingMove { 182 private DBlock block; 183 private Source source; 184 private DDatanode proxySource; 185 private StorageGroup target; 186 PendingMove(Source source, StorageGroup target)187 private PendingMove(Source source, StorageGroup target) { 188 this.source = source; 189 this.target = target; 190 } 191 192 @Override toString()193 public String toString() { 194 final Block b = block != null ? block.getBlock() : null; 195 String bStr = b != null ? (b + " with size=" + b.getNumBytes() + " ") 196 : " "; 197 return bStr + "from " + source.getDisplayName() + " to " + target 198 .getDisplayName() + " through " + (proxySource != null ? 
proxySource 199 .datanode : ""); 200 } 201 202 /** 203 * Choose a block & a proxy source for this pendingMove whose source & 204 * target have already been chosen. 205 * 206 * @return true if a block and its proxy are chosen; false otherwise 207 */ chooseBlockAndProxy()208 private boolean chooseBlockAndProxy() { 209 // source and target must have the same storage type 210 final StorageType t = source.getStorageType(); 211 // iterate all source's blocks until find a good one 212 for (Iterator<DBlock> i = source.getBlockIterator(); i.hasNext();) { 213 if (markMovedIfGoodBlock(i.next(), t)) { 214 i.remove(); 215 return true; 216 } 217 } 218 return false; 219 } 220 221 /** 222 * @return true if the given block is good for the tentative move. 223 */ markMovedIfGoodBlock(DBlock block, StorageType targetStorageType)224 private boolean markMovedIfGoodBlock(DBlock block, StorageType targetStorageType) { 225 synchronized (block) { 226 synchronized (movedBlocks) { 227 if (isGoodBlockCandidate(source, target, targetStorageType, block)) { 228 this.block = block; 229 if (chooseProxySource()) { 230 movedBlocks.put(block); 231 if (LOG.isDebugEnabled()) { 232 LOG.debug("Decided to move " + this); 233 } 234 return true; 235 } 236 } 237 } 238 } 239 return false; 240 } 241 242 /** 243 * Choose a proxy source. 
244 * 245 * @return true if a proxy is found; otherwise false 246 */ chooseProxySource()247 private boolean chooseProxySource() { 248 final DatanodeInfo targetDN = target.getDatanodeInfo(); 249 // if source and target are same nodes then no need of proxy 250 if (source.getDatanodeInfo().equals(targetDN) && addTo(source)) { 251 return true; 252 } 253 // if node group is supported, first try add nodes in the same node group 254 if (cluster.isNodeGroupAware()) { 255 for (StorageGroup loc : block.getLocations()) { 256 if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN) 257 && addTo(loc)) { 258 return true; 259 } 260 } 261 } 262 // check if there is replica which is on the same rack with the target 263 for (StorageGroup loc : block.getLocations()) { 264 if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) { 265 return true; 266 } 267 } 268 // find out a non-busy replica 269 for (StorageGroup loc : block.getLocations()) { 270 if (addTo(loc)) { 271 return true; 272 } 273 } 274 return false; 275 } 276 277 /** add to a proxy source for specific block movement */ addTo(StorageGroup g)278 private boolean addTo(StorageGroup g) { 279 final DDatanode dn = g.getDDatanode(); 280 if (dn.addPendingBlock(this)) { 281 proxySource = dn; 282 return true; 283 } 284 return false; 285 } 286 287 /** Dispatch the move to the proxy source & wait for the response. 
*/ dispatch()288 private void dispatch() { 289 if (LOG.isDebugEnabled()) { 290 LOG.debug("Start moving " + this); 291 } 292 293 Socket sock = new Socket(); 294 DataOutputStream out = null; 295 DataInputStream in = null; 296 try { 297 sock.connect( 298 NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()), 299 HdfsServerConstants.READ_TIMEOUT); 300 301 sock.setKeepAlive(true); 302 303 OutputStream unbufOut = sock.getOutputStream(); 304 InputStream unbufIn = sock.getInputStream(); 305 ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), 306 block.getBlock()); 307 final KeyManager km = nnc.getKeyManager(); 308 Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb); 309 IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, 310 unbufIn, km, accessToken, target.getDatanodeInfo()); 311 unbufOut = saslStreams.out; 312 unbufIn = saslStreams.in; 313 out = new DataOutputStream(new BufferedOutputStream(unbufOut, 314 HdfsConstants.IO_FILE_BUFFER_SIZE)); 315 in = new DataInputStream(new BufferedInputStream(unbufIn, 316 HdfsConstants.IO_FILE_BUFFER_SIZE)); 317 318 sendRequest(out, eb, accessToken); 319 receiveResponse(in); 320 nnc.getBytesMoved().addAndGet(block.getNumBytes()); 321 LOG.info("Successfully moved " + this); 322 } catch (IOException e) { 323 LOG.warn("Failed to move " + this + ": " + e.getMessage()); 324 target.getDDatanode().setHasFailure(); 325 // Proxy or target may have some issues, delay before using these nodes 326 // further in order to avoid a potential storm of "threads quota 327 // exceeded" warnings when the dispatcher gets out of sync with work 328 // going on in datanodes. 
329 proxySource.activateDelay(delayAfterErrors); 330 target.getDDatanode().activateDelay(delayAfterErrors); 331 } finally { 332 IOUtils.closeStream(out); 333 IOUtils.closeStream(in); 334 IOUtils.closeSocket(sock); 335 336 proxySource.removePendingBlock(this); 337 target.getDDatanode().removePendingBlock(this); 338 339 synchronized (this) { 340 reset(); 341 } 342 synchronized (Dispatcher.this) { 343 Dispatcher.this.notifyAll(); 344 } 345 } 346 } 347 348 /** Send a block replace request to the output stream */ sendRequest(DataOutputStream out, ExtendedBlock eb, Token<BlockTokenIdentifier> accessToken)349 private void sendRequest(DataOutputStream out, ExtendedBlock eb, 350 Token<BlockTokenIdentifier> accessToken) throws IOException { 351 new Sender(out).replaceBlock(eb, target.storageType, accessToken, 352 source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode); 353 } 354 355 /** Receive a block copy response from the input stream */ receiveResponse(DataInputStream in)356 private void receiveResponse(DataInputStream in) throws IOException { 357 BlockOpResponseProto response = 358 BlockOpResponseProto.parseFrom(vintPrefixed(in)); 359 while (response.getStatus() == Status.IN_PROGRESS) { 360 // read intermediate responses 361 response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); 362 } 363 String logInfo = "block move is failed"; 364 DataTransferProtoUtil.checkBlockOpStatus(response, logInfo); 365 } 366 367 /** reset the object */ reset()368 private void reset() { 369 block = null; 370 source = null; 371 proxySource = null; 372 target = null; 373 } 374 } 375 376 /** A class for keeping track of block locations in the dispatcher. */ 377 public static class DBlock extends MovedBlocks.Locations<StorageGroup> { DBlock(Block block)378 public DBlock(Block block) { 379 super(block); 380 } 381 } 382 383 /** The class represents a desired move. 
*/ 384 static class Task { 385 private final StorageGroup target; 386 private long size; // bytes scheduled to move 387 Task(StorageGroup target, long size)388 Task(StorageGroup target, long size) { 389 this.target = target; 390 this.size = size; 391 } 392 getSize()393 long getSize() { 394 return size; 395 } 396 } 397 398 /** A class that keeps track of a datanode. */ 399 public static class DDatanode { 400 401 /** A group of storages in a datanode with the same storage type. */ 402 public class StorageGroup { 403 final StorageType storageType; 404 final long maxSize2Move; 405 private long scheduledSize = 0L; 406 StorageGroup(StorageType storageType, long maxSize2Move)407 private StorageGroup(StorageType storageType, long maxSize2Move) { 408 this.storageType = storageType; 409 this.maxSize2Move = maxSize2Move; 410 } 411 getStorageType()412 public StorageType getStorageType() { 413 return storageType; 414 } 415 getDDatanode()416 private DDatanode getDDatanode() { 417 return DDatanode.this; 418 } 419 getDatanodeInfo()420 public DatanodeInfo getDatanodeInfo() { 421 return DDatanode.this.datanode; 422 } 423 424 /** Decide if still need to move more bytes */ hasSpaceForScheduling()425 boolean hasSpaceForScheduling() { 426 return hasSpaceForScheduling(0L); 427 } 428 hasSpaceForScheduling(long size)429 synchronized boolean hasSpaceForScheduling(long size) { 430 return availableSizeToMove() > size; 431 } 432 433 /** @return the total number of bytes that need to be moved */ availableSizeToMove()434 synchronized long availableSizeToMove() { 435 return maxSize2Move - scheduledSize; 436 } 437 438 /** increment scheduled size */ incScheduledSize(long size)439 public synchronized void incScheduledSize(long size) { 440 scheduledSize += size; 441 } 442 443 /** @return scheduled size */ getScheduledSize()444 synchronized long getScheduledSize() { 445 return scheduledSize; 446 } 447 448 /** Reset scheduled size to zero. 
*/ resetScheduledSize()449 synchronized void resetScheduledSize() { 450 scheduledSize = 0L; 451 } 452 addPendingMove(DBlock block, final PendingMove pm)453 private PendingMove addPendingMove(DBlock block, final PendingMove pm) { 454 if (getDDatanode().addPendingBlock(pm)) { 455 if (pm.markMovedIfGoodBlock(block, getStorageType())) { 456 incScheduledSize(pm.block.getNumBytes()); 457 return pm; 458 } else { 459 getDDatanode().removePendingBlock(pm); 460 } 461 } 462 return null; 463 } 464 465 /** @return the name for display */ getDisplayName()466 String getDisplayName() { 467 return datanode + ":" + storageType; 468 } 469 470 @Override toString()471 public String toString() { 472 return getDisplayName(); 473 } 474 475 @Override hashCode()476 public int hashCode() { 477 return getStorageType().hashCode() ^ getDatanodeInfo().hashCode(); 478 } 479 480 @Override equals(Object obj)481 public boolean equals(Object obj) { 482 if (this == obj) { 483 return true; 484 } else if (obj == null || !(obj instanceof StorageGroup)) { 485 return false; 486 } else { 487 final StorageGroup that = (StorageGroup) obj; 488 return this.getStorageType() == that.getStorageType() 489 && this.getDatanodeInfo().equals(that.getDatanodeInfo()); 490 } 491 } 492 493 } 494 495 final DatanodeInfo datanode; 496 private final EnumMap<StorageType, Source> sourceMap 497 = new EnumMap<StorageType, Source>(StorageType.class); 498 private final EnumMap<StorageType, StorageGroup> targetMap 499 = new EnumMap<StorageType, StorageGroup>(StorageType.class); 500 protected long delayUntil = 0L; 501 /** blocks being moved but not confirmed yet */ 502 private final List<PendingMove> pendings; 503 private volatile boolean hasFailure = false; 504 private final int maxConcurrentMoves; 505 506 @Override toString()507 public String toString() { 508 return getClass().getSimpleName() + ":" + datanode; 509 } 510 DDatanode(DatanodeInfo datanode, int maxConcurrentMoves)511 private DDatanode(DatanodeInfo datanode, int 
maxConcurrentMoves) { 512 this.datanode = datanode; 513 this.maxConcurrentMoves = maxConcurrentMoves; 514 this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves); 515 } 516 getDatanodeInfo()517 public DatanodeInfo getDatanodeInfo() { 518 return datanode; 519 } 520 put(StorageType storageType, G g, EnumMap<StorageType, G> map)521 private static <G extends StorageGroup> void put(StorageType storageType, 522 G g, EnumMap<StorageType, G> map) { 523 final StorageGroup existing = map.put(storageType, g); 524 Preconditions.checkState(existing == null); 525 } 526 addTarget(StorageType storageType, long maxSize2Move)527 public StorageGroup addTarget(StorageType storageType, long maxSize2Move) { 528 final StorageGroup g = new StorageGroup(storageType, maxSize2Move); 529 put(storageType, g, targetMap); 530 return g; 531 } 532 addSource(StorageType storageType, long maxSize2Move, Dispatcher d)533 public Source addSource(StorageType storageType, long maxSize2Move, Dispatcher d) { 534 final Source s = d.new Source(storageType, maxSize2Move, this); 535 put(storageType, s, sourceMap); 536 return s; 537 } 538 activateDelay(long delta)539 synchronized private void activateDelay(long delta) { 540 delayUntil = Time.monotonicNow() + delta; 541 } 542 isDelayActive()543 synchronized private boolean isDelayActive() { 544 if (delayUntil == 0 || Time.monotonicNow() > delayUntil) { 545 delayUntil = 0; 546 return false; 547 } 548 return true; 549 } 550 551 /** Check if the node can schedule more blocks to move */ isPendingQNotFull()552 synchronized boolean isPendingQNotFull() { 553 return pendings.size() < maxConcurrentMoves; 554 } 555 556 /** Check if all the dispatched moves are done */ isPendingQEmpty()557 synchronized boolean isPendingQEmpty() { 558 return pendings.isEmpty(); 559 } 560 561 /** Add a scheduled block move to the node */ addPendingBlock(PendingMove pendingBlock)562 synchronized boolean addPendingBlock(PendingMove pendingBlock) { 563 if (!isDelayActive() && 
isPendingQNotFull()) { 564 return pendings.add(pendingBlock); 565 } 566 return false; 567 } 568 569 /** Remove a scheduled block move from the node */ removePendingBlock(PendingMove pendingBlock)570 synchronized boolean removePendingBlock(PendingMove pendingBlock) { 571 return pendings.remove(pendingBlock); 572 } 573 setHasFailure()574 void setHasFailure() { 575 this.hasFailure = true; 576 } 577 } 578 579 /** A node that can be the sources of a block move */ 580 public class Source extends DDatanode.StorageGroup { 581 582 private final List<Task> tasks = new ArrayList<Task>(2); 583 private long blocksToReceive = 0L; 584 /** 585 * Source blocks point to the objects in {@link Dispatcher#globalBlocks} 586 * because we want to keep one copy of a block and be aware that the 587 * locations are changing over time. 588 */ 589 private final List<DBlock> srcBlocks = new ArrayList<DBlock>(); 590 Source(StorageType storageType, long maxSize2Move, DDatanode dn)591 private Source(StorageType storageType, long maxSize2Move, DDatanode dn) { 592 dn.super(storageType, maxSize2Move); 593 } 594 595 /** Add a task */ addTask(Task task)596 void addTask(Task task) { 597 Preconditions.checkState(task.target != this, 598 "Source and target are the same storage group " + getDisplayName()); 599 incScheduledSize(task.size); 600 tasks.add(task); 601 } 602 603 /** @return an iterator to this source's blocks */ getBlockIterator()604 Iterator<DBlock> getBlockIterator() { 605 return srcBlocks.iterator(); 606 } 607 608 /** 609 * Fetch new blocks of this source from namenode and update this source's 610 * block list & {@link Dispatcher#globalBlocks}. 611 * 612 * @return the total size of the received blocks in the number of bytes. 
613 */ getBlockList()614 private long getBlockList() throws IOException { 615 final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive); 616 final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size); 617 618 long bytesReceived = 0; 619 for (BlockWithLocations blk : newBlocks.getBlocks()) { 620 bytesReceived += blk.getBlock().getNumBytes(); 621 synchronized (globalBlocks) { 622 final DBlock block = globalBlocks.get(blk.getBlock()); 623 synchronized (block) { 624 block.clearLocations(); 625 626 // update locations 627 final String[] datanodeUuids = blk.getDatanodeUuids(); 628 final StorageType[] storageTypes = blk.getStorageTypes(); 629 for (int i = 0; i < datanodeUuids.length; i++) { 630 final StorageGroup g = storageGroupMap.get( 631 datanodeUuids[i], storageTypes[i]); 632 if (g != null) { // not unknown 633 block.addLocation(g); 634 } 635 } 636 } 637 if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) { 638 // filter bad candidates 639 srcBlocks.add(block); 640 } 641 } 642 } 643 return bytesReceived; 644 } 645 646 /** Decide if the given block is a good candidate to move or not */ isGoodBlockCandidate(DBlock block)647 private boolean isGoodBlockCandidate(DBlock block) { 648 // source and target must have the same storage type 649 final StorageType sourceStorageType = getStorageType(); 650 for (Task t : tasks) { 651 if (Dispatcher.this.isGoodBlockCandidate(this, t.target, 652 sourceStorageType, block)) { 653 return true; 654 } 655 } 656 return false; 657 } 658 659 /** 660 * Choose a move for the source. The block's source, target, and proxy 661 * are determined too. When choosing proxy and target, source & 662 * target throttling has been considered. They are chosen only when they 663 * have the capacity to support this block move. The block should be 664 * dispatched immediately after this method is returned. 665 * 666 * @return a move that's good for the source to dispatch immediately. 
667 */ chooseNextMove()668 private PendingMove chooseNextMove() { 669 for (Iterator<Task> i = tasks.iterator(); i.hasNext();) { 670 final Task task = i.next(); 671 final DDatanode target = task.target.getDDatanode(); 672 final PendingMove pendingBlock = new PendingMove(this, task.target); 673 if (target.addPendingBlock(pendingBlock)) { 674 // target is not busy, so do a tentative block allocation 675 if (pendingBlock.chooseBlockAndProxy()) { 676 long blockSize = pendingBlock.block.getNumBytes(); 677 incScheduledSize(-blockSize); 678 task.size -= blockSize; 679 if (task.size == 0) { 680 i.remove(); 681 } 682 return pendingBlock; 683 } else { 684 // cancel the tentative move 685 target.removePendingBlock(pendingBlock); 686 } 687 } 688 } 689 return null; 690 } 691 692 /** Add a pending move */ addPendingMove(DBlock block, StorageGroup target)693 public PendingMove addPendingMove(DBlock block, StorageGroup target) { 694 return target.addPendingMove(block, new PendingMove(this, target)); 695 } 696 697 /** Iterate all source's blocks to remove moved ones */ removeMovedBlocks()698 private void removeMovedBlocks() { 699 for (Iterator<DBlock> i = getBlockIterator(); i.hasNext();) { 700 if (movedBlocks.contains(i.next().getBlock())) { 701 i.remove(); 702 } 703 } 704 } 705 706 private static final int SOURCE_BLOCKS_MIN_SIZE = 5; 707 708 /** @return if should fetch more blocks from namenode */ shouldFetchMoreBlocks()709 private boolean shouldFetchMoreBlocks() { 710 return srcBlocks.size() < SOURCE_BLOCKS_MIN_SIZE && blocksToReceive > 0; 711 } 712 713 private static final long MAX_ITERATION_TIME = 20 * 60 * 1000L; // 20 mins 714 715 /** 716 * This method iteratively does the following: it first selects a block to 717 * move, then sends a request to the proxy source to start the block move 718 * when the source's block list falls below a threshold, it asks the 719 * namenode for more blocks. 
It terminates when it has dispatch enough block 720 * move tasks or it has received enough blocks from the namenode, or the 721 * elapsed time of the iteration has exceeded the max time limit. 722 */ dispatchBlocks()723 private void dispatchBlocks() { 724 final long startTime = Time.monotonicNow(); 725 this.blocksToReceive = 2 * getScheduledSize(); 726 boolean isTimeUp = false; 727 int noPendingMoveIteration = 0; 728 while (!isTimeUp && getScheduledSize() > 0 729 && (!srcBlocks.isEmpty() || blocksToReceive > 0)) { 730 final PendingMove p = chooseNextMove(); 731 if (p != null) { 732 // Reset no pending move counter 733 noPendingMoveIteration=0; 734 executePendingMove(p); 735 continue; 736 } 737 738 // Since we cannot schedule any block to move, 739 // remove any moved blocks from the source block list and 740 removeMovedBlocks(); // filter already moved blocks 741 // check if we should fetch more blocks from the namenode 742 if (shouldFetchMoreBlocks()) { 743 // fetch new blocks 744 try { 745 blocksToReceive -= getBlockList(); 746 continue; 747 } catch (IOException e) { 748 LOG.warn("Exception while getting block list", e); 749 return; 750 } 751 } else { 752 // source node cannot find a pending block to move, iteration +1 753 noPendingMoveIteration++; 754 // in case no blocks can be moved for source node's task, 755 // jump out of while-loop after 5 iterations. 756 if (noPendingMoveIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) { 757 resetScheduledSize(); 758 } 759 } 760 761 // check if time is up or not 762 if (Time.monotonicNow() - startTime > MAX_ITERATION_TIME) { 763 isTimeUp = true; 764 continue; 765 } 766 767 // Now we can not schedule any block to move and there are 768 // no new blocks added to the source block list, so we wait. 
769 try { 770 synchronized (Dispatcher.this) { 771 Dispatcher.this.wait(1000); // wait for targets/sources to be idle 772 } 773 } catch (InterruptedException ignored) { 774 } 775 } 776 } 777 778 @Override hashCode()779 public int hashCode() { 780 return super.hashCode(); 781 } 782 783 @Override equals(Object obj)784 public boolean equals(Object obj) { 785 return super.equals(obj); 786 } 787 } 788 Dispatcher(NameNodeConnector nnc, Set<String> includedNodes, Set<String> excludedNodes, long movedWinWidth, int moverThreads, int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf)789 public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes, 790 Set<String> excludedNodes, long movedWinWidth, int moverThreads, 791 int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) { 792 this.nnc = nnc; 793 this.excludedNodes = excludedNodes; 794 this.includedNodes = includedNodes; 795 this.movedBlocks = new MovedBlocks<StorageGroup>(movedWinWidth); 796 797 this.cluster = NetworkTopology.getInstance(conf); 798 799 this.moveExecutor = Executors.newFixedThreadPool(moverThreads); 800 this.dispatchExecutor = dispatcherThreads == 0? 
null 801 : Executors.newFixedThreadPool(dispatcherThreads); 802 this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode; 803 804 this.saslClient = new SaslDataTransferClient(conf, 805 DataTransferSaslUtil.getSaslPropertiesResolver(conf), 806 TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth); 807 } 808 getDistributedFileSystem()809 public DistributedFileSystem getDistributedFileSystem() { 810 return nnc.getDistributedFileSystem(); 811 } 812 getStorageGroupMap()813 public StorageGroupMap<StorageGroup> getStorageGroupMap() { 814 return storageGroupMap; 815 } 816 getCluster()817 public NetworkTopology getCluster() { 818 return cluster; 819 } 820 getBytesMoved()821 long getBytesMoved() { 822 return nnc.getBytesMoved().get(); 823 } 824 bytesToMove()825 long bytesToMove() { 826 Preconditions.checkState( 827 storageGroupMap.size() >= sources.size() + targets.size(), 828 "Mismatched number of storage groups (" + storageGroupMap.size() 829 + " < " + sources.size() + " sources + " + targets.size() 830 + " targets)"); 831 832 long b = 0L; 833 for (Source src : sources) { 834 b += src.getScheduledSize(); 835 } 836 return b; 837 } 838 add(Source source, StorageGroup target)839 void add(Source source, StorageGroup target) { 840 sources.add(source); 841 targets.add(target); 842 } 843 shouldIgnore(DatanodeInfo dn)844 private boolean shouldIgnore(DatanodeInfo dn) { 845 // ignore decommissioned nodes 846 final boolean decommissioned = dn.isDecommissioned(); 847 // ignore decommissioning nodes 848 final boolean decommissioning = dn.isDecommissionInProgress(); 849 // ignore nodes in exclude list 850 final boolean excluded = Util.isExcluded(excludedNodes, dn); 851 // ignore nodes not in the include list (if include list is not empty) 852 final boolean notIncluded = !Util.isIncluded(includedNodes, dn); 853 854 if (decommissioned || decommissioning || excluded || notIncluded) { 855 if (LOG.isTraceEnabled()) { 856 LOG.trace("Excluding datanode " + dn + ": " + 
decommissioned + ", " 857 + decommissioning + ", " + excluded + ", " + notIncluded); 858 } 859 return true; 860 } 861 return false; 862 } 863 864 /** Get live datanode storage reports and then build the network topology. */ init()865 public List<DatanodeStorageReport> init() throws IOException { 866 final DatanodeStorageReport[] reports = nnc.getLiveDatanodeStorageReport(); 867 final List<DatanodeStorageReport> trimmed = new ArrayList<DatanodeStorageReport>(); 868 // create network topology and classify utilization collections: 869 // over-utilized, above-average, below-average and under-utilized. 870 for (DatanodeStorageReport r : DFSUtil.shuffle(reports)) { 871 final DatanodeInfo datanode = r.getDatanodeInfo(); 872 if (shouldIgnore(datanode)) { 873 continue; 874 } 875 trimmed.add(r); 876 cluster.add(datanode); 877 } 878 return trimmed; 879 } 880 newDatanode(DatanodeInfo datanode)881 public DDatanode newDatanode(DatanodeInfo datanode) { 882 return new DDatanode(datanode, maxConcurrentMovesPerNode); 883 } 884 executePendingMove(final PendingMove p)885 public void executePendingMove(final PendingMove p) { 886 // move the block 887 moveExecutor.execute(new Runnable() { 888 @Override 889 public void run() { 890 p.dispatch(); 891 } 892 }); 893 } 894 dispatchAndCheckContinue()895 public boolean dispatchAndCheckContinue() throws InterruptedException { 896 return nnc.shouldContinue(dispatchBlockMoves()); 897 } 898 899 /** 900 * Dispatch block moves for each source. The thread selects blocks to move & 901 * sends request to proxy source to initiate block move. The process is flow 902 * controlled. Block selection is blocked if there are too many un-confirmed 903 * block moves. 904 * 905 * @return the total number of bytes successfully moved in this iteration. 
906 */ dispatchBlockMoves()907 private long dispatchBlockMoves() throws InterruptedException { 908 final long bytesLastMoved = getBytesMoved(); 909 final Future<?>[] futures = new Future<?>[sources.size()]; 910 911 final Iterator<Source> i = sources.iterator(); 912 for (int j = 0; j < futures.length; j++) { 913 final Source s = i.next(); 914 futures[j] = dispatchExecutor.submit(new Runnable() { 915 @Override 916 public void run() { 917 s.dispatchBlocks(); 918 } 919 }); 920 } 921 922 // wait for all dispatcher threads to finish 923 for (Future<?> future : futures) { 924 try { 925 future.get(); 926 } catch (ExecutionException e) { 927 LOG.warn("Dispatcher thread failed", e.getCause()); 928 } 929 } 930 931 // wait for all block moving to be done 932 waitForMoveCompletion(targets); 933 934 return getBytesMoved() - bytesLastMoved; 935 } 936 937 /** The sleeping period before checking if block move is completed again */ 938 static private long blockMoveWaitTime = 30000L; 939 940 /** 941 * Wait for all block move confirmations. 942 * @return true if there is failed move execution 943 */ waitForMoveCompletion( Iterable<? extends StorageGroup> targets)944 public static boolean waitForMoveCompletion( 945 Iterable<? extends StorageGroup> targets) { 946 boolean hasFailure = false; 947 for(;;) { 948 boolean empty = true; 949 for (StorageGroup t : targets) { 950 if (!t.getDDatanode().isPendingQEmpty()) { 951 empty = false; 952 break; 953 } else { 954 hasFailure |= t.getDDatanode().hasFailure; 955 } 956 } 957 if (empty) { 958 return hasFailure; // all pending queues are empty 959 } 960 try { 961 Thread.sleep(blockMoveWaitTime); 962 } catch (InterruptedException ignored) { 963 } 964 } 965 } 966 967 /** 968 * Decide if the block is a good candidate to be moved from source to target. 969 * A block is a good candidate if 970 * 1. the block is not in the process of being moved/has not been moved; 971 * 2. the block does not have a replica on the target; 972 * 3. 
doing the move does not reduce the number of racks that the block has 973 */ isGoodBlockCandidate(StorageGroup source, StorageGroup target, StorageType targetStorageType, DBlock block)974 private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target, 975 StorageType targetStorageType, DBlock block) { 976 if (source.equals(target)) { 977 return false; 978 } 979 if (target.storageType != targetStorageType) { 980 return false; 981 } 982 // check if the block is moved or not 983 if (movedBlocks.contains(block.getBlock())) { 984 return false; 985 } 986 final DatanodeInfo targetDatanode = target.getDatanodeInfo(); 987 if (source.getDatanodeInfo().equals(targetDatanode)) { 988 // the block is moved inside same DN 989 return true; 990 } 991 992 // check if block has replica in target node 993 for (StorageGroup blockLocation : block.getLocations()) { 994 if (blockLocation.getDatanodeInfo().equals(targetDatanode)) { 995 return false; 996 } 997 } 998 999 if (cluster.isNodeGroupAware() 1000 && isOnSameNodeGroupWithReplicas(source, target, block)) { 1001 return false; 1002 } 1003 if (reduceNumOfRacks(source, target, block)) { 1004 return false; 1005 } 1006 return true; 1007 } 1008 1009 /** 1010 * Determine whether moving the given block replica from source to target 1011 * would reduce the number of racks of the block replicas. 
1012 */ reduceNumOfRacks(StorageGroup source, StorageGroup target, DBlock block)1013 private boolean reduceNumOfRacks(StorageGroup source, StorageGroup target, 1014 DBlock block) { 1015 final DatanodeInfo sourceDn = source.getDatanodeInfo(); 1016 if (cluster.isOnSameRack(sourceDn, target.getDatanodeInfo())) { 1017 // source and target are on the same rack 1018 return false; 1019 } 1020 boolean notOnSameRack = true; 1021 synchronized (block) { 1022 for (StorageGroup loc : block.getLocations()) { 1023 if (cluster.isOnSameRack(loc.getDatanodeInfo(), target.getDatanodeInfo())) { 1024 notOnSameRack = false; 1025 break; 1026 } 1027 } 1028 } 1029 if (notOnSameRack) { 1030 // target is not on the same rack as any replica 1031 return false; 1032 } 1033 for (StorageGroup g : block.getLocations()) { 1034 if (g != source && cluster.isOnSameRack(g.getDatanodeInfo(), sourceDn)) { 1035 // source is on the same rack of another replica 1036 return false; 1037 } 1038 } 1039 return true; 1040 } 1041 1042 /** 1043 * Check if there are any replica (other than source) on the same node group 1044 * with target. If true, then target is not a good candidate for placing 1045 * specific replica as we don't want 2 replicas under the same nodegroup. 
1046 * 1047 * @return true if there are any replica (other than source) on the same node 1048 * group with target 1049 */ isOnSameNodeGroupWithReplicas(StorageGroup source, StorageGroup target, DBlock block)1050 private boolean isOnSameNodeGroupWithReplicas(StorageGroup source, 1051 StorageGroup target, DBlock block) { 1052 final DatanodeInfo targetDn = target.getDatanodeInfo(); 1053 for (StorageGroup g : block.getLocations()) { 1054 if (g != source && cluster.isOnSameNodeGroup(g.getDatanodeInfo(), targetDn)) { 1055 return true; 1056 } 1057 } 1058 return false; 1059 } 1060 1061 /** Reset all fields in order to prepare for the next iteration */ reset(Configuration conf)1062 void reset(Configuration conf) { 1063 cluster = NetworkTopology.getInstance(conf); 1064 storageGroupMap.clear(); 1065 sources.clear(); 1066 targets.clear(); 1067 globalBlocks.removeAllButRetain(movedBlocks); 1068 movedBlocks.cleanup(); 1069 } 1070 1071 /** set the sleeping period for block move completion check */ 1072 @VisibleForTesting setBlockMoveWaitTime(long time)1073 public static void setBlockMoveWaitTime(long time) { 1074 blockMoveWaitTime = time; 1075 } 1076 1077 @VisibleForTesting setDelayAfterErrors(long time)1078 public static void setDelayAfterErrors(long time) { 1079 delayAfterErrors = time; 1080 } 1081 1082 /** shutdown thread pools */ shutdownNow()1083 public void shutdownNow() { 1084 if (dispatchExecutor != null) { 1085 dispatchExecutor.shutdownNow(); 1086 } 1087 moveExecutor.shutdownNow(); 1088 } 1089 1090 static class Util { 1091 /** @return true if data node is part of the excludedNodes. */ isExcluded(Set<String> excludedNodes, DatanodeInfo dn)1092 static boolean isExcluded(Set<String> excludedNodes, DatanodeInfo dn) { 1093 return isIn(excludedNodes, dn); 1094 } 1095 1096 /** 1097 * @return true if includedNodes is empty or data node is part of the 1098 * includedNodes. 
*/
    static boolean isIncluded(Set<String> includedNodes, DatanodeInfo dn) {
      return (includedNodes.isEmpty() || isIn(includedNodes, dn));
    }

    /**
     * Match is checked using the peer host name, the IP address and the host
     * name, each with and without the transfer port number.
     *
     * @return true if the datanode's transfer address matches the set of nodes.
     */
    private static boolean isIn(Set<String> datanodes, DatanodeInfo dn) {
      return isIn(datanodes, dn.getPeerHostName(), dn.getXferPort())
          || isIn(datanodes, dn.getIpAddr(), dn.getXferPort())
          || isIn(datanodes, dn.getHostName(), dn.getXferPort());
    }

    /**
     * @return true if nodes contains host or host:port; false if host is null
     */
    private static boolean isIn(Set<String> nodes, String host, int port) {
      if (host == null) {
        return false;
      }
      return (nodes.contains(host) || nodes.contains(host + ":" + port));
    }

    /**
     * Parse a comma separated string to obtain set of host names
     *
     * @param string the host list, split and trimmed by
     *          {@code StringUtils.getTrimmedStrings}
     * @return set of host names
     */
    static Set<String> parseHostList(String string) {
      String[] addrs = StringUtils.getTrimmedStrings(string);
      return new HashSet<String>(Arrays.asList(addrs));
    }

    /**
     * Read set of host names from a file
     *
     * @param fileName the file to read
     * @param type passed through to {@code HostsFileReader.readFileToSet};
     *          presumably a label for the kind of list - confirm against
     *          that method
     * @return set of host names
     * @throws IllegalArgumentException if the file cannot be read; the
     *           underlying {@link IOException} is attached as the cause
     */
    static Set<String> getHostListFromFile(String fileName, String type) {
      Set<String> nodes = new HashSet<String>();
      try {
        HostsFileReader.readFileToSet(type, fileName, nodes);
        return StringUtils.getTrimmedStrings(nodes);
      } catch (IOException e) {
        // keep the I/O failure as the cause so the reason is not lost
        throw new IllegalArgumentException(
            "Failed to read host list from file: " + fileName, e);
      }
    }
  }
}