1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs.server.balancer;
19 
20 import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
21 
22 import java.io.BufferedInputStream;
23 import java.io.BufferedOutputStream;
24 import java.io.DataInputStream;
25 import java.io.DataOutputStream;
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.io.OutputStream;
29 import java.net.Socket;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.Collection;
33 import java.util.EnumMap;
34 import java.util.HashMap;
35 import java.util.HashSet;
36 import java.util.Iterator;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Set;
40 import java.util.concurrent.ExecutionException;
41 import java.util.concurrent.ExecutorService;
42 import java.util.concurrent.Executors;
43 import java.util.concurrent.Future;
44 
45 import org.apache.commons.logging.Log;
46 import org.apache.commons.logging.LogFactory;
47 import org.apache.hadoop.classification.InterfaceAudience;
48 import org.apache.hadoop.conf.Configuration;
49 import org.apache.hadoop.fs.CommonConfigurationKeys;
50 import org.apache.hadoop.fs.StorageType;
51 import org.apache.hadoop.hdfs.DFSConfigKeys;
52 import org.apache.hadoop.hdfs.DFSUtil;
53 import org.apache.hadoop.hdfs.DistributedFileSystem;
54 import org.apache.hadoop.hdfs.protocol.Block;
55 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
56 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
57 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
58 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
59 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
60 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
61 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
62 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
63 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
64 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
65 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
66 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
67 import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
68 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
69 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
70 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
71 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
72 import org.apache.hadoop.io.IOUtils;
73 import org.apache.hadoop.net.NetUtils;
74 import org.apache.hadoop.net.NetworkTopology;
75 import org.apache.hadoop.security.token.Token;
76 import org.apache.hadoop.util.HostsFileReader;
77 import org.apache.hadoop.util.StringUtils;
78 import org.apache.hadoop.util.Time;
79 
80 import com.google.common.annotations.VisibleForTesting;
81 import com.google.common.base.Preconditions;
82 
83 /** Dispatching block replica moves between datanodes. */
84 @InterfaceAudience.Private
85 public class Dispatcher {
86   static final Log LOG = LogFactory.getLog(Dispatcher.class);
87 
88   private static final long GB = 1L << 30; // 1GB
89   private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2 * GB;
90 
91   private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5;
  /**
   * The period of time, in milliseconds, to delay the usage of a DataNode
   * after hitting errors when using it for migrating data.
   */
96   private static long delayAfterErrors = 10 * 1000;
97 
98   private final NameNodeConnector nnc;
99   private final SaslDataTransferClient saslClient;
100 
101   /** Set of datanodes to be excluded. */
102   private final Set<String> excludedNodes;
103   /** Restrict to the following nodes. */
104   private final Set<String> includedNodes;
105 
106   private final Collection<Source> sources = new HashSet<Source>();
107   private final Collection<StorageGroup> targets = new HashSet<StorageGroup>();
108 
109   private final GlobalBlockMap globalBlocks = new GlobalBlockMap();
110   private final MovedBlocks<StorageGroup> movedBlocks;
111 
112   /** Map (datanodeUuid,storageType -> StorageGroup) */
113   private final StorageGroupMap<StorageGroup> storageGroupMap
114       = new StorageGroupMap<StorageGroup>();
115 
116   private NetworkTopology cluster;
117 
118   private final ExecutorService moveExecutor;
119   private final ExecutorService dispatchExecutor;
120 
  /** The maximum number of concurrent block moves at a datanode. */
122   private final int maxConcurrentMovesPerNode;
123 
124   private static class GlobalBlockMap {
125     private final Map<Block, DBlock> map = new HashMap<Block, DBlock>();
126 
127     /**
128      * Get the block from the map;
129      * if the block is not found, create a new block and put it in the map.
130      */
get(Block b)131     private DBlock get(Block b) {
132       DBlock block = map.get(b);
133       if (block == null) {
134         block = new DBlock(b);
135         map.put(b, block);
136       }
137       return block;
138     }
139 
140     /** Remove all blocks except for the moved blocks. */
removeAllButRetain(MovedBlocks<StorageGroup> movedBlocks)141     private void removeAllButRetain(MovedBlocks<StorageGroup> movedBlocks) {
142       for (Iterator<Block> i = map.keySet().iterator(); i.hasNext();) {
143         if (!movedBlocks.contains(i.next())) {
144           i.remove();
145         }
146       }
147     }
148   }
149 
150   public static class StorageGroupMap<G extends StorageGroup> {
toKey(String datanodeUuid, StorageType storageType)151     private static String toKey(String datanodeUuid, StorageType storageType) {
152       return datanodeUuid + ":" + storageType;
153     }
154 
155     private final Map<String, G> map = new HashMap<String, G>();
156 
get(String datanodeUuid, StorageType storageType)157     public G get(String datanodeUuid, StorageType storageType) {
158       return map.get(toKey(datanodeUuid, storageType));
159     }
160 
put(G g)161     public void put(G g) {
162       final String key = toKey(g.getDatanodeInfo().getDatanodeUuid(), g.storageType);
163       final StorageGroup existing = map.put(key, g);
164       Preconditions.checkState(existing == null);
165     }
166 
size()167     int size() {
168       return map.size();
169     }
170 
clear()171     void clear() {
172       map.clear();
173     }
174 
values()175     public Collection<G> values() {
176       return map.values();
177     }
178   }
179 
180   /** This class keeps track of a scheduled block move */
181   public class PendingMove {
182     private DBlock block;
183     private Source source;
184     private DDatanode proxySource;
185     private StorageGroup target;
186 
PendingMove(Source source, StorageGroup target)187     private PendingMove(Source source, StorageGroup target) {
188       this.source = source;
189       this.target = target;
190     }
191 
192     @Override
toString()193     public String toString() {
194       final Block b = block != null ? block.getBlock() : null;
195       String bStr = b != null ? (b + " with size=" + b.getNumBytes() + " ")
196           : " ";
197       return bStr + "from " + source.getDisplayName() + " to " + target
198           .getDisplayName() + " through " + (proxySource != null ? proxySource
199           .datanode : "");
200     }
201 
202     /**
203      * Choose a block & a proxy source for this pendingMove whose source &
204      * target have already been chosen.
205      *
206      * @return true if a block and its proxy are chosen; false otherwise
207      */
chooseBlockAndProxy()208     private boolean chooseBlockAndProxy() {
209       // source and target must have the same storage type
210       final StorageType t = source.getStorageType();
211       // iterate all source's blocks until find a good one
212       for (Iterator<DBlock> i = source.getBlockIterator(); i.hasNext();) {
213         if (markMovedIfGoodBlock(i.next(), t)) {
214           i.remove();
215           return true;
216         }
217       }
218       return false;
219     }
220 
221     /**
222      * @return true if the given block is good for the tentative move.
223      */
markMovedIfGoodBlock(DBlock block, StorageType targetStorageType)224     private boolean markMovedIfGoodBlock(DBlock block, StorageType targetStorageType) {
225       synchronized (block) {
226         synchronized (movedBlocks) {
227           if (isGoodBlockCandidate(source, target, targetStorageType, block)) {
228             this.block = block;
229             if (chooseProxySource()) {
230               movedBlocks.put(block);
231               if (LOG.isDebugEnabled()) {
232                 LOG.debug("Decided to move " + this);
233               }
234               return true;
235             }
236           }
237         }
238       }
239       return false;
240     }
241 
242     /**
243      * Choose a proxy source.
244      *
245      * @return true if a proxy is found; otherwise false
246      */
chooseProxySource()247     private boolean chooseProxySource() {
248       final DatanodeInfo targetDN = target.getDatanodeInfo();
249       // if source and target are same nodes then no need of proxy
250       if (source.getDatanodeInfo().equals(targetDN) && addTo(source)) {
251         return true;
252       }
253       // if node group is supported, first try add nodes in the same node group
254       if (cluster.isNodeGroupAware()) {
255         for (StorageGroup loc : block.getLocations()) {
256           if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN)
257               && addTo(loc)) {
258             return true;
259           }
260         }
261       }
262       // check if there is replica which is on the same rack with the target
263       for (StorageGroup loc : block.getLocations()) {
264         if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) {
265           return true;
266         }
267       }
268       // find out a non-busy replica
269       for (StorageGroup loc : block.getLocations()) {
270         if (addTo(loc)) {
271           return true;
272         }
273       }
274       return false;
275     }
276 
277     /** add to a proxy source for specific block movement */
addTo(StorageGroup g)278     private boolean addTo(StorageGroup g) {
279       final DDatanode dn = g.getDDatanode();
280       if (dn.addPendingBlock(this)) {
281         proxySource = dn;
282         return true;
283       }
284       return false;
285     }
286 
287     /** Dispatch the move to the proxy source & wait for the response. */
dispatch()288     private void dispatch() {
289       if (LOG.isDebugEnabled()) {
290         LOG.debug("Start moving " + this);
291       }
292 
293       Socket sock = new Socket();
294       DataOutputStream out = null;
295       DataInputStream in = null;
296       try {
297         sock.connect(
298             NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()),
299             HdfsServerConstants.READ_TIMEOUT);
300 
301         sock.setKeepAlive(true);
302 
303         OutputStream unbufOut = sock.getOutputStream();
304         InputStream unbufIn = sock.getInputStream();
305         ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
306             block.getBlock());
307         final KeyManager km = nnc.getKeyManager();
308         Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb);
309         IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
310             unbufIn, km, accessToken, target.getDatanodeInfo());
311         unbufOut = saslStreams.out;
312         unbufIn = saslStreams.in;
313         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
314             HdfsConstants.IO_FILE_BUFFER_SIZE));
315         in = new DataInputStream(new BufferedInputStream(unbufIn,
316             HdfsConstants.IO_FILE_BUFFER_SIZE));
317 
318         sendRequest(out, eb, accessToken);
319         receiveResponse(in);
320         nnc.getBytesMoved().addAndGet(block.getNumBytes());
321         LOG.info("Successfully moved " + this);
322       } catch (IOException e) {
323         LOG.warn("Failed to move " + this + ": " + e.getMessage());
324         target.getDDatanode().setHasFailure();
325         // Proxy or target may have some issues, delay before using these nodes
326         // further in order to avoid a potential storm of "threads quota
327         // exceeded" warnings when the dispatcher gets out of sync with work
328         // going on in datanodes.
329         proxySource.activateDelay(delayAfterErrors);
330         target.getDDatanode().activateDelay(delayAfterErrors);
331       } finally {
332         IOUtils.closeStream(out);
333         IOUtils.closeStream(in);
334         IOUtils.closeSocket(sock);
335 
336         proxySource.removePendingBlock(this);
337         target.getDDatanode().removePendingBlock(this);
338 
339         synchronized (this) {
340           reset();
341         }
342         synchronized (Dispatcher.this) {
343           Dispatcher.this.notifyAll();
344         }
345       }
346     }
347 
348     /** Send a block replace request to the output stream */
sendRequest(DataOutputStream out, ExtendedBlock eb, Token<BlockTokenIdentifier> accessToken)349     private void sendRequest(DataOutputStream out, ExtendedBlock eb,
350         Token<BlockTokenIdentifier> accessToken) throws IOException {
351       new Sender(out).replaceBlock(eb, target.storageType, accessToken,
352           source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
353     }
354 
355     /** Receive a block copy response from the input stream */
receiveResponse(DataInputStream in)356     private void receiveResponse(DataInputStream in) throws IOException {
357       BlockOpResponseProto response =
358           BlockOpResponseProto.parseFrom(vintPrefixed(in));
359       while (response.getStatus() == Status.IN_PROGRESS) {
360         // read intermediate responses
361         response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
362       }
363       String logInfo = "block move is failed";
364       DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
365     }
366 
367     /** reset the object */
reset()368     private void reset() {
369       block = null;
370       source = null;
371       proxySource = null;
372       target = null;
373     }
374   }
375 
376   /** A class for keeping track of block locations in the dispatcher. */
377   public static class DBlock extends MovedBlocks.Locations<StorageGroup> {
DBlock(Block block)378     public DBlock(Block block) {
379       super(block);
380     }
381   }
382 
383   /** The class represents a desired move. */
384   static class Task {
385     private final StorageGroup target;
386     private long size; // bytes scheduled to move
387 
Task(StorageGroup target, long size)388     Task(StorageGroup target, long size) {
389       this.target = target;
390       this.size = size;
391     }
392 
getSize()393     long getSize() {
394       return size;
395     }
396   }
397 
398   /** A class that keeps track of a datanode. */
399   public static class DDatanode {
400 
401     /** A group of storages in a datanode with the same storage type. */
402     public class StorageGroup {
403       final StorageType storageType;
404       final long maxSize2Move;
405       private long scheduledSize = 0L;
406 
StorageGroup(StorageType storageType, long maxSize2Move)407       private StorageGroup(StorageType storageType, long maxSize2Move) {
408         this.storageType = storageType;
409         this.maxSize2Move = maxSize2Move;
410       }
411 
getStorageType()412       public StorageType getStorageType() {
413         return storageType;
414       }
415 
getDDatanode()416       private DDatanode getDDatanode() {
417         return DDatanode.this;
418       }
419 
getDatanodeInfo()420       public DatanodeInfo getDatanodeInfo() {
421         return DDatanode.this.datanode;
422       }
423 
424       /** Decide if still need to move more bytes */
hasSpaceForScheduling()425       boolean hasSpaceForScheduling() {
426         return hasSpaceForScheduling(0L);
427       }
428 
hasSpaceForScheduling(long size)429       synchronized boolean hasSpaceForScheduling(long size) {
430         return availableSizeToMove() > size;
431       }
432 
433       /** @return the total number of bytes that need to be moved */
availableSizeToMove()434       synchronized long availableSizeToMove() {
435         return maxSize2Move - scheduledSize;
436       }
437 
438       /** increment scheduled size */
incScheduledSize(long size)439       public synchronized void incScheduledSize(long size) {
440         scheduledSize += size;
441       }
442 
443       /** @return scheduled size */
getScheduledSize()444       synchronized long getScheduledSize() {
445         return scheduledSize;
446       }
447 
448       /** Reset scheduled size to zero. */
resetScheduledSize()449       synchronized void resetScheduledSize() {
450         scheduledSize = 0L;
451       }
452 
addPendingMove(DBlock block, final PendingMove pm)453       private PendingMove addPendingMove(DBlock block, final PendingMove pm) {
454         if (getDDatanode().addPendingBlock(pm)) {
455           if (pm.markMovedIfGoodBlock(block, getStorageType())) {
456             incScheduledSize(pm.block.getNumBytes());
457             return pm;
458           } else {
459             getDDatanode().removePendingBlock(pm);
460           }
461         }
462         return null;
463       }
464 
465       /** @return the name for display */
getDisplayName()466       String getDisplayName() {
467         return datanode + ":" + storageType;
468       }
469 
470       @Override
toString()471       public String toString() {
472         return getDisplayName();
473       }
474 
475       @Override
hashCode()476       public int hashCode() {
477         return getStorageType().hashCode() ^ getDatanodeInfo().hashCode();
478       }
479 
480       @Override
equals(Object obj)481       public boolean equals(Object obj) {
482         if (this == obj) {
483           return true;
484         } else if (obj == null || !(obj instanceof StorageGroup)) {
485           return false;
486         } else {
487           final StorageGroup that = (StorageGroup) obj;
488           return this.getStorageType() == that.getStorageType()
489               && this.getDatanodeInfo().equals(that.getDatanodeInfo());
490         }
491       }
492 
493     }
494 
495     final DatanodeInfo datanode;
496     private final EnumMap<StorageType, Source> sourceMap
497         = new EnumMap<StorageType, Source>(StorageType.class);
498     private final EnumMap<StorageType, StorageGroup> targetMap
499         = new EnumMap<StorageType, StorageGroup>(StorageType.class);
500     protected long delayUntil = 0L;
501     /** blocks being moved but not confirmed yet */
502     private final List<PendingMove> pendings;
503     private volatile boolean hasFailure = false;
504     private final int maxConcurrentMoves;
505 
506     @Override
toString()507     public String toString() {
508       return getClass().getSimpleName() + ":" + datanode;
509     }
510 
DDatanode(DatanodeInfo datanode, int maxConcurrentMoves)511     private DDatanode(DatanodeInfo datanode, int maxConcurrentMoves) {
512       this.datanode = datanode;
513       this.maxConcurrentMoves = maxConcurrentMoves;
514       this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves);
515     }
516 
getDatanodeInfo()517     public DatanodeInfo getDatanodeInfo() {
518       return datanode;
519     }
520 
put(StorageType storageType, G g, EnumMap<StorageType, G> map)521     private static <G extends StorageGroup> void put(StorageType storageType,
522         G g, EnumMap<StorageType, G> map) {
523       final StorageGroup existing = map.put(storageType, g);
524       Preconditions.checkState(existing == null);
525     }
526 
addTarget(StorageType storageType, long maxSize2Move)527     public StorageGroup addTarget(StorageType storageType, long maxSize2Move) {
528       final StorageGroup g = new StorageGroup(storageType, maxSize2Move);
529       put(storageType, g, targetMap);
530       return g;
531     }
532 
addSource(StorageType storageType, long maxSize2Move, Dispatcher d)533     public Source addSource(StorageType storageType, long maxSize2Move, Dispatcher d) {
534       final Source s = d.new Source(storageType, maxSize2Move, this);
535       put(storageType, s, sourceMap);
536       return s;
537     }
538 
activateDelay(long delta)539     synchronized private void activateDelay(long delta) {
540       delayUntil = Time.monotonicNow() + delta;
541     }
542 
isDelayActive()543     synchronized private boolean isDelayActive() {
544       if (delayUntil == 0 || Time.monotonicNow() > delayUntil) {
545         delayUntil = 0;
546         return false;
547       }
548       return true;
549     }
550 
551     /** Check if the node can schedule more blocks to move */
isPendingQNotFull()552     synchronized boolean isPendingQNotFull() {
553       return pendings.size() < maxConcurrentMoves;
554     }
555 
556     /** Check if all the dispatched moves are done */
isPendingQEmpty()557     synchronized boolean isPendingQEmpty() {
558       return pendings.isEmpty();
559     }
560 
561     /** Add a scheduled block move to the node */
addPendingBlock(PendingMove pendingBlock)562     synchronized boolean addPendingBlock(PendingMove pendingBlock) {
563       if (!isDelayActive() && isPendingQNotFull()) {
564         return pendings.add(pendingBlock);
565       }
566       return false;
567     }
568 
569     /** Remove a scheduled block move from the node */
removePendingBlock(PendingMove pendingBlock)570     synchronized boolean removePendingBlock(PendingMove pendingBlock) {
571       return pendings.remove(pendingBlock);
572     }
573 
setHasFailure()574     void setHasFailure() {
575       this.hasFailure = true;
576     }
577   }
578 
579   /** A node that can be the sources of a block move */
580   public class Source extends DDatanode.StorageGroup {
581 
582     private final List<Task> tasks = new ArrayList<Task>(2);
583     private long blocksToReceive = 0L;
584     /**
585      * Source blocks point to the objects in {@link Dispatcher#globalBlocks}
586      * because we want to keep one copy of a block and be aware that the
587      * locations are changing over time.
588      */
589     private final List<DBlock> srcBlocks = new ArrayList<DBlock>();
590 
Source(StorageType storageType, long maxSize2Move, DDatanode dn)591     private Source(StorageType storageType, long maxSize2Move, DDatanode dn) {
592       dn.super(storageType, maxSize2Move);
593     }
594 
595     /** Add a task */
addTask(Task task)596     void addTask(Task task) {
597       Preconditions.checkState(task.target != this,
598           "Source and target are the same storage group " + getDisplayName());
599       incScheduledSize(task.size);
600       tasks.add(task);
601     }
602 
603     /** @return an iterator to this source's blocks */
getBlockIterator()604     Iterator<DBlock> getBlockIterator() {
605       return srcBlocks.iterator();
606     }
607 
608     /**
609      * Fetch new blocks of this source from namenode and update this source's
610      * block list & {@link Dispatcher#globalBlocks}.
611      *
612      * @return the total size of the received blocks in the number of bytes.
613      */
getBlockList()614     private long getBlockList() throws IOException {
615       final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
616       final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size);
617 
618       long bytesReceived = 0;
619       for (BlockWithLocations blk : newBlocks.getBlocks()) {
620         bytesReceived += blk.getBlock().getNumBytes();
621         synchronized (globalBlocks) {
622           final DBlock block = globalBlocks.get(blk.getBlock());
623           synchronized (block) {
624             block.clearLocations();
625 
626             // update locations
627             final String[] datanodeUuids = blk.getDatanodeUuids();
628             final StorageType[] storageTypes = blk.getStorageTypes();
629             for (int i = 0; i < datanodeUuids.length; i++) {
630               final StorageGroup g = storageGroupMap.get(
631                   datanodeUuids[i], storageTypes[i]);
632               if (g != null) { // not unknown
633                 block.addLocation(g);
634               }
635             }
636           }
637           if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) {
638             // filter bad candidates
639             srcBlocks.add(block);
640           }
641         }
642       }
643       return bytesReceived;
644     }
645 
646     /** Decide if the given block is a good candidate to move or not */
isGoodBlockCandidate(DBlock block)647     private boolean isGoodBlockCandidate(DBlock block) {
648       // source and target must have the same storage type
649       final StorageType sourceStorageType = getStorageType();
650       for (Task t : tasks) {
651         if (Dispatcher.this.isGoodBlockCandidate(this, t.target,
652             sourceStorageType, block)) {
653           return true;
654         }
655       }
656       return false;
657     }
658 
659     /**
660      * Choose a move for the source. The block's source, target, and proxy
661      * are determined too. When choosing proxy and target, source &
662      * target throttling has been considered. They are chosen only when they
663      * have the capacity to support this block move. The block should be
664      * dispatched immediately after this method is returned.
665      *
666      * @return a move that's good for the source to dispatch immediately.
667      */
chooseNextMove()668     private PendingMove chooseNextMove() {
669       for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
670         final Task task = i.next();
671         final DDatanode target = task.target.getDDatanode();
672         final PendingMove pendingBlock = new PendingMove(this, task.target);
673         if (target.addPendingBlock(pendingBlock)) {
674           // target is not busy, so do a tentative block allocation
675           if (pendingBlock.chooseBlockAndProxy()) {
676             long blockSize = pendingBlock.block.getNumBytes();
677             incScheduledSize(-blockSize);
678             task.size -= blockSize;
679             if (task.size == 0) {
680               i.remove();
681             }
682             return pendingBlock;
683           } else {
684             // cancel the tentative move
685             target.removePendingBlock(pendingBlock);
686           }
687         }
688       }
689       return null;
690     }
691 
692     /** Add a pending move */
addPendingMove(DBlock block, StorageGroup target)693     public PendingMove addPendingMove(DBlock block, StorageGroup target) {
694       return target.addPendingMove(block, new PendingMove(this, target));
695     }
696 
697     /** Iterate all source's blocks to remove moved ones */
removeMovedBlocks()698     private void removeMovedBlocks() {
699       for (Iterator<DBlock> i = getBlockIterator(); i.hasNext();) {
700         if (movedBlocks.contains(i.next().getBlock())) {
701           i.remove();
702         }
703       }
704     }
705 
706     private static final int SOURCE_BLOCKS_MIN_SIZE = 5;
707 
708     /** @return if should fetch more blocks from namenode */
shouldFetchMoreBlocks()709     private boolean shouldFetchMoreBlocks() {
710       return srcBlocks.size() < SOURCE_BLOCKS_MIN_SIZE && blocksToReceive > 0;
711     }
712 
713     private static final long MAX_ITERATION_TIME = 20 * 60 * 1000L; // 20 mins
714 
715     /**
716      * This method iteratively does the following: it first selects a block to
717      * move, then sends a request to the proxy source to start the block move
718      * when the source's block list falls below a threshold, it asks the
719      * namenode for more blocks. It terminates when it has dispatch enough block
720      * move tasks or it has received enough blocks from the namenode, or the
721      * elapsed time of the iteration has exceeded the max time limit.
722      */
dispatchBlocks()723     private void dispatchBlocks() {
724       final long startTime = Time.monotonicNow();
725       this.blocksToReceive = 2 * getScheduledSize();
726       boolean isTimeUp = false;
727       int noPendingMoveIteration = 0;
728       while (!isTimeUp && getScheduledSize() > 0
729           && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
730         final PendingMove p = chooseNextMove();
731         if (p != null) {
732           // Reset no pending move counter
733           noPendingMoveIteration=0;
734           executePendingMove(p);
735           continue;
736         }
737 
738         // Since we cannot schedule any block to move,
739         // remove any moved blocks from the source block list and
740         removeMovedBlocks(); // filter already moved blocks
741         // check if we should fetch more blocks from the namenode
742         if (shouldFetchMoreBlocks()) {
743           // fetch new blocks
744           try {
745             blocksToReceive -= getBlockList();
746             continue;
747           } catch (IOException e) {
748             LOG.warn("Exception while getting block list", e);
749             return;
750           }
751         } else {
752           // source node cannot find a pending block to move, iteration +1
753           noPendingMoveIteration++;
754           // in case no blocks can be moved for source node's task,
755           // jump out of while-loop after 5 iterations.
756           if (noPendingMoveIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) {
757             resetScheduledSize();
758           }
759         }
760 
761         // check if time is up or not
762         if (Time.monotonicNow() - startTime > MAX_ITERATION_TIME) {
763           isTimeUp = true;
764           continue;
765         }
766 
767         // Now we can not schedule any block to move and there are
768         // no new blocks added to the source block list, so we wait.
769         try {
770           synchronized (Dispatcher.this) {
771             Dispatcher.this.wait(1000); // wait for targets/sources to be idle
772           }
773         } catch (InterruptedException ignored) {
774         }
775       }
776     }
777 
778     @Override
hashCode()779     public int hashCode() {
780       return super.hashCode();
781     }
782 
783     @Override
equals(Object obj)784     public boolean equals(Object obj) {
785       return super.equals(obj);
786     }
787   }
788 
/**
 * Creates a dispatcher bound to the given namenode connector.
 *
 * @param nnc connector used to talk to the namenode
 * @param includedNodes hosts to consider; see {@link Util#isIncluded}
 * @param excludedNodes hosts to skip; see {@link Util#isExcluded}
 * @param movedWinWidth window width handed to the moved-blocks tracker
 * @param moverThreads size of the fixed thread pool that executes moves
 * @param dispatcherThreads size of the fixed thread pool that dispatches
 *          moves; when 0 no dispatch pool is created and the field is null
 * @param maxConcurrentMovesPerNode limit passed to every new DDatanode
 * @param conf configuration used for the topology and the SASL client
 */
