1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs; 19 20 import static org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite.ID_UNSPECIFIED; 21 22 import java.io.File; 23 import java.io.FileNotFoundException; 24 import java.io.IOException; 25 import java.util.*; 26 27 import com.google.common.collect.Lists; 28 import org.apache.hadoop.conf.Configuration; 29 import org.apache.hadoop.fs.FSDataOutputStream; 30 import org.apache.hadoop.fs.FileSystem; 31 import org.apache.hadoop.fs.Path; 32 import org.apache.hadoop.fs.StorageType; 33 import org.apache.hadoop.hdfs.protocol.*; 34 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; 35 import org.apache.hadoop.hdfs.server.blockmanagement.*; 36 import org.apache.hadoop.hdfs.server.datanode.DataNode; 37 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; 38 import org.apache.hadoop.hdfs.server.namenode.NameNode; 39 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper; 40 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; 41 import org.apache.hadoop.io.IOUtils; 42 import org.apache.hadoop.ipc.RemoteException; 43 import 
org.apache.hadoop.net.NetworkTopology; 44 import org.apache.hadoop.net.Node; 45 import org.apache.hadoop.test.GenericTestUtils; 46 import org.apache.hadoop.test.PathUtils; 47 import org.junit.Assert; 48 import org.junit.Test; 49 50 /** Test {@link BlockStoragePolicy} */ 51 public class TestBlockStoragePolicy { 52 public static final BlockStoragePolicySuite POLICY_SUITE; 53 public static final BlockStoragePolicy DEFAULT_STORAGE_POLICY; 54 public static final Configuration conf; 55 56 static { 57 conf = new HdfsConfiguration(); conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1)58 conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1)59 conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); 60 POLICY_SUITE = BlockStoragePolicySuite.createDefaultSuite(); 61 DEFAULT_STORAGE_POLICY = POLICY_SUITE.getDefaultPolicy(); 62 } 63 64 static final EnumSet<StorageType> none = EnumSet.noneOf(StorageType.class); 65 static final EnumSet<StorageType> archive = EnumSet.of(StorageType.ARCHIVE); 66 static final EnumSet<StorageType> disk = EnumSet.of(StorageType.DISK); 67 static final EnumSet<StorageType> both = EnumSet.of(StorageType.DISK, StorageType.ARCHIVE); 68 69 static final long FILE_LEN = 1024; 70 static final short REPLICATION = 3; 71 72 static final byte COLD = HdfsConstants.COLD_STORAGE_POLICY_ID; 73 static final byte WARM = HdfsConstants.WARM_STORAGE_POLICY_ID; 74 static final byte HOT = HdfsConstants.HOT_STORAGE_POLICY_ID; 75 static final byte ONESSD = HdfsConstants.ONESSD_STORAGE_POLICY_ID; 76 static final byte ALLSSD = HdfsConstants.ALLSSD_STORAGE_POLICY_ID; 77 static final byte LAZY_PERSIST = HdfsConstants.MEMORY_STORAGE_POLICY_ID; 78 79 @Test (timeout=300000) testConfigKeyEnabled()80 public void testConfigKeyEnabled() throws IOException { 81 Configuration conf = new HdfsConfiguration(); 82 conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, true); 83 MiniDFSCluster 
cluster = new MiniDFSCluster.Builder(conf) 84 .numDataNodes(1).build(); 85 try { 86 cluster.waitActive(); 87 cluster.getFileSystem().setStoragePolicy(new Path("/"), 88 HdfsConstants.COLD_STORAGE_POLICY_NAME); 89 } finally { 90 cluster.shutdown(); 91 } 92 } 93 94 /** 95 * Ensure that setStoragePolicy throws IOException when 96 * dfs.storage.policy.enabled is set to false. 97 * @throws IOException 98 */ 99 @Test (timeout=300000, expected=IOException.class) testConfigKeyDisabled()100 public void testConfigKeyDisabled() throws IOException { 101 Configuration conf = new HdfsConfiguration(); 102 conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, false); 103 MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) 104 .numDataNodes(1).build(); 105 try { 106 cluster.waitActive(); 107 cluster.getFileSystem().setStoragePolicy(new Path("/"), 108 HdfsConstants.COLD_STORAGE_POLICY_NAME); 109 } finally { 110 cluster.shutdown(); 111 } 112 } 113 114 @Test testDefaultPolicies()115 public void testDefaultPolicies() { 116 final Map<Byte, String> expectedPolicyStrings = new HashMap<Byte, String>(); 117 expectedPolicyStrings.put(COLD, 118 "BlockStoragePolicy{COLD:" + COLD + ", storageTypes=[ARCHIVE], " + 119 "creationFallbacks=[], replicationFallbacks=[]}"); 120 expectedPolicyStrings.put(WARM, 121 "BlockStoragePolicy{WARM:" + WARM + ", storageTypes=[DISK, ARCHIVE], " + 122 "creationFallbacks=[DISK, ARCHIVE], " + 123 "replicationFallbacks=[DISK, ARCHIVE]}"); 124 expectedPolicyStrings.put(HOT, 125 "BlockStoragePolicy{HOT:" + HOT + ", storageTypes=[DISK], " + 126 "creationFallbacks=[], replicationFallbacks=[ARCHIVE]}"); 127 expectedPolicyStrings.put(ONESSD, "BlockStoragePolicy{ONE_SSD:" + ONESSD + 128 ", storageTypes=[SSD, DISK], creationFallbacks=[SSD, DISK], " + 129 "replicationFallbacks=[SSD, DISK]}"); 130 expectedPolicyStrings.put(ALLSSD, "BlockStoragePolicy{ALL_SSD:" + ALLSSD + 131 ", storageTypes=[SSD], creationFallbacks=[DISK], " + 132 "replicationFallbacks=[DISK]}"); 
133 expectedPolicyStrings.put(LAZY_PERSIST, 134 "BlockStoragePolicy{LAZY_PERSIST:" + LAZY_PERSIST + ", storageTypes=[RAM_DISK, DISK], " + 135 "creationFallbacks=[DISK], replicationFallbacks=[DISK]}"); 136 137 for(byte i = 1; i < 16; i++) { 138 final BlockStoragePolicy policy = POLICY_SUITE.getPolicy(i); 139 if (policy != null) { 140 final String s = policy.toString(); 141 Assert.assertEquals(expectedPolicyStrings.get(i), s); 142 } 143 } 144 Assert.assertEquals(POLICY_SUITE.getPolicy(HOT), POLICY_SUITE.getDefaultPolicy()); 145 146 { // check Cold policy 147 final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD); 148 for(short replication = 1; replication < 6; replication++) { 149 final List<StorageType> computed = cold.chooseStorageTypes(replication); 150 assertStorageType(computed, replication, StorageType.ARCHIVE); 151 } 152 assertCreationFallback(cold, null, null, null); 153 assertReplicationFallback(cold, null, null, null); 154 } 155 156 { // check Warm policy 157 final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM); 158 for(short replication = 1; replication < 6; replication++) { 159 final List<StorageType> computed = warm.chooseStorageTypes(replication); 160 assertStorageType(computed, replication, StorageType.DISK, StorageType.ARCHIVE); 161 } 162 assertCreationFallback(warm, StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE); 163 assertReplicationFallback(warm, StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE); 164 } 165 166 { // check Hot policy 167 final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT); 168 for(short replication = 1; replication < 6; replication++) { 169 final List<StorageType> computed = hot.chooseStorageTypes(replication); 170 assertStorageType(computed, replication, StorageType.DISK); 171 } 172 assertCreationFallback(hot, null, null, null); 173 assertReplicationFallback(hot, StorageType.ARCHIVE, null, StorageType.ARCHIVE); 174 } 175 } 176 newStorageTypes(int nDisk, int nArchive)177 static StorageType[] 
newStorageTypes(int nDisk, int nArchive) { 178 final StorageType[] t = new StorageType[nDisk + nArchive]; 179 Arrays.fill(t, 0, nDisk, StorageType.DISK); 180 Arrays.fill(t, nDisk, t.length, StorageType.ARCHIVE); 181 return t; 182 } 183 asList(int nDisk, int nArchive)184 static List<StorageType> asList(int nDisk, int nArchive) { 185 return Arrays.asList(newStorageTypes(nDisk, nArchive)); 186 } 187 assertStorageType(List<StorageType> computed, short replication, StorageType... answers)188 static void assertStorageType(List<StorageType> computed, short replication, 189 StorageType... answers) { 190 Assert.assertEquals(replication, computed.size()); 191 final StorageType last = answers[answers.length - 1]; 192 for(int i = 0; i < computed.size(); i++) { 193 final StorageType expected = i < answers.length? answers[i]: last; 194 Assert.assertEquals(expected, computed.get(i)); 195 } 196 } 197 198 static void assertCreationFallback(BlockStoragePolicy policy, StorageType noneExpected, 199 StorageType archiveExpected, StorageType diskExpected) { 200 Assert.assertEquals(noneExpected, policy.getCreationFallback(none)); 201 Assert.assertEquals(archiveExpected, policy.getCreationFallback(archive)); 202 Assert.assertEquals(diskExpected, policy.getCreationFallback(disk)); 203 Assert.assertEquals(null, policy.getCreationFallback(both)); 204 } 205 206 static void assertReplicationFallback(BlockStoragePolicy policy, StorageType noneExpected, 207 StorageType archiveExpected, StorageType diskExpected) { 208 Assert.assertEquals(noneExpected, policy.getReplicationFallback(none)); 209 Assert.assertEquals(archiveExpected, policy.getReplicationFallback(archive)); 210 Assert.assertEquals(diskExpected, policy.getReplicationFallback(disk)); 211 Assert.assertEquals(null, policy.getReplicationFallback(both)); 212 } 213 214 private static interface CheckChooseStorageTypes { 215 public void checkChooseStorageTypes(BlockStoragePolicy p, short replication, 216 List<StorageType> chosen, StorageType... 
expected); 217 218 /** Basic case: pass only replication and chosen */ 219 static final CheckChooseStorageTypes Basic = new CheckChooseStorageTypes() { 220 @Override 221 public void checkChooseStorageTypes(BlockStoragePolicy p, short replication, 222 List<StorageType> chosen, StorageType... expected) { 223 final List<StorageType> types = p.chooseStorageTypes(replication, chosen); 224 assertStorageTypes(types, expected); 225 } 226 }; 227 228 /** With empty unavailables and isNewBlock=true */ 229 static final CheckChooseStorageTypes EmptyUnavailablesAndNewBlock 230 = new CheckChooseStorageTypes() { 231 @Override 232 public void checkChooseStorageTypes(BlockStoragePolicy p, 233 short replication, List<StorageType> chosen, StorageType... expected) { 234 final List<StorageType> types = p.chooseStorageTypes(replication, 235 chosen, none, true); 236 assertStorageTypes(types, expected); 237 } 238 }; 239 240 /** With empty unavailables and isNewBlock=false */ 241 static final CheckChooseStorageTypes EmptyUnavailablesAndNonNewBlock 242 = new CheckChooseStorageTypes() { 243 @Override 244 public void checkChooseStorageTypes(BlockStoragePolicy p, 245 short replication, List<StorageType> chosen, StorageType... expected) { 246 final List<StorageType> types = p.chooseStorageTypes(replication, 247 chosen, none, false); 248 assertStorageTypes(types, expected); 249 } 250 }; 251 252 /** With both DISK and ARCHIVE unavailables and isNewBlock=true */ 253 static final CheckChooseStorageTypes BothUnavailableAndNewBlock 254 = new CheckChooseStorageTypes() { 255 @Override 256 public void checkChooseStorageTypes(BlockStoragePolicy p, 257 short replication, List<StorageType> chosen, StorageType... 
expected) { 258 final List<StorageType> types = p.chooseStorageTypes(replication, 259 chosen, both, true); 260 assertStorageTypes(types, expected); 261 } 262 }; 263 264 /** With both DISK and ARCHIVE unavailable and isNewBlock=false */ 265 static final CheckChooseStorageTypes BothUnavailableAndNonNewBlock 266 = new CheckChooseStorageTypes() { 267 @Override 268 public void checkChooseStorageTypes(BlockStoragePolicy p, 269 short replication, List<StorageType> chosen, StorageType... expected) { 270 final List<StorageType> types = p.chooseStorageTypes(replication, 271 chosen, both, false); 272 assertStorageTypes(types, expected); 273 } 274 }; 275 276 /** With ARCHIVE unavailable and isNewBlock=true */ 277 static final CheckChooseStorageTypes ArchivalUnavailableAndNewBlock 278 = new CheckChooseStorageTypes() { 279 @Override 280 public void checkChooseStorageTypes(BlockStoragePolicy p, 281 short replication, List<StorageType> chosen, StorageType... expected) { 282 final List<StorageType> types = p.chooseStorageTypes(replication, 283 chosen, archive, true); 284 assertStorageTypes(types, expected); 285 } 286 }; 287 288 /** With ARCHIVE unavailable and isNewBlock=true */ 289 static final CheckChooseStorageTypes ArchivalUnavailableAndNonNewBlock 290 = new CheckChooseStorageTypes() { 291 @Override 292 public void checkChooseStorageTypes(BlockStoragePolicy p, 293 short replication, List<StorageType> chosen, StorageType... 
expected) { 294 final List<StorageType> types = p.chooseStorageTypes(replication, 295 chosen, archive, false); 296 assertStorageTypes(types, expected); 297 } 298 }; 299 } 300 301 @Test 302 public void testChooseStorageTypes() { 303 run(CheckChooseStorageTypes.Basic); 304 run(CheckChooseStorageTypes.EmptyUnavailablesAndNewBlock); 305 run(CheckChooseStorageTypes.EmptyUnavailablesAndNonNewBlock); 306 } 307 308 private static void run(CheckChooseStorageTypes method) { 309 final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT); 310 final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM); 311 final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD); 312 313 final short replication = 3; 314 { 315 final List<StorageType> chosen = Lists.newArrayList(); 316 method.checkChooseStorageTypes(hot, replication, chosen, 317 StorageType.DISK, StorageType.DISK, StorageType.DISK); 318 method.checkChooseStorageTypes(warm, replication, chosen, 319 StorageType.DISK, StorageType.ARCHIVE, StorageType.ARCHIVE); 320 method.checkChooseStorageTypes(cold, replication, chosen, 321 StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); 322 } 323 324 { 325 final List<StorageType> chosen = Arrays.asList(StorageType.DISK); 326 method.checkChooseStorageTypes(hot, replication, chosen, 327 StorageType.DISK, StorageType.DISK); 328 method.checkChooseStorageTypes(warm, replication, chosen, 329 StorageType.ARCHIVE, StorageType.ARCHIVE); 330 method.checkChooseStorageTypes(cold, replication, chosen, 331 StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); 332 } 333 334 { 335 final List<StorageType> chosen = Arrays.asList(StorageType.ARCHIVE); 336 method.checkChooseStorageTypes(hot, replication, chosen, 337 StorageType.DISK, StorageType.DISK, StorageType.DISK); 338 method.checkChooseStorageTypes(warm, replication, chosen, 339 StorageType.DISK, StorageType.ARCHIVE); 340 method.checkChooseStorageTypes(cold, replication, chosen, 341 StorageType.ARCHIVE, StorageType.ARCHIVE); 342 
} 343 344 { 345 final List<StorageType> chosen = Arrays.asList( 346 StorageType.DISK, StorageType.DISK); 347 method.checkChooseStorageTypes(hot, replication, chosen, 348 StorageType.DISK); 349 method.checkChooseStorageTypes(warm, replication, chosen, 350 StorageType.ARCHIVE, StorageType.ARCHIVE); 351 method.checkChooseStorageTypes(cold, replication, chosen, 352 StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); 353 } 354 355 { 356 final List<StorageType> chosen = Arrays.asList( 357 StorageType.DISK, StorageType.ARCHIVE); 358 method.checkChooseStorageTypes(hot, replication, chosen, 359 StorageType.DISK, StorageType.DISK); 360 method.checkChooseStorageTypes(warm, replication, chosen, 361 StorageType.ARCHIVE); 362 method.checkChooseStorageTypes(cold, replication, chosen, 363 StorageType.ARCHIVE, StorageType.ARCHIVE); 364 } 365 366 { 367 final List<StorageType> chosen = Arrays.asList( 368 StorageType.ARCHIVE, StorageType.ARCHIVE); 369 method.checkChooseStorageTypes(hot, replication, chosen, 370 StorageType.DISK, StorageType.DISK, StorageType.DISK); 371 method.checkChooseStorageTypes(warm, replication, chosen, 372 StorageType.DISK); 373 method.checkChooseStorageTypes(cold, replication, chosen, 374 StorageType.ARCHIVE); 375 } 376 377 { 378 final List<StorageType> chosen = Arrays.asList( 379 StorageType.DISK, StorageType.DISK, StorageType.DISK); 380 method.checkChooseStorageTypes(hot, replication, chosen); 381 method.checkChooseStorageTypes(warm, replication, chosen, 382 StorageType.ARCHIVE, StorageType.ARCHIVE); 383 method.checkChooseStorageTypes(cold, replication, chosen, 384 StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); 385 } 386 387 { 388 final List<StorageType> chosen = Arrays.asList( 389 StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE); 390 method.checkChooseStorageTypes(hot, replication, chosen, 391 StorageType.DISK); 392 method.checkChooseStorageTypes(warm, replication, chosen, 393 StorageType.ARCHIVE); 394 
method.checkChooseStorageTypes(cold, replication, chosen, 395 StorageType.ARCHIVE, StorageType.ARCHIVE); 396 } 397 398 { 399 final List<StorageType> chosen = Arrays.asList( 400 StorageType.DISK, StorageType.ARCHIVE, StorageType.ARCHIVE); 401 method.checkChooseStorageTypes(hot, replication, chosen, 402 StorageType.DISK, StorageType.DISK); 403 method.checkChooseStorageTypes(warm, replication, chosen); 404 method.checkChooseStorageTypes(cold, replication, chosen, 405 StorageType.ARCHIVE); 406 } 407 408 { 409 final List<StorageType> chosen = Arrays.asList( 410 StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); 411 method.checkChooseStorageTypes(hot, replication, chosen, 412 StorageType.DISK, StorageType.DISK, StorageType.DISK); 413 method.checkChooseStorageTypes(warm, replication, chosen, 414 StorageType.DISK); 415 method.checkChooseStorageTypes(cold, replication, chosen); 416 } 417 } 418 419 @Test 420 public void testChooseStorageTypesWithBothUnavailable() { 421 runWithBothUnavailable(CheckChooseStorageTypes.BothUnavailableAndNewBlock); 422 runWithBothUnavailable(CheckChooseStorageTypes.BothUnavailableAndNonNewBlock); 423 } 424 425 private static void runWithBothUnavailable(CheckChooseStorageTypes method) { 426 final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT); 427 final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM); 428 final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD); 429 430 final short replication = 3; 431 for(int n = 0; n <= 3; n++) { 432 for(int d = 0; d <= n; d++) { 433 final int a = n - d; 434 final List<StorageType> chosen = asList(d, a); 435 method.checkChooseStorageTypes(hot, replication, chosen); 436 method.checkChooseStorageTypes(warm, replication, chosen); 437 method.checkChooseStorageTypes(cold, replication, chosen); 438 } 439 } 440 } 441 442 @Test 443 public void testChooseStorageTypesWithDiskUnavailableAndNewBlock() { 444 final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT); 445 final BlockStoragePolicy 
warm = POLICY_SUITE.getPolicy(WARM); 446 final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD); 447 448 final short replication = 3; 449 final EnumSet<StorageType> unavailables = disk; 450 final boolean isNewBlock = true; 451 { 452 final List<StorageType> chosen = Lists.newArrayList(); 453 checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock); 454 checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock, 455 StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); 456 checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock, 457 StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); 458 } 459 460 { 461 final List<StorageType> chosen = Arrays.asList(StorageType.DISK); 462 checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock); 463 checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock, 464 StorageType.ARCHIVE, StorageType.ARCHIVE); 465 checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock, 466 StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); 467 } 468 469 { 470 final List<StorageType> chosen = Arrays.asList(StorageType.ARCHIVE); 471 checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock); 472 checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock, 473 StorageType.ARCHIVE, StorageType.ARCHIVE); 474 checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock, 475 StorageType.ARCHIVE, StorageType.ARCHIVE); 476 } 477 478 { 479 final List<StorageType> chosen = Arrays.asList( 480 StorageType.DISK, StorageType.DISK); 481 checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock); 482 checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock, 483 StorageType.ARCHIVE, StorageType.ARCHIVE); 484 checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock, 485 StorageType.ARCHIVE, StorageType.ARCHIVE, 
StorageType.ARCHIVE); 486 } 487 488 { 489 final List<StorageType> chosen = Arrays.asList( 490 StorageType.DISK, StorageType.ARCHIVE); 491 checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock); 492 checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock, 493 StorageType.ARCHIVE); 494 checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock, 495 StorageType.ARCHIVE, StorageType.ARCHIVE); 496 } 497 498 { 499 final List<StorageType> chosen = Arrays.asList( 500 StorageType.ARCHIVE, StorageType.ARCHIVE); 501 checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock); 502 checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock, 503 StorageType.ARCHIVE); 504 checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock, 505 StorageType.ARCHIVE); 506 } 507 508 { 509 final List<StorageType> chosen = Arrays.asList( 510 StorageType.DISK, StorageType.DISK, StorageType.DISK); 511 checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock); 512 checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock, 513 StorageType.ARCHIVE, StorageType.ARCHIVE); 514 checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock, 515 StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); 516 } 517 518 { 519 final List<StorageType> chosen = Arrays.asList( 520 StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE); 521 checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock); 522 checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock, 523 StorageType.ARCHIVE); 524 checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock, 525 StorageType.ARCHIVE, StorageType.ARCHIVE); 526 } 527 528 { 529 final List<StorageType> chosen = Arrays.asList( 530 StorageType.DISK, StorageType.ARCHIVE, StorageType.ARCHIVE); 531 checkChooseStorageTypes(hot, replication, chosen, unavailables, 
isNewBlock); 532 checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock); 533 checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock, 534 StorageType.ARCHIVE); 535 } 536 537 { 538 final List<StorageType> chosen = Arrays.asList( 539 StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); 540 checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock); 541 checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock); 542 checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock); 543 } 544 } 545 546 @Test 547 public void testChooseStorageTypesWithArchiveUnavailable() { 548 runWithArchiveUnavailable(CheckChooseStorageTypes.ArchivalUnavailableAndNewBlock); 549 runWithArchiveUnavailable(CheckChooseStorageTypes.ArchivalUnavailableAndNonNewBlock); 550 } 551 552 private static void runWithArchiveUnavailable(CheckChooseStorageTypes method) { 553 final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT); 554 final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM); 555 final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD); 556 557 final short replication = 3; 558 { 559 final List<StorageType> chosen = Lists.newArrayList(); 560 method.checkChooseStorageTypes(hot, replication, chosen, 561 StorageType.DISK, StorageType.DISK, StorageType.DISK); 562 method.checkChooseStorageTypes(warm, replication, chosen, 563 StorageType.DISK, StorageType.DISK, StorageType.DISK); 564 method.checkChooseStorageTypes(cold, replication, chosen); 565 } 566 567 { 568 final List<StorageType> chosen = Arrays.asList(StorageType.DISK); 569 method.checkChooseStorageTypes(hot, replication, chosen, 570 StorageType.DISK, StorageType.DISK); 571 method.checkChooseStorageTypes(warm, replication, chosen, 572 StorageType.DISK, StorageType.DISK); 573 method.checkChooseStorageTypes(cold, replication, chosen); 574 } 575 576 { 577 final List<StorageType> chosen = Arrays.asList(StorageType.ARCHIVE); 578 
method.checkChooseStorageTypes(hot, replication, chosen, 579 StorageType.DISK, StorageType.DISK, StorageType.DISK); 580 method.checkChooseStorageTypes(warm, replication, chosen, 581 StorageType.DISK, StorageType.DISK); 582 method.checkChooseStorageTypes(cold, replication, chosen); 583 } 584 585 { 586 final List<StorageType> chosen = Arrays.asList( 587 StorageType.DISK, StorageType.DISK); 588 method.checkChooseStorageTypes(hot, replication, chosen, 589 StorageType.DISK); 590 method.checkChooseStorageTypes(warm, replication, chosen, 591 StorageType.DISK); 592 method.checkChooseStorageTypes(cold, replication, chosen); 593 } 594 595 { 596 final List<StorageType> chosen = Arrays.asList( 597 StorageType.DISK, StorageType.ARCHIVE); 598 method.checkChooseStorageTypes(hot, replication, chosen, 599 StorageType.DISK, StorageType.DISK); 600 method.checkChooseStorageTypes(warm, replication, chosen, 601 StorageType.DISK); 602 method.checkChooseStorageTypes(cold, replication, chosen); 603 } 604 605 { 606 final List<StorageType> chosen = Arrays.asList( 607 StorageType.ARCHIVE, StorageType.ARCHIVE); 608 method.checkChooseStorageTypes(hot, replication, chosen, 609 StorageType.DISK, StorageType.DISK, StorageType.DISK); 610 method.checkChooseStorageTypes(warm, replication, chosen, 611 StorageType.DISK); 612 method.checkChooseStorageTypes(cold, replication, chosen); 613 } 614 615 { 616 final List<StorageType> chosen = Arrays.asList( 617 StorageType.DISK, StorageType.DISK, StorageType.DISK); 618 method.checkChooseStorageTypes(hot, replication, chosen); 619 method.checkChooseStorageTypes(warm, replication, chosen); 620 method.checkChooseStorageTypes(cold, replication, chosen); 621 } 622 623 { 624 final List<StorageType> chosen = Arrays.asList( 625 StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE); 626 method.checkChooseStorageTypes(hot, replication, chosen, 627 StorageType.DISK); 628 method.checkChooseStorageTypes(warm, replication, chosen); 629 
method.checkChooseStorageTypes(cold, replication, chosen); 630 } 631 632 { 633 final List<StorageType> chosen = Arrays.asList( 634 StorageType.DISK, StorageType.ARCHIVE, StorageType.ARCHIVE); 635 method.checkChooseStorageTypes(hot, replication, chosen, 636 StorageType.DISK, StorageType.DISK); 637 method.checkChooseStorageTypes(warm, replication, chosen); 638 method.checkChooseStorageTypes(cold, replication, chosen); 639 } 640 641 { 642 final List<StorageType> chosen = Arrays.asList( 643 StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); 644 method.checkChooseStorageTypes(hot, replication, chosen, 645 StorageType.DISK, StorageType.DISK, StorageType.DISK); 646 method.checkChooseStorageTypes(warm, replication, chosen, 647 StorageType.DISK); 648 method.checkChooseStorageTypes(cold, replication, chosen); 649 } 650 } 651 652 @Test 653 public void testChooseStorageTypesWithDiskUnavailableAndNonNewBlock() { 654 final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT); 655 final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM); 656 final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD); 657 658 final short replication = 3; 659 final EnumSet<StorageType> unavailables = disk; 660 final boolean isNewBlock = false; 661 { 662 final List<StorageType> chosen = Lists.newArrayList(); 663 checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock, 664 StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); 665 checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock, 666 StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); 667 checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock, 668 StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); 669 } 670 671 { 672 final List<StorageType> chosen = Arrays.asList(StorageType.DISK); 673 checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock, 674 StorageType.ARCHIVE, StorageType.ARCHIVE); 675 
checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock, 676 StorageType.ARCHIVE, StorageType.ARCHIVE); 677 checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock, 678 StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); 679 } 680 681 { 682 final List<StorageType> chosen = Arrays.asList(StorageType.ARCHIVE); 683 checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock, 684 StorageType.ARCHIVE, StorageType.ARCHIVE); 685 checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock, 686 StorageType.ARCHIVE, StorageType.ARCHIVE); 687 checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock, 688 StorageType.ARCHIVE, StorageType.ARCHIVE); 689 } 690 691 { 692 final List<StorageType> chosen = Arrays.asList( 693 StorageType.DISK, StorageType.DISK); 694 checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock, 695 StorageType.ARCHIVE); 696 checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock, 697 StorageType.ARCHIVE, StorageType.ARCHIVE); 698 checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock, 699 StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); 700 } 701 702 { 703 final List<StorageType> chosen = Arrays.asList( 704 StorageType.DISK, StorageType.ARCHIVE); 705 checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock, 706 StorageType.ARCHIVE); 707 checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock, 708 StorageType.ARCHIVE); 709 checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock, 710 StorageType.ARCHIVE, StorageType.ARCHIVE); 711 } 712 713 { 714 final List<StorageType> chosen = Arrays.asList( 715 StorageType.ARCHIVE, StorageType.ARCHIVE); 716 checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock, 717 StorageType.ARCHIVE); 718 checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock, 719 
StorageType.ARCHIVE); 720 checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock, 721 StorageType.ARCHIVE); 722 } 723 724 { 725 final List<StorageType> chosen = Arrays.asList( 726 StorageType.DISK, StorageType.DISK, StorageType.DISK); 727 checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock); 728 checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock, 729 StorageType.ARCHIVE, StorageType.ARCHIVE); 730 checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock, 731 StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); 732 } 733 734 { 735 final List<StorageType> chosen = Arrays.asList( 736 StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE); 737 checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock); 738 checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock, 739 StorageType.ARCHIVE); 740 checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock, 741 StorageType.ARCHIVE, StorageType.ARCHIVE); 742 } 743 744 { 745 final List<StorageType> chosen = Arrays.asList( 746 StorageType.DISK, StorageType.ARCHIVE, StorageType.ARCHIVE); 747 checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock); 748 checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock); 749 checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock, 750 StorageType.ARCHIVE); 751 } 752 753 { 754 final List<StorageType> chosen = Arrays.asList( 755 StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE); 756 checkChooseStorageTypes(hot, replication, chosen, unavailables, isNewBlock); 757 checkChooseStorageTypes(warm, replication, chosen, unavailables, isNewBlock); 758 checkChooseStorageTypes(cold, replication, chosen, unavailables, isNewBlock); 759 } 760 } 761 762 static void checkChooseStorageTypes(BlockStoragePolicy p, short replication, 763 List<StorageType> chosen, EnumSet<StorageType> 
unavailables, 764 boolean isNewBlock, StorageType... expected) { 765 final List<StorageType> types = p.chooseStorageTypes(replication, chosen, 766 unavailables, isNewBlock); 767 assertStorageTypes(types, expected); 768 } 769 770 static void assertStorageTypes(List<StorageType> computed, StorageType... expected) { 771 assertStorageTypes(computed.toArray(StorageType.EMPTY_ARRAY), expected); 772 } 773 774 static void assertStorageTypes(StorageType[] computed, StorageType... expected) { 775 Arrays.sort(expected); 776 Arrays.sort(computed); 777 Assert.assertArrayEquals(expected, computed); 778 } 779 780 @Test 781 public void testChooseExcess() { 782 final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT); 783 final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM); 784 final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD); 785 786 final short replication = 3; 787 for(int n = 0; n <= 6; n++) { 788 for(int d = 0; d <= n; d++) { 789 final int a = n - d; 790 final List<StorageType> chosen = asList(d, a); 791 { 792 final int nDisk = Math.max(0, d - replication); 793 final int nArchive = a; 794 final StorageType[] expected = newStorageTypes(nDisk, nArchive); 795 checkChooseExcess(hot, replication, chosen, expected); 796 } 797 798 { 799 final int nDisk = Math.max(0, d - 1); 800 final int nArchive = Math.max(0, a - replication + 1); 801 final StorageType[] expected = newStorageTypes(nDisk, nArchive); 802 checkChooseExcess(warm, replication, chosen, expected); 803 } 804 805 { 806 final int nDisk = d; 807 final int nArchive = Math.max(0, a - replication ); 808 final StorageType[] expected = newStorageTypes(nDisk, nArchive); 809 checkChooseExcess(cold, replication, chosen, expected); 810 } 811 } 812 } 813 } 814 815 static void checkChooseExcess(BlockStoragePolicy p, short replication, 816 List<StorageType> chosen, StorageType... 
expected) { 817 final List<StorageType> types = p.chooseExcess(replication, chosen); 818 assertStorageTypes(types, expected); 819 } 820 821 private void checkDirectoryListing(HdfsFileStatus[] stats, byte... policies) { 822 Assert.assertEquals(stats.length, policies.length); 823 for (int i = 0; i < stats.length; i++) { 824 Assert.assertEquals(stats[i].getStoragePolicy(), policies[i]); 825 } 826 } 827 828 @Test 829 public void testSetStoragePolicy() throws Exception { 830 final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) 831 .numDataNodes(REPLICATION).build(); 832 cluster.waitActive(); 833 final DistributedFileSystem fs = cluster.getFileSystem(); 834 try { 835 final Path dir = new Path("/testSetStoragePolicy"); 836 final Path fooFile = new Path(dir, "foo"); 837 final Path barDir = new Path(dir, "bar"); 838 final Path barFile1= new Path(barDir, "f1"); 839 final Path barFile2= new Path(barDir, "f2"); 840 DFSTestUtil.createFile(fs, fooFile, FILE_LEN, REPLICATION, 0L); 841 DFSTestUtil.createFile(fs, barFile1, FILE_LEN, REPLICATION, 0L); 842 DFSTestUtil.createFile(fs, barFile2, FILE_LEN, REPLICATION, 0L); 843 844 final String invalidPolicyName = "INVALID-POLICY"; 845 try { 846 fs.setStoragePolicy(fooFile, invalidPolicyName); 847 Assert.fail("Should throw a HadoopIllegalArgumentException"); 848 } catch (RemoteException e) { 849 GenericTestUtils.assertExceptionContains(invalidPolicyName, e); 850 } 851 852 // check storage policy 853 HdfsFileStatus[] dirList = fs.getClient().listPaths(dir.toString(), 854 HdfsFileStatus.EMPTY_NAME, true).getPartialListing(); 855 HdfsFileStatus[] barList = fs.getClient().listPaths(barDir.toString(), 856 HdfsFileStatus.EMPTY_NAME, true).getPartialListing(); 857 checkDirectoryListing(dirList, ID_UNSPECIFIED, ID_UNSPECIFIED); 858 checkDirectoryListing(barList, ID_UNSPECIFIED, ID_UNSPECIFIED); 859 860 final Path invalidPath = new Path("/invalidPath"); 861 try { 862 fs.setStoragePolicy(invalidPath, 
HdfsConstants.WARM_STORAGE_POLICY_NAME); 863 Assert.fail("Should throw a FileNotFoundException"); 864 } catch (FileNotFoundException e) { 865 GenericTestUtils.assertExceptionContains(invalidPath.toString(), e); 866 } 867 868 fs.setStoragePolicy(fooFile, HdfsConstants.COLD_STORAGE_POLICY_NAME); 869 fs.setStoragePolicy(barDir, HdfsConstants.WARM_STORAGE_POLICY_NAME); 870 fs.setStoragePolicy(barFile2, HdfsConstants.HOT_STORAGE_POLICY_NAME); 871 872 dirList = fs.getClient().listPaths(dir.toString(), 873 HdfsFileStatus.EMPTY_NAME).getPartialListing(); 874 barList = fs.getClient().listPaths(barDir.toString(), 875 HdfsFileStatus.EMPTY_NAME).getPartialListing(); 876 checkDirectoryListing(dirList, WARM, COLD); // bar is warm, foo is cold 877 checkDirectoryListing(barList, WARM, HOT); 878 879 // restart namenode to make sure the editlog is correct 880 cluster.restartNameNode(true); 881 dirList = fs.getClient().listPaths(dir.toString(), 882 HdfsFileStatus.EMPTY_NAME, true).getPartialListing(); 883 barList = fs.getClient().listPaths(barDir.toString(), 884 HdfsFileStatus.EMPTY_NAME, true).getPartialListing(); 885 checkDirectoryListing(dirList, WARM, COLD); // bar is warm, foo is cold 886 checkDirectoryListing(barList, WARM, HOT); 887 888 // restart namenode with checkpoint to make sure the fsimage is correct 889 fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); 890 fs.saveNamespace(); 891 fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE); 892 cluster.restartNameNode(true); 893 dirList = fs.getClient().listPaths(dir.toString(), 894 HdfsFileStatus.EMPTY_NAME).getPartialListing(); 895 barList = fs.getClient().listPaths(barDir.toString(), 896 HdfsFileStatus.EMPTY_NAME).getPartialListing(); 897 checkDirectoryListing(dirList, WARM, COLD); // bar is warm, foo is cold 898 checkDirectoryListing(barList, WARM, HOT); 899 } finally { 900 cluster.shutdown(); 901 } 902 } 903 904 @Test 905 public void testSetStoragePolicyWithSnapshot() throws Exception { 906 final MiniDFSCluster cluster = new 
MiniDFSCluster.Builder(conf)
        .numDataNodes(REPLICATION).build();
    cluster.waitActive();
    final DistributedFileSystem fs = cluster.getFileSystem();
    try {
      final Path dir = new Path("/testSetStoragePolicyWithSnapshot");
      final Path fooDir = new Path(dir, "foo");
      final Path fooFile1 = new Path(fooDir, "f1");
      final Path fooFile2 = new Path(fooDir, "f2");
      final String dirName = dir.toString();
      final String fooDirName = fooDir.toString();
      DFSTestUtil.createFile(fs, fooFile1, FILE_LEN, REPLICATION, 0L);
      DFSTestUtil.createFile(fs, fooFile2, FILE_LEN, REPLICATION, 0L);

      // Make foo WARM; foo and both files under it must be listed as WARM.
      fs.setStoragePolicy(fooDir, HdfsConstants.WARM_STORAGE_POLICY_NAME);

      HdfsFileStatus[] dirEntries = fs.getClient()
          .listPaths(dirName, HdfsFileStatus.EMPTY_NAME, true)
          .getPartialListing();
      checkDirectoryListing(dirEntries, WARM);
      HdfsFileStatus[] fooEntries = fs.getClient()
          .listPaths(fooDirName, HdfsFileStatus.EMPTY_NAME, true)
          .getPartialListing();
      checkDirectoryListing(fooEntries, WARM, WARM);

      // Snapshot the tree, then make f1 COLD.
      SnapshotTestHelper.createSnapshot(fs, dir, "s1");
      fs.setStoragePolicy(fooFile1, HdfsConstants.COLD_STORAGE_POLICY_NAME);

      fooEntries = fs.getClient()
          .listPaths(fooDirName, HdfsFileStatus.EMPTY_NAME)
          .getPartialListing();
      checkDirectoryListing(fooEntries, COLD, WARM);

      // The snapshot copy /dir/.snapshot/s1/foo/f1 reports COLD as well: the
      // latest storage policy is always returned for a file/directory.
      final Path snapshotF1 =
          SnapshotTestHelper.getSnapshotPath(dir, "s1", "foo/f1");
      final String snapshotF1Name = snapshotF1.toString();
      checkDirectoryListing(
          fs.getClient().listPaths(snapshotF1Name, HdfsFileStatus.EMPTY_NAME)
              .getPartialListing(),
          COLD);

      // Delete f1: only f2 (WARM) is left under foo ...
      fs.delete(fooFile1, true);
      fooEntries = fs.getClient()
          .listPaths(fooDirName, HdfsFileStatus.EMPTY_NAME)
          .getPartialListing();
      checkDirectoryListing(fooEntries, WARM);
      // ... while the snapshot copy of f1 is still COLD after the deletion.
      final HdfsFileStatus[] snapshotF1Entries = fs.getClient()
          .listPaths(snapshotF1Name, HdfsFileStatus.EMPTY_NAME)
          .getPartialListing();
      checkDirectoryListing(snapshotF1Entries, COLD);

      // Switch the foo directory to HOT.
      fs.setStoragePolicy(fooDir, HdfsConstants.HOT_STORAGE_POLICY_NAME);
      // /dir/foo is now hot
      dirEntries = fs.getClient()
          .listPaths(dirName, HdfsFileStatus.EMPTY_NAME, true)
          .getPartialListing();
      checkDirectoryListing(dirEntries, HOT);
      // /dir/foo/f2 is hot
      fooEntries = fs.getClient()
          .listPaths(fooDirName, HdfsFileStatus.EMPTY_NAME)
          .getPartialListing();
      checkDirectoryListing(fooEntries, HOT);

      // Now look at the same tree through the snapshot paths.
      final Path snapshotRoot = SnapshotTestHelper.getSnapshotRoot(dir, "s1");
      final Path snapshotFoo =
          SnapshotTestHelper.getSnapshotPath(dir, "s1", "foo");
      final String snapshotRootName = snapshotRoot.toString();
      final String snapshotFooName = snapshotFoo.toString();
      checkDirectoryListing(
          fs.getClient().listPaths(snapshotRootName, HdfsFileStatus.EMPTY_NAME)
              .getPartialListing(),
          HOT);
      // /dir/.snapshot/.s1/foo/f1 and /dir/.snapshot/.s1/foo/f2 should still
      // follow the latest
      checkDirectoryListing(
          fs.getClient().listPaths(snapshotFooName, HdfsFileStatus.EMPTY_NAME)
              .getPartialListing(),
          COLD, HOT);

      // Delete foo: the snapshot listings must be unchanged.
      fs.delete(fooDir, true);
      checkDirectoryListing(
          fs.getClient().listPaths(snapshotRootName, HdfsFileStatus.EMPTY_NAME)
              .getPartialListing(),
          HOT);
      checkDirectoryListing(
          fs.getClient().listPaths(snapshotFooName, HdfsFileStatus.EMPTY_NAME)
              .getPartialListing(),
          COLD, HOT);
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Builds the per-datanode storage layout used by the replication tests:
   * each datanode gets its own {DISK, ARCHIVE} pair.
   *
   * @param numDataNodes number of datanodes to describe
   * @return one freshly allocated {DISK, ARCHIVE} array per datanode
   */
  private static StorageType[][] genStorageTypes(int numDataNodes) {
    final StorageType[][] perNode = new StorageType[numDataNodes][];
    for (int node = 0; node < numDataNodes; node++) {
      perNode[node] = new StorageType[] {StorageType.DISK, StorageType.ARCHIVE};
    }
    return perNode;
  }

  /**
   * Asserts that a file has {@code blockNum} blocks, that each block has
   * {@code replicaNum} replicas, and that the storage types of all replicas
   * together match {@code types} exactly, ignoring order.
   *
   * @param status     located status of the file to inspect
   * @param blockNum   expected number of blocks
   * @param replicaNum expected number of replicas of each block
   * @param types      expected storage types over all replicas of all blocks
   */
  private void checkLocatedBlocks(HdfsLocatedFileStatus status, int blockNum,
                                  int replicaNum, StorageType... types) {
    // Mutable copy: each replica found below consumes one expected entry.
    final List<StorageType> remaining = new ArrayList<>(Arrays.asList(types));
    final List<LocatedBlock> blocks =
        status.getBlockLocations().getLocatedBlocks();
    Assert.assertEquals(blockNum, blocks.size());
    for (LocatedBlock block : blocks) {
      final StorageType[] replicaTypes = block.getStorageTypes();
      Assert.assertEquals(replicaNum, replicaTypes.length);
      for (StorageType replicaType : replicaTypes) {
        Assert.assertTrue(remaining.remove(replicaType));
      }
    }
    // Nothing expected may be left unmatched.
    Assert.assertTrue(remaining.isEmpty());
  }

  /**
   * Creates a file with replication 3 under a directory that has the given
   * storage policy, raises the replication to 5, lowers it back to 3, and
   * checks the replica storage types after each step.
   *
   * @param policyName name of the policy to set on the parent directory
   * @param policyId   policy ID expected in the file's listing
   * @param before     expected storage types with 3 replicas
   * @param after      expected storage types with 5 replicas
   */
  private void testChangeFileRep(String policyName, byte policyId,
                                 StorageType[] before,
                                 StorageType[] after) throws Exception {
    final int numDataNodes = 5;
    final StorageType[][] types = genStorageTypes(numDataNodes);
    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(numDataNodes).storageTypes(types).build();
    cluster.waitActive();
    final DistributedFileSystem fs = cluster.getFileSystem();
    try {
      final Path dir = new Path("/test");
      fs.mkdirs(dir);
      fs.setStoragePolicy(dir, policyName);

      final Path foo = new Path(dir, "foo");
      DFSTestUtil.createFile(fs, foo, FILE_LEN, REPLICATION, 0L);

      HdfsFileStatus[] status = fs.getClient().listPaths(foo.toString(),
          HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
      checkDirectoryListing(status, policyId);
      HdfsLocatedFileStatus fooStatus = (HdfsLocatedFileStatus) status[0];
      checkLocatedBlocks(fooStatus, 1, 3, before);

      // change the replication factor to 5
fs.setReplication(foo, (short) numDataNodes); 1032 Thread.sleep(1000); 1033 for (DataNode dn : cluster.getDataNodes()) { 1034 DataNodeTestUtils.triggerHeartbeat(dn); 1035 } 1036 Thread.sleep(1000); 1037 status = fs.getClient().listPaths(foo.toString(), 1038 HdfsFileStatus.EMPTY_NAME, true).getPartialListing(); 1039 checkDirectoryListing(status, policyId); 1040 fooStatus = (HdfsLocatedFileStatus) status[0]; 1041 checkLocatedBlocks(fooStatus, 1, numDataNodes, after); 1042 1043 // change the replication factor back to 3 1044 fs.setReplication(foo, REPLICATION); 1045 Thread.sleep(1000); 1046 for (DataNode dn : cluster.getDataNodes()) { 1047 DataNodeTestUtils.triggerHeartbeat(dn); 1048 } 1049 Thread.sleep(1000); 1050 for (DataNode dn : cluster.getDataNodes()) { 1051 DataNodeTestUtils.triggerBlockReport(dn); 1052 } 1053 Thread.sleep(1000); 1054 status = fs.getClient().listPaths(foo.toString(), 1055 HdfsFileStatus.EMPTY_NAME, true).getPartialListing(); 1056 checkDirectoryListing(status, policyId); 1057 fooStatus = (HdfsLocatedFileStatus) status[0]; 1058 checkLocatedBlocks(fooStatus, 1, REPLICATION, before); 1059 } finally { 1060 cluster.shutdown(); 1061 } 1062 } 1063 1064 /** 1065 * Consider a File with Hot storage policy. Increase replication factor of 1066 * that file from 3 to 5. Make sure all replications are created in DISKS. 1067 */ 1068 @Test 1069 public void testChangeHotFileRep() throws Exception { 1070 testChangeFileRep(HdfsConstants.HOT_STORAGE_POLICY_NAME, HOT, 1071 new StorageType[]{StorageType.DISK, StorageType.DISK, 1072 StorageType.DISK}, 1073 new StorageType[]{StorageType.DISK, StorageType.DISK, StorageType.DISK, 1074 StorageType.DISK, StorageType.DISK}); 1075 } 1076 1077 /** 1078 * Consider a File with Warm temperature. Increase replication factor of 1079 * that file from 3 to 5. Make sure all replicas are created in DISKS 1080 * and ARCHIVE. 
1081 */ 1082 @Test 1083 public void testChangeWarmRep() throws Exception { 1084 testChangeFileRep(HdfsConstants.WARM_STORAGE_POLICY_NAME, WARM, 1085 new StorageType[]{StorageType.DISK, StorageType.ARCHIVE, 1086 StorageType.ARCHIVE}, 1087 new StorageType[]{StorageType.DISK, StorageType.ARCHIVE, 1088 StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE}); 1089 } 1090 1091 /** 1092 * Consider a File with Cold temperature. Increase replication factor of 1093 * that file from 3 to 5. Make sure all replicas are created in ARCHIVE. 1094 */ 1095 @Test 1096 public void testChangeColdRep() throws Exception { 1097 testChangeFileRep(HdfsConstants.COLD_STORAGE_POLICY_NAME, COLD, 1098 new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE, 1099 StorageType.ARCHIVE}, 1100 new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE, 1101 StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE}); 1102 } 1103 1104 @Test 1105 public void testChooseTargetWithTopology() throws Exception { 1106 BlockStoragePolicy policy1 = new BlockStoragePolicy((byte) 9, "TEST1", 1107 new StorageType[]{StorageType.SSD, StorageType.DISK, 1108 StorageType.ARCHIVE}, new StorageType[]{}, new StorageType[]{}); 1109 BlockStoragePolicy policy2 = new BlockStoragePolicy((byte) 11, "TEST2", 1110 new StorageType[]{StorageType.DISK, StorageType.SSD, 1111 StorageType.ARCHIVE}, new StorageType[]{}, new StorageType[]{}); 1112 1113 final String[] racks = {"/d1/r1", "/d1/r2", "/d1/r2"}; 1114 final String[] hosts = {"host1", "host2", "host3"}; 1115 final StorageType[] types = {StorageType.DISK, StorageType.SSD, 1116 StorageType.ARCHIVE}; 1117 1118 final DatanodeStorageInfo[] storages = DFSTestUtil 1119 .createDatanodeStorageInfos(3, racks, hosts, types); 1120 final DatanodeDescriptor[] dataNodes = DFSTestUtil 1121 .toDatanodeDescriptor(storages); 1122 1123 FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); 1124 conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); 1125 File baseDir = 
PathUtils.getTestDir(TestReplicationPolicy.class); 1126 conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, 1127 new File(baseDir, "name").getPath()); 1128 DFSTestUtil.formatNameNode(conf); 1129 NameNode namenode = new NameNode(conf); 1130 1131 final BlockManager bm = namenode.getNamesystem().getBlockManager(); 1132 BlockPlacementPolicy replicator = bm.getBlockPlacementPolicy(); 1133 NetworkTopology cluster = bm.getDatanodeManager().getNetworkTopology(); 1134 for (DatanodeDescriptor datanode : dataNodes) { 1135 cluster.add(datanode); 1136 } 1137 1138 DatanodeStorageInfo[] targets = replicator.chooseTarget("/foo", 3, 1139 dataNodes[0], Collections.<DatanodeStorageInfo>emptyList(), false, 1140 new HashSet<Node>(), 0, policy1); 1141 System.out.println(Arrays.asList(targets)); 1142 Assert.assertEquals(3, targets.length); 1143 targets = replicator.chooseTarget("/foo", 3, 1144 dataNodes[0], Collections.<DatanodeStorageInfo>emptyList(), false, 1145 new HashSet<Node>(), 0, policy2); 1146 System.out.println(Arrays.asList(targets)); 1147 Assert.assertEquals(3, targets.length); 1148 } 1149 1150 /** 1151 * Test getting all the storage policies from the namenode 1152 */ 1153 @Test 1154 public void testGetAllStoragePolicies() throws Exception { 1155 final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) 1156 .numDataNodes(0).build(); 1157 cluster.waitActive(); 1158 final DistributedFileSystem fs = cluster.getFileSystem(); 1159 try { 1160 BlockStoragePolicy[] policies = fs.getStoragePolicies(); 1161 Assert.assertEquals(6, policies.length); 1162 Assert.assertEquals(POLICY_SUITE.getPolicy(COLD).toString(), 1163 policies[0].toString()); 1164 Assert.assertEquals(POLICY_SUITE.getPolicy(WARM).toString(), 1165 policies[1].toString()); 1166 Assert.assertEquals(POLICY_SUITE.getPolicy(HOT).toString(), 1167 policies[2].toString()); 1168 Assert.assertEquals(POLICY_SUITE.getPolicy(ONESSD).toString(), 1169 policies[3].toString()); 1170 
Assert.assertEquals(POLICY_SUITE.getPolicy(ALLSSD).toString(), 1171 policies[4].toString()); 1172 Assert.assertEquals(POLICY_SUITE.getPolicy(LAZY_PERSIST).toString(), 1173 policies[5].toString()); 1174 } finally { 1175 IOUtils.cleanup(null, fs); 1176 cluster.shutdown(); 1177 } 1178 } 1179 1180 @Test 1181 public void testChooseSsdOverDisk() throws Exception { 1182 BlockStoragePolicy policy = new BlockStoragePolicy((byte) 9, "TEST1", 1183 new StorageType[]{StorageType.SSD, StorageType.DISK, 1184 StorageType.ARCHIVE}, new StorageType[]{}, new StorageType[]{}); 1185 1186 final String[] racks = {"/d1/r1", "/d1/r1", "/d1/r1"}; 1187 final String[] hosts = {"host1", "host2", "host3"}; 1188 final StorageType[] disks = {StorageType.DISK, StorageType.DISK, StorageType.DISK}; 1189 1190 final DatanodeStorageInfo[] diskStorages 1191 = DFSTestUtil.createDatanodeStorageInfos(3, racks, hosts, disks); 1192 final DatanodeDescriptor[] dataNodes 1193 = DFSTestUtil.toDatanodeDescriptor(diskStorages); 1194 for(int i = 0; i < dataNodes.length; i++) { 1195 BlockManagerTestUtil.updateStorage(dataNodes[i], 1196 new DatanodeStorage("ssd" + i, DatanodeStorage.State.NORMAL, 1197 StorageType.SSD)); 1198 } 1199 1200 FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); 1201 conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); 1202 File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class); 1203 conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, 1204 new File(baseDir, "name").getPath()); 1205 DFSTestUtil.formatNameNode(conf); 1206 NameNode namenode = new NameNode(conf); 1207 1208 final BlockManager bm = namenode.getNamesystem().getBlockManager(); 1209 BlockPlacementPolicy replicator = bm.getBlockPlacementPolicy(); 1210 NetworkTopology cluster = bm.getDatanodeManager().getNetworkTopology(); 1211 for (DatanodeDescriptor datanode : dataNodes) { 1212 cluster.add(datanode); 1213 } 1214 1215 DatanodeStorageInfo[] targets = replicator.chooseTarget("/foo", 3, 1216 dataNodes[0], 
Collections.<DatanodeStorageInfo>emptyList(), false, 1217 new HashSet<Node>(), 0, policy); 1218 System.out.println(policy.getName() + ": " + Arrays.asList(targets)); 1219 Assert.assertEquals(2, targets.length); 1220 Assert.assertEquals(StorageType.SSD, targets[0].getStorageType()); 1221 Assert.assertEquals(StorageType.DISK, targets[1].getStorageType()); 1222 } 1223 1224 @Test 1225 public void testStorageType() { 1226 final EnumMap<StorageType, Integer> map = new EnumMap<>(StorageType.class); 1227 1228 //put storage type is reversed order 1229 map.put(StorageType.ARCHIVE, 1); 1230 map.put(StorageType.DISK, 1); 1231 map.put(StorageType.SSD, 1); 1232 map.put(StorageType.RAM_DISK, 1); 1233 1234 { 1235 final Iterator<StorageType> i = map.keySet().iterator(); 1236 Assert.assertEquals(StorageType.RAM_DISK, i.next()); 1237 Assert.assertEquals(StorageType.SSD, i.next()); 1238 Assert.assertEquals(StorageType.DISK, i.next()); 1239 Assert.assertEquals(StorageType.ARCHIVE, i.next()); 1240 } 1241 1242 { 1243 final Iterator<Map.Entry<StorageType, Integer>> i 1244 = map.entrySet().iterator(); 1245 Assert.assertEquals(StorageType.RAM_DISK, i.next().getKey()); 1246 Assert.assertEquals(StorageType.SSD, i.next().getKey()); 1247 Assert.assertEquals(StorageType.DISK, i.next().getKey()); 1248 Assert.assertEquals(StorageType.ARCHIVE, i.next().getKey()); 1249 } 1250 } 1251 1252 public void testGetFileStoragePolicyAfterRestartNN() throws Exception { 1253 //HDFS8219 1254 final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) 1255 .numDataNodes(REPLICATION) 1256 .storageTypes( 1257 new StorageType[] {StorageType.DISK, StorageType.ARCHIVE}) 1258 .build(); 1259 cluster.waitActive(); 1260 final DistributedFileSystem fs = cluster.getFileSystem(); 1261 try { 1262 final String file = "/testScheduleWithinSameNode/file"; 1263 Path dir = new Path("/testScheduleWithinSameNode"); 1264 fs.mkdirs(dir); 1265 // 2. Set Dir policy 1266 fs.setStoragePolicy(dir, "COLD"); 1267 // 3. 
Create file 1268 final FSDataOutputStream out = fs.create(new Path(file)); 1269 out.writeChars("testScheduleWithinSameNode"); 1270 out.close(); 1271 // 4. Set Dir policy 1272 fs.setStoragePolicy(dir, "HOT"); 1273 HdfsFileStatus status = fs.getClient().getFileInfo(file); 1274 // 5. get file policy, it should be parent policy. 1275 Assert 1276 .assertTrue( 1277 "File storage policy should be HOT", 1278 status.getStoragePolicy() 1279 == HdfsConstants.HOT_STORAGE_POLICY_ID); 1280 // 6. restart NameNode for reloading edits logs. 1281 cluster.restartNameNode(true); 1282 // 7. get file policy, it should be parent policy. 1283 status = fs.getClient().getFileInfo(file); 1284 Assert 1285 .assertTrue( 1286 "File storage policy should be HOT", 1287 status.getStoragePolicy() 1288 == HdfsConstants.HOT_STORAGE_POLICY_ID); 1289 1290 } finally { 1291 cluster.shutdown(); 1292 } 1293 } 1294 } 1295