package org.broadinstitute.hellbender.tools.spark;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.broadinstitute.hellbender.CommandLineProgramTest;
import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.gcs.BucketUtils;
import org.broadinstitute.hellbender.utils.io.IOUtils;
import org.broadinstitute.hellbender.testutils.ArgumentsBuilder;
import org.broadinstitute.hellbender.testutils.IntegrationTestSpec;
import org.broadinstitute.hellbender.testutils.MiniClusterUtils;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.File;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

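/**
 * Integration tests for {@link ParallelCopyGCSDirectoryIntoHDFSSpark}: copies files and directories
 * from a GCS test bucket into a MiniDFSCluster and verifies the sizes and MD5 checksums of the copies.
 */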
public class ParallelCopyGCSDirectoryIntoHDFSSparkIntegrationTest extends CommandLineProgramTest {

    @Override
    public String getTestedToolName() {
        return ParallelCopyGCSDirectoryIntoHDFSSpark.class.getSimpleName();
    }

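    /**
     * Copies a single file (larger than one chunk) from GCS into HDFS and checks that the copy's
     * size and MD5 match the original.
     */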
    @Test(groups = {"spark", "bucket"})
    public void testCopyLargeFile() throws Exception {
        MiniDFSCluster cluster = null;
        try {
            final Configuration conf = new Configuration();
            // set the minicluster to have a very low block size so that we can test transferring a file in chunks without actually needing to move a big file
            conf.set("dfs.blocksize", "1048576");
            cluster = MiniClusterUtils.getMiniCluster(conf);

            // copy a multi-block file
            final Path tempPath = MiniClusterUtils.getTempPath(cluster, "test", "dir");
            final String gcpInputPath = getGCPTestInputPath() + "huge/CEUTrio.HiSeq.WGS.b37.NA12878.chr1_4.bam.bai";
            String args =
                    "--" + ParallelCopyGCSDirectoryIntoHDFSSpark.INPUT_GCS_PATH_LONG_NAME + " " + gcpInputPath +
                            " --" + ParallelCopyGCSDirectoryIntoHDFSSpark.OUTPUT_HDFS_DIRECTORY_LONG_NAME + " " + tempPath;
            ArgumentsBuilder ab = new ArgumentsBuilder().addRaw(args);
            IntegrationTestSpec spec = new IntegrationTestSpec(
                    ab.getString(),
                    Collections.emptyList());
            spec.executeTest("testCopyLargeFile-" + args, this);

            final long fileSizeOnGCS = Files.size(IOUtils.getPath(gcpInputPath));

            final String hdfsPath = tempPath + "/" + "CEUTrio.HiSeq.WGS.b37.NA12878.chr1_4.bam.bai";

            org.apache.hadoop.fs.Path outputHdfsDirectoryPath = new org.apache.hadoop.fs.Path(tempPath.toUri());

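            // sanity check: the source file must be larger than one chunk, otherwise this test
            // would not exercise the multi-chunk copy path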
            try(FileSystem fs = outputHdfsDirectoryPath.getFileSystem(conf)) {
                long chunkSize = ParallelCopyGCSDirectoryIntoHDFSSpark.getChunkSize(fs);
                Assert.assertTrue(fileSizeOnGCS > chunkSize);
            }

            Assert.assertEquals(BucketUtils.fileSize(hdfsPath),
                    fileSizeOnGCS);

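            // copy the file back out of HDFS to local disk and verify its MD5 against the known
            // checksum of the source file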
            final File tempDir = createTempDir("ParallelCopy");

            BucketUtils.copyFile(hdfsPath, tempDir + "/fileFromHDFS.bam.bai");
            Assert.assertEquals(Utils.calculateFileMD5(new File(tempDir + "/fileFromHDFS.bam.bai")), "1a6baa5332e98ef1358ac0fb36f46aaf");
        } finally {
            MiniClusterUtils.stopCluster(cluster);
        }
    }

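    /**
     * Test cases for directory copies: the GCS test directory contains two small files, foo.txt and
     * bar.txt, with known MD5s; the second case restricts the copy to foo.txt via a glob.
     */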
    @DataProvider(name = "directoryCopy")
    public Object[][] getDirectoryParams() {
        final String gcpInputPath = getGCPTestInputPath() + "parallel_copy/";
        final List<Object[]> tests = new ArrayList<>();
        tests.add(new Object[]{gcpInputPath, null, new String[] { "foo.txt", "bar.txt"}, new String[] { "d3b07384d113edec49eaa6238ad5ff00", "c157a79031e1c40f85931829bc5fc552"}});
        tests.add(new Object[]{gcpInputPath, "foo*", new String[] { "foo.txt" }, new String[] { "d3b07384d113edec49eaa6238ad5ff00" }});
        return tests.toArray(new Object[][]{});
    }

    @Test(groups = {"spark", "bucket"}, dataProvider = "directoryCopy")
    public void testCopyDirectory(final String gcpInputPath,
                                  final String glob,
                                  final String[] expectedFilesCopied,
                                  final String[] expectedMD5s) throws Exception {
        MiniDFSCluster cluster = null;
        try {
            final Configuration conf = new Configuration();
            // set the minicluster to have a very low block size so that we can test transferring a file in chunks without actually needing to move a big file
            conf.set("dfs.blocksize", "1048576");
            cluster = MiniClusterUtils.getMiniCluster(conf);

            // copy a directory
            final Path tempPath = MiniClusterUtils.getTempPath(cluster, "test", "dir");

            // directory contains two small files named foo.txt and bar.txt
            String args =
                    "--" + ParallelCopyGCSDirectoryIntoHDFSSpark.INPUT_GCS_PATH_LONG_NAME + " " + gcpInputPath +
                            (glob == null ? "" : " --" + ParallelCopyGCSDirectoryIntoHDFSSpark.INPUT_GLOB + " " + glob) +
                            " --" + ParallelCopyGCSDirectoryIntoHDFSSpark.OUTPUT_HDFS_DIRECTORY_LONG_NAME + " " + tempPath;
            ArgumentsBuilder ab = new ArgumentsBuilder().addRaw(args);
            IntegrationTestSpec spec = new IntegrationTestSpec(
                    ab.getString(),
                    Collections.emptyList());
            spec.executeTest("testCopyDirectory-" + args, this);

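            // list the HDFS output directory (non-recursively) and copy every file back to local
            // disk so its checksum can be verified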
            org.apache.hadoop.fs.Path outputHdfsDirectoryPath = new org.apache.hadoop.fs.Path(tempPath.toUri());

            final File tempDir = createTempDir("ParallelCopyDir");

            int filesFound = 0;
            try(FileSystem fs = outputHdfsDirectoryPath.getFileSystem(conf)) {
                final RemoteIterator<LocatedFileStatus> hdfsCopies = fs.listFiles(outputHdfsDirectoryPath, false);
                while (hdfsCopies.hasNext()) {
                    final FileStatus next = hdfsCopies.next();
                    final Path path = next.getPath();
                    BucketUtils.copyFile(path.toString(), tempDir + "/" + path.getName());
                    filesFound++;
                }
            }

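            // exactly the expected files should have been copied, and each copy's MD5 should match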
            Assert.assertEquals(filesFound, expectedFilesCopied.length);

            for (int i = 0; i < expectedFilesCopied.length; i++) {
                String fileName = expectedFilesCopied[i];
                String md5 = expectedMD5s[i];
                Assert.assertEquals(Utils.calculateFileMD5(new File(tempDir + "/" + fileName)), md5);
            }
        } finally {
            MiniClusterUtils.stopCluster(cluster);
        }
    }

}