public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
    Set<String> excludedNodes, long movedWinWidth, int moverThreads,
    int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) {
  this.nnc = nnc;
  this.includedNodes = includedNodes;
  this.excludedNodes = excludedNodes;
  this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
  this.movedBlocks = new MovedBlocks<StorageGroup>(movedWinWidth);

  this.cluster = NetworkTopology.getInstance(conf);

  this.moveExecutor = Executors.newFixedThreadPool(moverThreads);
  // A dispatcher thread count of zero means "no dispatch pool at all".
  if (dispatcherThreads == 0) {
    this.dispatchExecutor = null;
  } else {
    this.dispatchExecutor = Executors.newFixedThreadPool(dispatcherThreads);
  }

  this.saslClient = new SaslDataTransferClient(conf,
      DataTransferSaslUtil.getSaslPropertiesResolver(conf),
      TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth);
}
808 
getDistributedFileSystem()809   public DistributedFileSystem getDistributedFileSystem() {
810     return nnc.getDistributedFileSystem();
811   }
812 
getStorageGroupMap()813   public StorageGroupMap<StorageGroup> getStorageGroupMap() {
814     return storageGroupMap;
815   }
816 
getCluster()817   public NetworkTopology getCluster() {
818     return cluster;
819   }
820 
getBytesMoved()821   long getBytesMoved() {
822     return nnc.getBytesMoved().get();
823   }
824 
bytesToMove()825   long bytesToMove() {
826     Preconditions.checkState(
827         storageGroupMap.size() >= sources.size() + targets.size(),
828         "Mismatched number of storage groups (" + storageGroupMap.size()
829             + " < " + sources.size() + " sources + " + targets.size()
830             + " targets)");
831 
832     long b = 0L;
833     for (Source src : sources) {
834       b += src.getScheduledSize();
835     }
836     return b;
837   }
838 
add(Source source, StorageGroup target)839   void add(Source source, StorageGroup target) {
840     sources.add(source);
841     targets.add(target);
842   }
843 
shouldIgnore(DatanodeInfo dn)844   private boolean shouldIgnore(DatanodeInfo dn) {
845     // ignore decommissioned nodes
846     final boolean decommissioned = dn.isDecommissioned();
847     // ignore decommissioning nodes
848     final boolean decommissioning = dn.isDecommissionInProgress();
849     // ignore nodes in exclude list
850     final boolean excluded = Util.isExcluded(excludedNodes, dn);
851     // ignore nodes not in the include list (if include list is not empty)
852     final boolean notIncluded = !Util.isIncluded(includedNodes, dn);
853 
854     if (decommissioned || decommissioning || excluded || notIncluded) {
855       if (LOG.isTraceEnabled()) {
856         LOG.trace("Excluding datanode " + dn + ": " + decommissioned + ", "
857             + decommissioning + ", " + excluded + ", " + notIncluded);
858       }
859       return true;
860     }
861     return false;
862   }
863 
864   /** Get live datanode storage reports and then build the network topology. */
init()865   public List<DatanodeStorageReport> init() throws IOException {
866     final DatanodeStorageReport[] reports = nnc.getLiveDatanodeStorageReport();
867     final List<DatanodeStorageReport> trimmed = new ArrayList<DatanodeStorageReport>();
868     // create network topology and classify utilization collections:
869     // over-utilized, above-average, below-average and under-utilized.
870     for (DatanodeStorageReport r : DFSUtil.shuffle(reports)) {
871       final DatanodeInfo datanode = r.getDatanodeInfo();
872       if (shouldIgnore(datanode)) {
873         continue;
874       }
875       trimmed.add(r);
876       cluster.add(datanode);
877     }
878     return trimmed;
879   }
880 
newDatanode(DatanodeInfo datanode)881   public DDatanode newDatanode(DatanodeInfo datanode) {
882     return new DDatanode(datanode, maxConcurrentMovesPerNode);
883   }
884 
executePendingMove(final PendingMove p)885   public void executePendingMove(final PendingMove p) {
886     // move the block
887     moveExecutor.execute(new Runnable() {
888       @Override
889       public void run() {
890         p.dispatch();
891       }
892     });
893   }
894 
dispatchAndCheckContinue()895   public boolean dispatchAndCheckContinue() throws InterruptedException {
896     return nnc.shouldContinue(dispatchBlockMoves());
897   }
898 
899   /**
900    * Dispatch block moves for each source. The thread selects blocks to move &
901    * sends request to proxy source to initiate block move. The process is flow
902    * controlled. Block selection is blocked if there are too many un-confirmed
903    * block moves.
904    *
905    * @return the total number of bytes successfully moved in this iteration.
906    */
dispatchBlockMoves()907   private long dispatchBlockMoves() throws InterruptedException {
908     final long bytesLastMoved = getBytesMoved();
909     final Future<?>[] futures = new Future<?>[sources.size()];
910 
911     final Iterator<Source> i = sources.iterator();
912     for (int j = 0; j < futures.length; j++) {
913       final Source s = i.next();
914       futures[j] = dispatchExecutor.submit(new Runnable() {
915         @Override
916         public void run() {
917           s.dispatchBlocks();
918         }
919       });
920     }
921 
922     // wait for all dispatcher threads to finish
923     for (Future<?> future : futures) {
924       try {
925         future.get();
926       } catch (ExecutionException e) {
927         LOG.warn("Dispatcher thread failed", e.getCause());
928       }
929     }
930 
931     // wait for all block moving to be done
932     waitForMoveCompletion(targets);
933 
934     return getBytesMoved() - bytesLastMoved;
935   }
936 
937   /** The sleeping period before checking if block move is completed again */
938   static private long blockMoveWaitTime = 30000L;
939 
940   /**
941    * Wait for all block move confirmations.
942    * @return true if there is failed move execution
943    */
waitForMoveCompletion( Iterable<? extends StorageGroup> targets)944   public static boolean waitForMoveCompletion(
945       Iterable<? extends StorageGroup> targets) {
946     boolean hasFailure = false;
947     for(;;) {
948       boolean empty = true;
949       for (StorageGroup t : targets) {
950         if (!t.getDDatanode().isPendingQEmpty()) {
951           empty = false;
952           break;
953         } else {
954           hasFailure |= t.getDDatanode().hasFailure;
955         }
956       }
957       if (empty) {
958         return hasFailure; // all pending queues are empty
959       }
960       try {
961         Thread.sleep(blockMoveWaitTime);
962       } catch (InterruptedException ignored) {
963       }
964     }
965   }
966 
967   /**
968    * Decide if the block is a good candidate to be moved from source to target.
969    * A block is a good candidate if
970    * 1. the block is not in the process of being moved/has not been moved;
971    * 2. the block does not have a replica on the target;
972    * 3. doing the move does not reduce the number of racks that the block has
973    */
isGoodBlockCandidate(StorageGroup source, StorageGroup target, StorageType targetStorageType, DBlock block)974   private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
975       StorageType targetStorageType, DBlock block) {
976     if (source.equals(target)) {
977       return false;
978     }
979     if (target.storageType != targetStorageType) {
980       return false;
981     }
982     // check if the block is moved or not
983     if (movedBlocks.contains(block.getBlock())) {
984       return false;
985     }
986     final DatanodeInfo targetDatanode = target.getDatanodeInfo();
987     if (source.getDatanodeInfo().equals(targetDatanode)) {
988       // the block is moved inside same DN
989       return true;
990     }
991 
992     // check if block has replica in target node
993     for (StorageGroup blockLocation : block.getLocations()) {
994       if (blockLocation.getDatanodeInfo().equals(targetDatanode)) {
995         return false;
996       }
997     }
998 
999     if (cluster.isNodeGroupAware()
1000         && isOnSameNodeGroupWithReplicas(source, target, block)) {
1001       return false;
1002     }
1003     if (reduceNumOfRacks(source, target, block)) {
1004       return false;
1005     }
1006     return true;
1007   }
1008 
1009   /**
1010    * Determine whether moving the given block replica from source to target
1011    * would reduce the number of racks of the block replicas.
1012    */
reduceNumOfRacks(StorageGroup source, StorageGroup target, DBlock block)1013   private boolean reduceNumOfRacks(StorageGroup source, StorageGroup target,
1014       DBlock block) {
1015     final DatanodeInfo sourceDn = source.getDatanodeInfo();
1016     if (cluster.isOnSameRack(sourceDn, target.getDatanodeInfo())) {
1017       // source and target are on the same rack
1018       return false;
1019     }
1020     boolean notOnSameRack = true;
1021     synchronized (block) {
1022       for (StorageGroup loc : block.getLocations()) {
1023         if (cluster.isOnSameRack(loc.getDatanodeInfo(), target.getDatanodeInfo())) {
1024           notOnSameRack = false;
1025           break;
1026         }
1027       }
1028     }
1029     if (notOnSameRack) {
1030       // target is not on the same rack as any replica
1031       return false;
1032     }
1033     for (StorageGroup g : block.getLocations()) {
1034       if (g != source && cluster.isOnSameRack(g.getDatanodeInfo(), sourceDn)) {
1035         // source is on the same rack of another replica
1036         return false;
1037       }
1038     }
1039     return true;
1040   }
1041 
1042   /**
1043    * Check if there are any replica (other than source) on the same node group
1044    * with target. If true, then target is not a good candidate for placing
1045    * specific replica as we don't want 2 replicas under the same nodegroup.
1046    *
1047    * @return true if there are any replica (other than source) on the same node
1048    *         group with target
1049    */
isOnSameNodeGroupWithReplicas(StorageGroup source, StorageGroup target, DBlock block)1050   private boolean isOnSameNodeGroupWithReplicas(StorageGroup source,
1051       StorageGroup target, DBlock block) {
1052     final DatanodeInfo targetDn = target.getDatanodeInfo();
1053     for (StorageGroup g : block.getLocations()) {
1054       if (g != source && cluster.isOnSameNodeGroup(g.getDatanodeInfo(), targetDn)) {
1055         return true;
1056       }
1057     }
1058     return false;
1059   }
1060 
1061   /** Reset all fields in order to prepare for the next iteration */
reset(Configuration conf)1062   void reset(Configuration conf) {
1063     cluster = NetworkTopology.getInstance(conf);
1064     storageGroupMap.clear();
1065     sources.clear();
1066     targets.clear();
1067     globalBlocks.removeAllButRetain(movedBlocks);
1068     movedBlocks.cleanup();
1069   }
1070 
1071   /** set the sleeping period for block move completion check */
1072   @VisibleForTesting
setBlockMoveWaitTime(long time)1073   public static void setBlockMoveWaitTime(long time) {
1074     blockMoveWaitTime = time;
1075   }
1076 
1077   @VisibleForTesting
setDelayAfterErrors(long time)1078   public static void setDelayAfterErrors(long time) {
1079     delayAfterErrors = time;
1080   }
1081 
1082   /** shutdown thread pools */
shutdownNow()1083   public void shutdownNow() {
1084     if (dispatchExecutor != null) {
1085       dispatchExecutor.shutdownNow();
1086     }
1087     moveExecutor.shutdownNow();
1088   }
1089 
1090   static class Util {
1091     /** @return true if data node is part of the excludedNodes. */
isExcluded(Set<String> excludedNodes, DatanodeInfo dn)1092     static boolean isExcluded(Set<String> excludedNodes, DatanodeInfo dn) {
1093       return isIn(excludedNodes, dn);
1094     }
1095 
1096     /**
1097      * @return true if includedNodes is empty or data node is part of the
1098      *         includedNodes.
1099      */
isIncluded(Set<String> includedNodes, DatanodeInfo dn)1100     static boolean isIncluded(Set<String> includedNodes, DatanodeInfo dn) {
1101       return (includedNodes.isEmpty() || isIn(includedNodes, dn));
1102     }
1103 
1104     /**
1105      * Match is checked using host name , ip address with and without port
1106      * number.
1107      *
1108      * @return true if the datanode's transfer address matches the set of nodes.
1109      */
isIn(Set<String> datanodes, DatanodeInfo dn)1110     private static boolean isIn(Set<String> datanodes, DatanodeInfo dn) {
1111       return isIn(datanodes, dn.getPeerHostName(), dn.getXferPort())
1112           || isIn(datanodes, dn.getIpAddr(), dn.getXferPort())
1113           || isIn(datanodes, dn.getHostName(), dn.getXferPort());
1114     }
1115 
1116     /** @return true if nodes contains host or host:port */
isIn(Set<String> nodes, String host, int port)1117     private static boolean isIn(Set<String> nodes, String host, int port) {
1118       if (host == null) {
1119         return false;
1120       }
1121       return (nodes.contains(host) || nodes.contains(host + ":" + port));
1122     }
1123 
1124     /**
1125      * Parse a comma separated string to obtain set of host names
1126      *
1127      * @return set of host names
1128      */
parseHostList(String string)1129     static Set<String> parseHostList(String string) {
1130       String[] addrs = StringUtils.getTrimmedStrings(string);
1131       return new HashSet<String>(Arrays.asList(addrs));
1132     }
1133 
1134     /**
1135      * Read set of host names from a file
1136      *
1137      * @return set of host names
1138      */
getHostListFromFile(String fileName, String type)1139     static Set<String> getHostListFromFile(String fileName, String type) {
1140       Set<String> nodes = new HashSet<String>();
1141       try {
1142         HostsFileReader.readFileToSet(type, fileName, nodes);
1143         return StringUtils.getTrimmedStrings(nodes);
1144       } catch (IOException e) {
1145         throw new IllegalArgumentException(
1146             "Failed to read host list from file: " + fileName);
1147       }
1148     }
1149   }
1150 }
1151