1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs.server.datanode.fsdataset;
19 
20 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT;
21 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_KEY;
22 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT;
23 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY;
24 
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.Random;
29 
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.conf.Configurable;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
35 
36 /**
37  * A DN volume choosing policy which takes into account the amount of free
38  * space on each of the available volumes when considering where to assign a
39  * new replica allocation. By default this policy prefers assigning replicas to
40  * those volumes with more available free space, so as to over time balance the
41  * available space of all the volumes within a DN.
42  */
43 public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
44     implements VolumeChoosingPolicy<V>, Configurable {
45 
46   private static final Log LOG = LogFactory.getLog(AvailableSpaceVolumeChoosingPolicy.class);
47 
48   private final Random random;
49 
50   private long balancedSpaceThreshold = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT;
51   private float balancedPreferencePercent = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT;
52 
AvailableSpaceVolumeChoosingPolicy(Random random)53   AvailableSpaceVolumeChoosingPolicy(Random random) {
54     this.random = random;
55   }
56 
AvailableSpaceVolumeChoosingPolicy()57   public AvailableSpaceVolumeChoosingPolicy() {
58     this(new Random());
59   }
60 
61   @Override
setConf(Configuration conf)62   public synchronized void setConf(Configuration conf) {
63     balancedSpaceThreshold = conf.getLong(
64         DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_KEY,
65         DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT);
66     balancedPreferencePercent = conf.getFloat(
67         DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY,
68         DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT);
69 
70     LOG.info("Available space volume choosing policy initialized: " +
71         DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_KEY +
72         " = " + balancedSpaceThreshold + ", " +
73         DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY +
74         " = " + balancedPreferencePercent);
75 
76     if (balancedPreferencePercent > 1.0) {
77       LOG.warn("The value of " + DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY +
78                " is greater than 1.0 but should be in the range 0.0 - 1.0");
79     }
80 
81     if (balancedPreferencePercent < 0.5) {
82       LOG.warn("The value of " + DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY +
83                " is less than 0.5 so volumes with less available disk space will receive more block allocations");
84     }
85   }
86 
87   @Override
getConf()88   public synchronized Configuration getConf() {
89     // Nothing to do. Only added to fulfill the Configurable contract.
90     return null;
91   }
92 
93   private final VolumeChoosingPolicy<V> roundRobinPolicyBalanced =
94       new RoundRobinVolumeChoosingPolicy<V>();
95   private final VolumeChoosingPolicy<V> roundRobinPolicyHighAvailable =
96       new RoundRobinVolumeChoosingPolicy<V>();
97   private final VolumeChoosingPolicy<V> roundRobinPolicyLowAvailable =
98       new RoundRobinVolumeChoosingPolicy<V>();
99 
100   @Override
chooseVolume(List<V> volumes, long replicaSize)101   public synchronized V chooseVolume(List<V> volumes,
102       long replicaSize) throws IOException {
103     if (volumes.size() < 1) {
104       throw new DiskOutOfSpaceException("No more available volumes");
105     }
106 
107     AvailableSpaceVolumeList volumesWithSpaces =
108         new AvailableSpaceVolumeList(volumes);
109 
110     if (volumesWithSpaces.areAllVolumesWithinFreeSpaceThreshold()) {
111       // If they're actually not too far out of whack, fall back on pure round
112       // robin.
113       V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize);
114       if (LOG.isDebugEnabled()) {
115         LOG.debug("All volumes are within the configured free space balance " +
116             "threshold. Selecting " + volume + " for write of block size " +
117             replicaSize);
118       }
119       return volume;
120     } else {
121       V volume = null;
122       // If none of the volumes with low free space have enough space for the
123       // replica, always try to choose a volume with a lot of free space.
124       long mostAvailableAmongLowVolumes = volumesWithSpaces
125           .getMostAvailableSpaceAmongVolumesWithLowAvailableSpace();
126 
127       List<V> highAvailableVolumes = extractVolumesFromPairs(
128           volumesWithSpaces.getVolumesWithHighAvailableSpace());
129       List<V> lowAvailableVolumes = extractVolumesFromPairs(
130           volumesWithSpaces.getVolumesWithLowAvailableSpace());
131 
132       float preferencePercentScaler =
133           (highAvailableVolumes.size() * balancedPreferencePercent) +
134           (lowAvailableVolumes.size() * (1 - balancedPreferencePercent));
135       float scaledPreferencePercent =
136           (highAvailableVolumes.size() * balancedPreferencePercent) /
137           preferencePercentScaler;
138       if (mostAvailableAmongLowVolumes < replicaSize ||
139           random.nextFloat() < scaledPreferencePercent) {
140         volume = roundRobinPolicyHighAvailable.chooseVolume(
141             highAvailableVolumes, replicaSize);
142         if (LOG.isDebugEnabled()) {
143           LOG.debug("Volumes are imbalanced. Selecting " + volume +
144               " from high available space volumes for write of block size "
145               + replicaSize);
146         }
147       } else {
148         volume = roundRobinPolicyLowAvailable.chooseVolume(
149             lowAvailableVolumes, replicaSize);
150         if (LOG.isDebugEnabled()) {
151           LOG.debug("Volumes are imbalanced. Selecting " + volume +
152               " from low available space volumes for write of block size "
153               + replicaSize);
154         }
155       }
156       return volume;
157     }
158   }
159 
160   /**
161    * Used to keep track of the list of volumes we're choosing from.
162    */
163   private class AvailableSpaceVolumeList {
164     private final List<AvailableSpaceVolumePair> volumes;
165 
AvailableSpaceVolumeList(List<V> volumes)166     public AvailableSpaceVolumeList(List<V> volumes) throws IOException {
167       this.volumes = new ArrayList<AvailableSpaceVolumePair>();
168       for (V volume : volumes) {
169         this.volumes.add(new AvailableSpaceVolumePair(volume));
170       }
171     }
172 
173     /**
174      * @return true if all volumes' free space is within the
175      *         configured threshold, false otherwise.
176      */
areAllVolumesWithinFreeSpaceThreshold()177     public boolean areAllVolumesWithinFreeSpaceThreshold() {
178       long leastAvailable = Long.MAX_VALUE;
179       long mostAvailable = 0;
180       for (AvailableSpaceVolumePair volume : volumes) {
181         leastAvailable = Math.min(leastAvailable, volume.getAvailable());
182         mostAvailable = Math.max(mostAvailable, volume.getAvailable());
183       }
184       return (mostAvailable - leastAvailable) < balancedSpaceThreshold;
185     }
186 
187     /**
188      * @return the minimum amount of space available on a single volume,
189      *         across all volumes.
190      */
getLeastAvailableSpace()191     private long getLeastAvailableSpace() {
192       long leastAvailable = Long.MAX_VALUE;
193       for (AvailableSpaceVolumePair volume : volumes) {
194         leastAvailable = Math.min(leastAvailable, volume.getAvailable());
195       }
196       return leastAvailable;
197     }
198 
199     /**
200      * @return the maximum amount of space available across volumes with low space.
201      */
getMostAvailableSpaceAmongVolumesWithLowAvailableSpace()202     public long getMostAvailableSpaceAmongVolumesWithLowAvailableSpace() {
203       long mostAvailable = Long.MIN_VALUE;
204       for (AvailableSpaceVolumePair volume : getVolumesWithLowAvailableSpace()) {
205         mostAvailable = Math.max(mostAvailable, volume.getAvailable());
206       }
207       return mostAvailable;
208     }
209 
210     /**
211      * @return the list of volumes with relatively low available space.
212      */
getVolumesWithLowAvailableSpace()213     public List<AvailableSpaceVolumePair> getVolumesWithLowAvailableSpace() {
214       long leastAvailable = getLeastAvailableSpace();
215       List<AvailableSpaceVolumePair> ret = new ArrayList<AvailableSpaceVolumePair>();
216       for (AvailableSpaceVolumePair volume : volumes) {
217         if (volume.getAvailable() <= leastAvailable + balancedSpaceThreshold) {
218           ret.add(volume);
219         }
220       }
221       return ret;
222     }
223 
224     /**
225      * @return the list of volumes with a lot of available space.
226      */
getVolumesWithHighAvailableSpace()227     public List<AvailableSpaceVolumePair> getVolumesWithHighAvailableSpace() {
228       long leastAvailable = getLeastAvailableSpace();
229       List<AvailableSpaceVolumePair> ret = new ArrayList<AvailableSpaceVolumePair>();
230       for (AvailableSpaceVolumePair volume : volumes) {
231         if (volume.getAvailable() > leastAvailable + balancedSpaceThreshold) {
232           ret.add(volume);
233         }
234       }
235       return ret;
236     }
237 
238   }
239 
240   /**
241    * Used so that we only check the available space on a given volume once, at
242    * the beginning of {@link AvailableSpaceVolumeChoosingPolicy#chooseVolume(List, long)}.
243    */
244   private class AvailableSpaceVolumePair {
245     private final V volume;
246     private final long availableSpace;
247 
AvailableSpaceVolumePair(V volume)248     public AvailableSpaceVolumePair(V volume) throws IOException {
249       this.volume = volume;
250       this.availableSpace = volume.getAvailable();
251     }
252 
getAvailable()253     public long getAvailable() {
254       return availableSpace;
255     }
256 
getVolume()257     public V getVolume() {
258       return volume;
259     }
260   }
261 
extractVolumesFromPairs(List<AvailableSpaceVolumePair> volumes)262   private List<V> extractVolumesFromPairs(List<AvailableSpaceVolumePair> volumes) {
263     List<V> ret = new ArrayList<V>();
264     for (AvailableSpaceVolumePair volume : volumes) {
265       ret.add(volume.getVolume());
266     }
267     return ret;
268   }
269 
270 }
271