1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs.server.mover; 19 20 import java.io.IOException; 21 import java.net.URI; 22 import java.util.ArrayList; 23 import java.util.Arrays; 24 import java.util.Collection; 25 import java.util.Collections; 26 import java.util.List; 27 import java.util.Map; 28 29 import org.apache.commons.logging.Log; 30 import org.apache.commons.logging.LogFactory; 31 import org.apache.commons.logging.impl.Log4JLogger; 32 import org.apache.hadoop.conf.Configuration; 33 import org.apache.hadoop.fs.FSDataInputStream; 34 import org.apache.hadoop.fs.FSDataOutputStream; 35 import org.apache.hadoop.fs.FileUtil; 36 import org.apache.hadoop.fs.Path; 37 import org.apache.hadoop.fs.StorageType; 38 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; 39 import org.apache.hadoop.hdfs.DFSConfigKeys; 40 import org.apache.hadoop.hdfs.DFSOutputStream; 41 import org.apache.hadoop.hdfs.DFSTestUtil; 42 import org.apache.hadoop.hdfs.DFSUtil; 43 import org.apache.hadoop.hdfs.DistributedFileSystem; 44 import org.apache.hadoop.hdfs.HdfsConfiguration; 45 import org.apache.hadoop.hdfs.MiniDFSCluster; 46 import org.apache.hadoop.hdfs.protocol.DirectoryListing; 47 import org.apache.hadoop.hdfs.protocol.HdfsConstants; 48 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 49 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; 50 import org.apache.hadoop.hdfs.protocol.LocatedBlock; 51 import org.apache.hadoop.hdfs.protocol.LocatedBlocks; 52 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; 53 import org.apache.hadoop.hdfs.server.balancer.Dispatcher; 54 import org.apache.hadoop.hdfs.server.balancer.ExitStatus; 55 import org.apache.hadoop.hdfs.server.balancer.TestBalancer; 56 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; 57 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; 58 import org.apache.hadoop.hdfs.server.datanode.DataNode; 59 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; 60 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; 61 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; 62 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper; 63 import org.apache.hadoop.io.IOUtils; 64 import org.apache.log4j.Level; 65 import org.junit.Assert; 66 import org.junit.Test; 67 68 import com.google.common.base.Preconditions; 69 import com.google.common.collect.Maps; 70 71 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC; 72 73 /** 74 * Test the data migration tool (for Archival Storage) 75 */ 76 public class TestStorageMover { 77 static final Log LOG = LogFactory.getLog(TestStorageMover.class); 78 static { 79 ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class) 80 ).getLogger().setLevel(Level.ALL); 81 ((Log4JLogger)LogFactory.getLog(Dispatcher.class) 82 ).getLogger().setLevel(Level.ALL); 83 ((Log4JLogger)LogFactory.getLog(DataTransferProtocol.class)).getLogger() 84 .setLevel(Level.ALL); 85 } 86 87 private static final int BLOCK_SIZE = 1024; 88 private static final short REPL = 3; 89 private static final int NUM_DATANODES = 6; 90 private static final Configuration DEFAULT_CONF = new HdfsConfiguration(); 91 private static final BlockStoragePolicySuite DEFAULT_POLICIES; 92 private static final BlockStoragePolicy HOT; 93 private static final BlockStoragePolicy WARM; 94 private static final BlockStoragePolicy COLD; 95 96 static { DEFAULT_CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE)97 DEFAULT_CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); DEFAULT_CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L)98 DEFAULT_CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); DEFAULT_CONF.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 2L)99 DEFAULT_CONF.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 100 2L); DEFAULT_CONF.setLong(DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY, 2000L)101 DEFAULT_CONF.setLong(DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY, 2000L); 102 103 DEFAULT_POLICIES = BlockStoragePolicySuite.createDefaultSuite(); 104 HOT = DEFAULT_POLICIES.getPolicy(HdfsConstants.HOT_STORAGE_POLICY_NAME); 105 WARM = DEFAULT_POLICIES.getPolicy(HdfsConstants.WARM_STORAGE_POLICY_NAME); 106 COLD = DEFAULT_POLICIES.getPolicy(HdfsConstants.COLD_STORAGE_POLICY_NAME); TestBalancer.initTestSetup()107 TestBalancer.initTestSetup(); 108 Dispatcher.setDelayAfterErrors(1000L); 109 } 110 111 /** 112 * This scheme defines files/directories and their block storage policies. It 113 * also defines snapshots. 114 */ 115 static class NamespaceScheme { 116 final List<Path> dirs; 117 final List<Path> files; 118 final long fileSize; 119 final Map<Path, List<String>> snapshotMap; 120 final Map<Path, BlockStoragePolicy> policyMap; 121 NamespaceScheme(List<Path> dirs, List<Path> files, long fileSize, Map<Path,List<String>> snapshotMap, Map<Path, BlockStoragePolicy> policyMap)122 NamespaceScheme(List<Path> dirs, List<Path> files, long fileSize, 123 Map<Path,List<String>> snapshotMap, 124 Map<Path, BlockStoragePolicy> policyMap) { 125 this.dirs = dirs == null? Collections.<Path>emptyList(): dirs; 126 this.files = files == null? Collections.<Path>emptyList(): files; 127 this.fileSize = fileSize; 128 this.snapshotMap = snapshotMap == null ? 129 Collections.<Path, List<String>>emptyMap() : snapshotMap; 130 this.policyMap = policyMap; 131 } 132 133 /** 134 * Create files/directories/snapshots. 135 */ prepare(DistributedFileSystem dfs, short repl)136 void prepare(DistributedFileSystem dfs, short repl) throws Exception { 137 for (Path d : dirs) { 138 dfs.mkdirs(d); 139 } 140 for (Path file : files) { 141 DFSTestUtil.createFile(dfs, file, fileSize, repl, 0L); 142 } 143 for (Map.Entry<Path, List<String>> entry : snapshotMap.entrySet()) { 144 for (String snapshot : entry.getValue()) { 145 SnapshotTestHelper.createSnapshot(dfs, entry.getKey(), snapshot); 146 } 147 } 148 } 149 150 /** 151 * Set storage policies according to the corresponding scheme. 152 */ setStoragePolicy(DistributedFileSystem dfs)153 void setStoragePolicy(DistributedFileSystem dfs) throws Exception { 154 for (Map.Entry<Path, BlockStoragePolicy> entry : policyMap.entrySet()) { 155 dfs.setStoragePolicy(entry.getKey(), entry.getValue().getName()); 156 } 157 } 158 } 159 160 /** 161 * This scheme defines DataNodes and their storage, including storage types 162 * and remaining capacities. 163 */ 164 static class ClusterScheme { 165 final Configuration conf; 166 final int numDataNodes; 167 final short repl; 168 final StorageType[][] storageTypes; 169 final long[][] storageCapacities; 170 ClusterScheme()171 ClusterScheme() { 172 this(DEFAULT_CONF, NUM_DATANODES, REPL, 173 genStorageTypes(NUM_DATANODES), null); 174 } 175 ClusterScheme(Configuration conf, int numDataNodes, short repl, StorageType[][] types, long[][] capacities)176 ClusterScheme(Configuration conf, int numDataNodes, short repl, 177 StorageType[][] types, long[][] capacities) { 178 Preconditions.checkArgument(types == null || types.length == numDataNodes); 179 Preconditions.checkArgument(capacities == null || capacities.length == 180 numDataNodes); 181 this.conf = conf; 182 this.numDataNodes = numDataNodes; 183 this.repl = repl; 184 this.storageTypes = types; 185 this.storageCapacities = capacities; 186 } 187 } 188 189 class MigrationTest { 190 private final ClusterScheme clusterScheme; 191 private final NamespaceScheme nsScheme; 192 private final Configuration conf; 193 194 private MiniDFSCluster cluster; 195 private DistributedFileSystem dfs; 196 private final BlockStoragePolicySuite policies; 197 MigrationTest(ClusterScheme cScheme, NamespaceScheme nsScheme)198 MigrationTest(ClusterScheme cScheme, NamespaceScheme nsScheme) { 199 this.clusterScheme = cScheme; 200 this.nsScheme = nsScheme; 201 this.conf = clusterScheme.conf; 202 this.policies = DEFAULT_POLICIES; 203 } 204 205 /** 206 * Set up the cluster and start NameNode and DataNodes according to the 207 * corresponding scheme. 208 */ setupCluster()209 void setupCluster() throws Exception { 210 cluster = new MiniDFSCluster.Builder(conf).numDataNodes(clusterScheme 211 .numDataNodes).storageTypes(clusterScheme.storageTypes) 212 .storageCapacities(clusterScheme.storageCapacities).build(); 213 cluster.waitActive(); 214 dfs = cluster.getFileSystem(); 215 } 216 runBasicTest(boolean shutdown)217 private void runBasicTest(boolean shutdown) throws Exception { 218 setupCluster(); 219 try { 220 prepareNamespace(); 221 verify(true); 222 223 setStoragePolicy(); 224 migrate(); 225 verify(true); 226 } finally { 227 if (shutdown) { 228 shutdownCluster(); 229 } 230 } 231 } 232 shutdownCluster()233 void shutdownCluster() throws Exception { 234 IOUtils.cleanup(null, dfs); 235 if (cluster != null) { 236 cluster.shutdown(); 237 } 238 } 239 240 /** 241 * Create files/directories and set their storage policies according to the 242 * corresponding scheme. 243 */ prepareNamespace()244 void prepareNamespace() throws Exception { 245 nsScheme.prepare(dfs, clusterScheme.repl); 246 } 247 setStoragePolicy()248 void setStoragePolicy() throws Exception { 249 nsScheme.setStoragePolicy(dfs); 250 } 251 252 /** 253 * Run the migration tool. 254 */ migrate()255 void migrate() throws Exception { 256 runMover(); 257 Thread.sleep(5000); // let the NN finish deletion 258 } 259 260 /** 261 * Verify block locations after running the migration tool. 262 */ verify(boolean verifyAll)263 void verify(boolean verifyAll) throws Exception { 264 for (DataNode dn : cluster.getDataNodes()) { 265 DataNodeTestUtils.triggerBlockReport(dn); 266 } 267 if (verifyAll) { 268 verifyNamespace(); 269 } 270 } 271 runMover()272 private void runMover() throws Exception { 273 Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); 274 Map<URI, List<Path>> nnMap = Maps.newHashMap(); 275 for (URI nn : namenodes) { 276 nnMap.put(nn, null); 277 } 278 int result = Mover.run(nnMap, conf); 279 Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), result); 280 } 281 verifyNamespace()282 private void verifyNamespace() throws Exception { 283 HdfsFileStatus status = dfs.getClient().getFileInfo("/"); 284 verifyRecursively(null, status); 285 } 286 verifyRecursively(final Path parent, final HdfsFileStatus status)287 private void verifyRecursively(final Path parent, 288 final HdfsFileStatus status) throws Exception { 289 if (status.isDir()) { 290 Path fullPath = parent == null ? 291 new Path("/") : status.getFullPath(parent); 292 DirectoryListing children = dfs.getClient().listPaths( 293 fullPath.toString(), HdfsFileStatus.EMPTY_NAME, true); 294 for (HdfsFileStatus child : children.getPartialListing()) { 295 verifyRecursively(fullPath, child); 296 } 297 } else if (!status.isSymlink()) { // is file 298 verifyFile(parent, status, null); 299 } 300 } 301 verifyFile(final Path file, final Byte expectedPolicyId)302 void verifyFile(final Path file, final Byte expectedPolicyId) 303 throws Exception { 304 final Path parent = file.getParent(); 305 DirectoryListing children = dfs.getClient().listPaths( 306 parent.toString(), HdfsFileStatus.EMPTY_NAME, true); 307 for (HdfsFileStatus child : children.getPartialListing()) { 308 if (child.getLocalName().equals(file.getName())) { 309 verifyFile(parent, child, expectedPolicyId); 310 return; 311 } 312 } 313 Assert.fail("File " + file + " not found."); 314 } 315 verifyFile(final Path parent, final HdfsFileStatus status, final Byte expectedPolicyId)316 private void verifyFile(final Path parent, final HdfsFileStatus status, 317 final Byte expectedPolicyId) throws Exception { 318 HdfsLocatedFileStatus fileStatus = (HdfsLocatedFileStatus) status; 319 byte policyId = fileStatus.getStoragePolicy(); 320 BlockStoragePolicy policy = policies.getPolicy(policyId); 321 if (expectedPolicyId != null) { 322 Assert.assertEquals((byte)expectedPolicyId, policy.getId()); 323 } 324 final List<StorageType> types = policy.chooseStorageTypes( 325 status.getReplication()); 326 for(LocatedBlock lb : fileStatus.getBlockLocations().getLocatedBlocks()) { 327 final Mover.StorageTypeDiff diff = new Mover.StorageTypeDiff(types, 328 lb.getStorageTypes()); 329 Assert.assertTrue(fileStatus.getFullName(parent.toString()) 330 + " with policy " + policy + " has non-empty overlap: " + diff 331 + ", the corresponding block is " + lb.getBlock().getLocalBlock(), 332 diff.removeOverlap(true)); 333 } 334 } 335 getReplication(Path file)336 Replication getReplication(Path file) throws IOException { 337 return getOrVerifyReplication(file, null); 338 } 339 verifyReplication(Path file, int expectedDiskCount, int expectedArchiveCount)340 Replication verifyReplication(Path file, int expectedDiskCount, 341 int expectedArchiveCount) throws IOException { 342 final Replication r = new Replication(); 343 r.disk = expectedDiskCount; 344 r.archive = expectedArchiveCount; 345 return getOrVerifyReplication(file, r); 346 } 347 getOrVerifyReplication(Path file, Replication expected)348 private Replication getOrVerifyReplication(Path file, Replication expected) 349 throws IOException { 350 final List<LocatedBlock> lbs = dfs.getClient().getLocatedBlocks( 351 file.toString(), 0).getLocatedBlocks(); 352 Assert.assertEquals(1, lbs.size()); 353 354 LocatedBlock lb = lbs.get(0); 355 StringBuilder types = new StringBuilder(); 356 final Replication r = new Replication(); 357 for(StorageType t : lb.getStorageTypes()) { 358 types.append(t).append(", "); 359 if (t == StorageType.DISK) { 360 r.disk++; 361 } else if (t == StorageType.ARCHIVE) { 362 r.archive++; 363 } else { 364 Assert.fail("Unexpected storage type " + t); 365 } 366 } 367 368 if (expected != null) { 369 final String s = "file = " + file + "\n types = [" + types + "]"; 370 Assert.assertEquals(s, expected, r); 371 } 372 return r; 373 } 374 } 375 376 static class Replication { 377 int disk; 378 int archive; 379 380 @Override hashCode()381 public int hashCode() { 382 return disk ^ archive; 383 } 384 385 @Override equals(Object obj)386 public boolean equals(Object obj) { 387 if (obj == this) { 388 return true; 389 } else if (obj == null || !(obj instanceof Replication)) { 390 return false; 391 } 392 final Replication that = (Replication)obj; 393 return this.disk == that.disk && this.archive == that.archive; 394 } 395 396 @Override toString()397 public String toString() { 398 return "[disk=" + disk + ", archive=" + archive + "]"; 399 } 400 } 401 genStorageTypes(int numDataNodes)402 private static StorageType[][] genStorageTypes(int numDataNodes) { 403 return genStorageTypes(numDataNodes, 0, 0, 0); 404 } 405 genStorageTypes(int numDataNodes, int numAllDisk, int numAllArchive)406 private static StorageType[][] genStorageTypes(int numDataNodes, 407 int numAllDisk, int numAllArchive) { 408 return genStorageTypes(numDataNodes, numAllDisk, numAllArchive, 0); 409 } 410 genStorageTypes(int numDataNodes, int numAllDisk, int numAllArchive, int numRamDisk)411 private static StorageType[][] genStorageTypes(int numDataNodes, 412 int numAllDisk, int numAllArchive, int numRamDisk) { 413 Preconditions.checkArgument( 414 (numAllDisk + numAllArchive + numRamDisk) <= numDataNodes); 415 416 StorageType[][] types = new StorageType[numDataNodes][]; 417 int i = 0; 418 for (; i < numRamDisk; i++) 419 { 420 types[i] = new StorageType[]{StorageType.RAM_DISK, StorageType.DISK}; 421 } 422 for (; i < numRamDisk + numAllDisk; i++) { 423 types[i] = new StorageType[]{StorageType.DISK, StorageType.DISK}; 424 } 425 for (; i < numRamDisk + numAllDisk + numAllArchive; i++) { 426 types[i] = new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE}; 427 } 428 for (; i < types.length; i++) { 429 types[i] = new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}; 430 } 431 return types; 432 } 433 genCapacities(int nDatanodes, int numAllDisk, int numAllArchive, int numRamDisk, long diskCapacity, long archiveCapacity, long ramDiskCapacity)434 private static long[][] genCapacities(int nDatanodes, int numAllDisk, 435 int numAllArchive, int numRamDisk, long diskCapacity, 436 long archiveCapacity, long ramDiskCapacity) { 437 final long[][] capacities = new long[nDatanodes][]; 438 int i = 0; 439 for (; i < numRamDisk; i++) { 440 capacities[i] = new long[]{ramDiskCapacity, diskCapacity}; 441 } 442 for (; i < numRamDisk + numAllDisk; i++) { 443 capacities[i] = new long[]{diskCapacity, diskCapacity}; 444 } 445 for (; i < numRamDisk + numAllDisk + numAllArchive; i++) { 446 capacities[i] = new long[]{archiveCapacity, archiveCapacity}; 447 } 448 for(; i < capacities.length; i++) { 449 capacities[i] = new long[]{diskCapacity, archiveCapacity}; 450 } 451 return capacities; 452 } 453 454 private static class PathPolicyMap { 455 final Map<Path, BlockStoragePolicy> map = Maps.newHashMap(); 456 final Path hot = new Path("/hot"); 457 final Path warm = new Path("/warm"); 458 final Path cold = new Path("/cold"); 459 final List<Path> files; 460 PathPolicyMap(int filesPerDir)461 PathPolicyMap(int filesPerDir){ 462 map.put(hot, HOT); 463 map.put(warm, WARM); 464 map.put(cold, COLD); 465 files = new ArrayList<Path>(); 466 for(Path dir : map.keySet()) { 467 for(int i = 0; i < filesPerDir; i++) { 468 files.add(new Path(dir, "file" + i)); 469 } 470 } 471 } 472 newNamespaceScheme()473 NamespaceScheme newNamespaceScheme() { 474 return new NamespaceScheme(Arrays.asList(hot, warm, cold), 475 files, BLOCK_SIZE/2, null, map); 476 } 477 478 /** 479 * Move hot files to warm and cold, warm files to hot and cold, 480 * and cold files to hot and warm. 481 */ moveAround(DistributedFileSystem dfs)482 void moveAround(DistributedFileSystem dfs) throws Exception { 483 for(Path srcDir : map.keySet()) { 484 int i = 0; 485 for(Path dstDir : map.keySet()) { 486 if (!srcDir.equals(dstDir)) { 487 final Path src = new Path(srcDir, "file" + i++); 488 final Path dst = new Path(dstDir, srcDir.getName() + "2" + dstDir.getName()); 489 LOG.info("rename " + src + " to " + dst); 490 dfs.rename(src, dst); 491 } 492 } 493 } 494 } 495 } 496 497 /** 498 * A normal case for Mover: move a file into archival storage 499 */ 500 @Test testMigrateFileToArchival()501 public void testMigrateFileToArchival() throws Exception { 502 LOG.info("testMigrateFileToArchival"); 503 final Path foo = new Path("/foo"); 504 Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap(); 505 policyMap.put(foo, COLD); 506 NamespaceScheme nsScheme = new NamespaceScheme(null, Arrays.asList(foo), 507 2*BLOCK_SIZE, null, policyMap); 508 ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF, 509 NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null); 510 new MigrationTest(clusterScheme, nsScheme).runBasicTest(true); 511 } 512 513 /** 514 * Print a big banner in the test log to make debug easier. 515 */ banner(String string)516 static void banner(String string) { 517 LOG.info("\n\n\n\n================================================\n" + 518 string + "\n" + 519 "==================================================\n\n"); 520 } 521 522 /** 523 * Run Mover with arguments specifying files and directories 524 */ 525 @Test testMoveSpecificPaths()526 public void testMoveSpecificPaths() throws Exception { 527 LOG.info("testMoveSpecificPaths"); 528 final Path foo = new Path("/foo"); 529 final Path barFile = new Path(foo, "bar"); 530 final Path foo2 = new Path("/foo2"); 531 final Path bar2File = new Path(foo2, "bar2"); 532 Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap(); 533 policyMap.put(foo, COLD); 534 policyMap.put(foo2, WARM); 535 NamespaceScheme nsScheme = new NamespaceScheme(Arrays.asList(foo, foo2), 536 Arrays.asList(barFile, bar2File), BLOCK_SIZE, null, policyMap); 537 ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF, 538 NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null); 539 MigrationTest test = new MigrationTest(clusterScheme, nsScheme); 540 test.setupCluster(); 541 542 try { 543 test.prepareNamespace(); 544 test.setStoragePolicy(); 545 546 Map<URI, List<Path>> map = Mover.Cli.getNameNodePathsToMove(test.conf, 547 "-p", "/foo/bar", "/foo2"); 548 int result = Mover.run(map, test.conf); 549 Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), result); 550 551 Thread.sleep(5000); 552 test.verify(true); 553 } finally { 554 test.shutdownCluster(); 555 } 556 } 557 558 /** 559 * Move an open file into archival storage 560 */ 561 @Test testMigrateOpenFileToArchival()562 public void testMigrateOpenFileToArchival() throws Exception { 563 LOG.info("testMigrateOpenFileToArchival"); 564 final Path fooDir = new Path("/foo"); 565 Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap(); 566 policyMap.put(fooDir, COLD); 567 NamespaceScheme nsScheme = new NamespaceScheme(Arrays.asList(fooDir), null, 568 BLOCK_SIZE, null, policyMap); 569 ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF, 570 NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null); 571 MigrationTest test = new MigrationTest(clusterScheme, nsScheme); 572 test.setupCluster(); 573 574 // create an open file 575 banner("writing to file /foo/bar"); 576 final Path barFile = new Path(fooDir, "bar"); 577 DFSTestUtil.createFile(test.dfs, barFile, BLOCK_SIZE, (short) 1, 0L); 578 FSDataOutputStream out = test.dfs.append(barFile); 579 out.writeBytes("hello, "); 580 ((DFSOutputStream) out.getWrappedStream()).hsync(); 581 582 try { 583 banner("start data migration"); 584 test.setStoragePolicy(); // set /foo to COLD 585 test.migrate(); 586 587 // make sure the under construction block has not been migrated 588 LocatedBlocks lbs = test.dfs.getClient().getLocatedBlocks( 589 barFile.toString(), BLOCK_SIZE); 590 LOG.info("Locations: " + lbs); 591 List<LocatedBlock> blks = lbs.getLocatedBlocks(); 592 Assert.assertEquals(1, blks.size()); 593 Assert.assertEquals(1, blks.get(0).getLocations().length); 594 595 banner("finish the migration, continue writing"); 596 // make sure the writing can continue 597 out.writeBytes("world!"); 598 ((DFSOutputStream) out.getWrappedStream()).hsync(); 599 IOUtils.cleanup(LOG, out); 600 601 lbs = test.dfs.getClient().getLocatedBlocks( 602 barFile.toString(), BLOCK_SIZE); 603 LOG.info("Locations: " + lbs); 604 blks = lbs.getLocatedBlocks(); 605 Assert.assertEquals(1, blks.size()); 606 Assert.assertEquals(1, blks.get(0).getLocations().length); 607 608 banner("finish writing, starting reading"); 609 // check the content of /foo/bar 610 FSDataInputStream in = test.dfs.open(barFile); 611 byte[] buf = new byte[13]; 612 // read from offset 1024 613 in.readFully(BLOCK_SIZE, buf, 0, buf.length); 614 IOUtils.cleanup(LOG, in); 615 Assert.assertEquals("hello, world!", new String(buf)); 616 } finally { 617 test.shutdownCluster(); 618 } 619 } 620 621 /** 622 * Test directories with Hot, Warm and Cold polices. 623 */ 624 @Test testHotWarmColdDirs()625 public void testHotWarmColdDirs() throws Exception { 626 LOG.info("testHotWarmColdDirs"); 627 PathPolicyMap pathPolicyMap = new PathPolicyMap(3); 628 NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme(); 629 ClusterScheme clusterScheme = new ClusterScheme(); 630 MigrationTest test = new MigrationTest(clusterScheme, nsScheme); 631 632 try { 633 test.runBasicTest(false); 634 pathPolicyMap.moveAround(test.dfs); 635 test.migrate(); 636 637 test.verify(true); 638 } finally { 639 test.shutdownCluster(); 640 } 641 } 642 waitForAllReplicas(int expectedReplicaNum, Path file, DistributedFileSystem dfs)643 private void waitForAllReplicas(int expectedReplicaNum, Path file, 644 DistributedFileSystem dfs) throws Exception { 645 for (int i = 0; i < 5; i++) { 646 LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(file.toString(), 0, 647 BLOCK_SIZE); 648 LocatedBlock lb = lbs.get(0); 649 if (lb.getLocations().length >= expectedReplicaNum) { 650 return; 651 } else { 652 Thread.sleep(1000); 653 } 654 } 655 } 656 setVolumeFull(DataNode dn, StorageType type)657 private void setVolumeFull(DataNode dn, StorageType type) { 658 List<? extends FsVolumeSpi> volumes = dn.getFSDataset().getVolumes(); 659 for (FsVolumeSpi v : volumes) { 660 FsVolumeImpl volume = (FsVolumeImpl) v; 661 if (volume.getStorageType() == type) { 662 LOG.info("setCapacity to 0 for [" + volume.getStorageType() + "]" 663 + volume.getStorageID()); 664 volume.setCapacityForTesting(0); 665 } 666 } 667 } 668 669 /** 670 * Test DISK is running out of spaces. 671 */ 672 @Test testNoSpaceDisk()673 public void testNoSpaceDisk() throws Exception { 674 LOG.info("testNoSpaceDisk"); 675 final PathPolicyMap pathPolicyMap = new PathPolicyMap(0); 676 final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme(); 677 678 Configuration conf = new Configuration(DEFAULT_CONF); 679 final ClusterScheme clusterScheme = new ClusterScheme(conf, 680 NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null); 681 final MigrationTest test = new MigrationTest(clusterScheme, nsScheme); 682 683 try { 684 test.runBasicTest(false); 685 686 // create 2 hot files with replication 3 687 final short replication = 3; 688 for (int i = 0; i < 2; i++) { 689 final Path p = new Path(pathPolicyMap.hot, "file" + i); 690 DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L); 691 waitForAllReplicas(replication, p, test.dfs); 692 } 693 694 // set all the DISK volume to full 695 for (DataNode dn : test.cluster.getDataNodes()) { 696 setVolumeFull(dn, StorageType.DISK); 697 DataNodeTestUtils.triggerHeartbeat(dn); 698 } 699 700 // test increasing replication. Since DISK is full, 701 // new replicas should be stored in ARCHIVE as a fallback storage. 702 final Path file0 = new Path(pathPolicyMap.hot, "file0"); 703 final Replication r = test.getReplication(file0); 704 final short newReplication = (short) 5; 705 test.dfs.setReplication(file0, newReplication); 706 Thread.sleep(10000); 707 test.verifyReplication(file0, r.disk, newReplication - r.disk); 708 709 // test creating a cold file and then increase replication 710 final Path p = new Path(pathPolicyMap.cold, "foo"); 711 DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L); 712 test.verifyReplication(p, 0, replication); 713 714 test.dfs.setReplication(p, newReplication); 715 Thread.sleep(10000); 716 test.verifyReplication(p, 0, newReplication); 717 718 //test move a hot file to warm 719 final Path file1 = new Path(pathPolicyMap.hot, "file1"); 720 test.dfs.rename(file1, pathPolicyMap.warm); 721 test.migrate(); 722 test.verifyFile(new Path(pathPolicyMap.warm, "file1"), WARM.getId()); 723 } finally { 724 test.shutdownCluster(); 725 } 726 } 727 728 /** 729 * Test ARCHIVE is running out of spaces. 730 */ 731 @Test testNoSpaceArchive()732 public void testNoSpaceArchive() throws Exception { 733 LOG.info("testNoSpaceArchive"); 734 final PathPolicyMap pathPolicyMap = new PathPolicyMap(0); 735 final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme(); 736 737 final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF, 738 NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null); 739 final MigrationTest test = new MigrationTest(clusterScheme, nsScheme); 740 741 try { 742 test.runBasicTest(false); 743 744 // create 2 hot files with replication 3 745 final short replication = 3; 746 for (int i = 0; i < 2; i++) { 747 final Path p = new Path(pathPolicyMap.cold, "file" + i); 748 DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L); 749 waitForAllReplicas(replication, p, test.dfs); 750 } 751 752 // set all the ARCHIVE volume to full 753 for (DataNode dn : test.cluster.getDataNodes()) { 754 setVolumeFull(dn, StorageType.ARCHIVE); 755 DataNodeTestUtils.triggerHeartbeat(dn); 756 } 757 758 { // test increasing replication but new replicas cannot be created 759 // since no more ARCHIVE space. 760 final Path file0 = new Path(pathPolicyMap.cold, "file0"); 761 final Replication r = test.getReplication(file0); 762 Assert.assertEquals(0, r.disk); 763 764 final short newReplication = (short) 5; 765 test.dfs.setReplication(file0, newReplication); 766 Thread.sleep(10000); 767 768 test.verifyReplication(file0, 0, r.archive); 769 } 770 771 { // test creating a hot file 772 final Path p = new Path(pathPolicyMap.hot, "foo"); 773 DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short) 3, 0L); 774 } 775 776 { //test move a cold file to warm 777 final Path file1 = new Path(pathPolicyMap.cold, "file1"); 778 test.dfs.rename(file1, pathPolicyMap.warm); 779 test.migrate(); 780 test.verify(true); 781 } 782 } finally { 783 test.shutdownCluster(); 784 } 785 } 786 } 787