package org.broadinstitute.hellbender.tools.spark;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.broadinstitute.hellbender.CommandLineProgramTest;
import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.gcs.BucketUtils;
import org.broadinstitute.hellbender.utils.io.IOUtils;
import org.broadinstitute.hellbender.testutils.ArgumentsBuilder;
import org.broadinstitute.hellbender.testutils.IntegrationTestSpec;
import org.broadinstitute.hellbender.testutils.MiniClusterUtils;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.File;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ParallelCopyGCSDirectoryIntoHDFSSparkIntegrationTest extends CommandLineProgramTest {

    @Override
    public String getTestedToolName() {
        return ParallelCopyGCSDirectoryIntoHDFSSpark.class.getSimpleName();
    }

    @Test(groups = {"spark", "bucket"})
    public void testCopyLargeFile() throws Exception {
        MiniDFSCluster cluster = null;
        try {
            final Configuration conf = new Configuration();
            // give the minicluster a very small block size so that a modestly sized file
            // spans multiple blocks and the chunked-transfer path is exercised without
            // actually needing to move a big file
            conf.set("dfs.blocksize", "1048576");
            cluster = MiniClusterUtils.getMiniCluster(conf);

            // copy a multi-block file
            final Path tempPath = MiniClusterUtils.getTempPath(cluster, "test", "dir");
            final String gcpInputPath = getGCPTestInputPath() + "huge/CEUTrio.HiSeq.WGS.b37.NA12878.chr1_4.bam.bai";
            final String args =
                    "--" + ParallelCopyGCSDirectoryIntoHDFSSpark.INPUT_GCS_PATH_LONG_NAME + " " + gcpInputPath +
                    " --" + ParallelCopyGCSDirectoryIntoHDFSSpark.OUTPUT_HDFS_DIRECTORY_LONG_NAME + " " + tempPath;
            final ArgumentsBuilder ab = new ArgumentsBuilder().addRaw(args);
            final IntegrationTestSpec spec = new IntegrationTestSpec(
                    ab.getString(),
                    Collections.emptyList());
            spec.executeTest("testCopyLargeFile-" + args, this);

            final long fileSizeOnGCS = Files.size(IOUtils.getPath(gcpInputPath));

            final String hdfsPath = tempPath + "/" + "CEUTrio.HiSeq.WGS.b37.NA12878.chr1_4.bam.bai";

            final Path outputHdfsDirectoryPath = new Path(tempPath.toUri());

            // the input file must be larger than the chunk size; otherwise this test
            // would only exercise the single-chunk code path
            try (FileSystem fs = outputHdfsDirectoryPath.getFileSystem(conf)) {
                final long chunkSize = ParallelCopyGCSDirectoryIntoHDFSSpark.getChunkSize(fs);
                Assert.assertTrue(fileSizeOnGCS > chunkSize);
            }

            Assert.assertEquals(BucketUtils.fileSize(hdfsPath), fileSizeOnGCS);

            // round-trip the copy back out of HDFS and verify its contents via MD5
            final File tempDir = createTempDir("ParallelCopy");
            final File localCopy = new File(tempDir, "fileFromHDFS.bam.bai");
            BucketUtils.copyFile(hdfsPath, localCopy.getPath());
            Assert.assertEquals(Utils.calculateFileMD5(localCopy), "1a6baa5332e98ef1358ac0fb36f46aaf");
        } finally {
            MiniClusterUtils.stopCluster(cluster);
        }
    }

    @DataProvider(name = "directoryCopy")
    public Object[][] getDirectoryParams() {
        // the parallel_copy directory contains two small files named foo.txt and bar.txt
        final String gcpInputPath = getGCPTestInputPath() + "parallel_copy/";
        final List<Object[]> tests = new ArrayList<>();
        // first case: no glob, so both files should be copied
        tests.add(new Object[]{gcpInputPath, null, new String[] { "foo.txt", "bar.txt" }, new String[] { "d3b07384d113edec49eaa6238ad5ff00", "c157a79031e1c40f85931829bc5fc552" }});
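        // second case: a glob restricts the copy, so only foo.txt should be transferred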
        tests.add(new Object[]{gcpInputPath, "foo*", new String[] { "foo.txt" }, new String[] { "d3b07384d113edec49eaa6238ad5ff00" }});
        return tests.toArray(new Object[][]{});
    }

    @Test(groups = {"spark", "bucket"}, dataProvider = "directoryCopy")
    public void testCopyDirectory(final String gcpInputPath,
                                  final String glob,
                                  final String[] expectedFilesCopied,
                                  final String[] expectedMD5s) throws Exception {
        MiniDFSCluster cluster = null;
        try {
            final Configuration conf = new Configuration();
            // use a very small block size so that chunked transfers are exercised
            // without actually needing to move a big file (see testCopyLargeFile)
            conf.set("dfs.blocksize", "1048576");
            cluster = MiniClusterUtils.getMiniCluster(conf);

            // copy a directory, optionally filtering its contents with a glob
            final Path tempPath = MiniClusterUtils.getTempPath(cluster, "test", "dir");

            final String args =
                    "--" + ParallelCopyGCSDirectoryIntoHDFSSpark.INPUT_GCS_PATH_LONG_NAME + " " + gcpInputPath +
                    (glob == null ? "" : " --" + ParallelCopyGCSDirectoryIntoHDFSSpark.INPUT_GLOB + " " + glob) +
                    " --" + ParallelCopyGCSDirectoryIntoHDFSSpark.OUTPUT_HDFS_DIRECTORY_LONG_NAME + " " + tempPath;
            final ArgumentsBuilder ab = new ArgumentsBuilder().addRaw(args);
            final IntegrationTestSpec spec = new IntegrationTestSpec(
                    ab.getString(),
                    Collections.emptyList());
            spec.executeTest("testCopyDirectory-" + args, this);

            final Path outputHdfsDirectoryPath = new Path(tempPath.toUri());

            final File tempDir = createTempDir("ParallelCopyDir");

            // pull every copied file back out of HDFS so its MD5 can be checked locally
            int filesFound = 0;
            try (FileSystem fs = outputHdfsDirectoryPath.getFileSystem(conf)) {
                final RemoteIterator<LocatedFileStatus> hdfsCopies = fs.listFiles(outputHdfsDirectoryPath, false);
                while (hdfsCopies.hasNext()) {
                    final Path path = hdfsCopies.next().getPath();
                    BucketUtils.copyFile(path.toString(), new File(tempDir, path.getName()).getPath());
                    filesFound++;
                }
            }

            Assert.assertEquals(filesFound, expectedFilesCopied.length);

            for (int i = 0; i < expectedFilesCopied.length; i++) {
                final String fileName = expectedFilesCopied[i];
                final String md5 = expectedMD5s[i];
                Assert.assertEquals(Utils.calculateFileMD5(new File(tempDir, fileName)), md5);
            }
        } finally {
            MiniClusterUtils.stopCluster(cluster);
        }
    }
}