1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
// "Narrative" test for S3.  This must be run manually against an S3 endpoint.
// The test bucket must exist and be empty (you can use --clear to delete its
// contents).
21 
22 #include <iostream>
23 #include <sstream>
24 #include <string>
25 
26 #include <gflags/gflags.h>
27 
28 #include "arrow/filesystem/s3fs.h"
29 #include "arrow/filesystem/test_util.h"
30 #include "arrow/io/interfaces.h"
31 #include "arrow/result.h"
32 #include "arrow/status.h"
33 #include "arrow/testing/gtest_util.h"
34 #include "arrow/util/logging.h"
35 
36 DEFINE_bool(clear, false, "delete all bucket contents");
37 DEFINE_bool(test, false, "run narrative test against bucket");
38 
39 DEFINE_bool(verbose, false, "be more verbose (includes AWS warnings)");
40 DEFINE_bool(debug, false, "be very verbose (includes AWS debug logs)");
41 
42 DEFINE_string(access_key, "", "S3 access key");
43 DEFINE_string(secret_key, "", "S3 secret key");
44 
45 DEFINE_string(bucket, "", "bucket name");
46 DEFINE_string(region, arrow::fs::kS3DefaultRegion, "AWS region");
47 DEFINE_string(endpoint, "", "Endpoint override (e.g. '127.0.0.1:9000')");
48 DEFINE_string(scheme, "https", "Connection scheme");
49 
50 namespace arrow {
51 namespace fs {
52 
// Evaluates `expr` exactly once, checks the outcome with
// ASSERT_RAISES(error_type, ...), and then forwards that same outcome to
// PrintError() together with `context_msg`.  Because ASSERT_RAISES comes
// first, PrintError() is only reached when the assertion did not bail out of
// the enclosing function.
#define ASSERT_RAISES_PRINT(context_msg, error_type, expr) \
  do {                                                     \
    auto _raised_outcome = (expr);                         \
    ASSERT_RAISES(error_type, _raised_outcome);            \
    PrintError(context_msg, _raised_outcome);              \
  } while (false)
59 
MakeFileSystem()60 std::shared_ptr<FileSystem> MakeFileSystem() {
61   std::shared_ptr<S3FileSystem> s3fs;
62   S3Options options;
63   if (!FLAGS_access_key.empty()) {
64     options = S3Options::FromAccessKey(FLAGS_access_key, FLAGS_secret_key);
65   } else {
66     options = S3Options::Defaults();
67   }
68   options.endpoint_override = FLAGS_endpoint;
69   options.scheme = FLAGS_scheme;
70   options.region = FLAGS_region;
71   s3fs = S3FileSystem::Make(options).ValueOrDie();
72   return std::make_shared<SubTreeFileSystem>(FLAGS_bucket, s3fs);
73 }
74 
PrintError(const std::string & context_msg,const Status & st)75 void PrintError(const std::string& context_msg, const Status& st) {
76   if (FLAGS_verbose) {
77     std::cout << "-- Error printout (" << context_msg << ") --\n"
78               << st.ToString() << std::endl;
79   }
80 }
81 
82 template <typename T>
PrintError(const std::string & context_msg,const Result<T> & result)83 void PrintError(const std::string& context_msg, const Result<T>& result) {
84   PrintError(context_msg, result.status());
85 }
86 
ClearBucket(int argc,char ** argv)87 void ClearBucket(int argc, char** argv) {
88   auto fs = MakeFileSystem();
89 
90   ASSERT_OK(fs->DeleteDirContents(""));
91 }
92 
TestBucket(int argc,char ** argv)93 void TestBucket(int argc, char** argv) {
94   auto fs = MakeFileSystem();
95   std::vector<FileInfo> infos;
96   FileSelector select;
97   std::shared_ptr<io::InputStream> is;
98   std::shared_ptr<io::RandomAccessFile> file;
99   std::shared_ptr<Buffer> buf;
100   Status status;
101 
102   // Check bucket exists and is empty
103   select.base_dir = "";
104   select.allow_not_found = false;
105   select.recursive = false;
106   ASSERT_OK_AND_ASSIGN(infos, fs->GetFileInfo(select));
107   ASSERT_EQ(infos.size(), 0) << "Bucket should be empty, perhaps use --clear?";
108 
109   // Create directory structure
110   ASSERT_OK(fs->CreateDir("EmptyDir", /*recursive=*/false));
111   ASSERT_OK(fs->CreateDir("Dir1", /*recursive=*/false));
112   ASSERT_OK(fs->CreateDir("Dir1/Subdir", /*recursive=*/false));
113   ASSERT_RAISES_PRINT("CreateDir in nonexistent parent", IOError,
114                       fs->CreateDir("Dir2/Subdir", /*recursive=*/false));
115   ASSERT_OK(fs->CreateDir("Dir2/Subdir", /*recursive=*/true));
116   CreateFile(fs.get(), "File1", "first data");
117   CreateFile(fs.get(), "Dir1/File2", "second data");
118   CreateFile(fs.get(), "Dir2/Subdir/File3", "third data");
119 
120   // GetFileInfo(Selector)
121   select.base_dir = "";
122   ASSERT_OK_AND_ASSIGN(infos, fs->GetFileInfo(select));
123   ASSERT_EQ(infos.size(), 4);
124   SortInfos(&infos);
125   AssertFileInfo(infos[0], "Dir1", FileType::Directory);
126   AssertFileInfo(infos[1], "Dir2", FileType::Directory);
127   AssertFileInfo(infos[2], "EmptyDir", FileType::Directory);
128   AssertFileInfo(infos[3], "File1", FileType::File, 10);
129 
130   select.base_dir = "zzzz";
131   ASSERT_RAISES_PRINT("GetFileInfo(Selector) with nonexisting base_dir", IOError,
132                       fs->GetFileInfo(select));
133   select.allow_not_found = true;
134   ASSERT_OK_AND_ASSIGN(infos, fs->GetFileInfo(select));
135   ASSERT_EQ(infos.size(), 0);
136 
137   select.base_dir = "Dir1";
138   select.allow_not_found = false;
139   ASSERT_OK_AND_ASSIGN(infos, fs->GetFileInfo(select));
140   ASSERT_EQ(infos.size(), 2);
141   AssertFileInfo(infos[0], "Dir1/File2", FileType::File, 11);
142   AssertFileInfo(infos[1], "Dir1/Subdir", FileType::Directory);
143 
144   select.base_dir = "Dir2";
145   select.recursive = true;
146   ASSERT_OK_AND_ASSIGN(infos, fs->GetFileInfo(select));
147   ASSERT_EQ(infos.size(), 2);
148   AssertFileInfo(infos[0], "Dir2/Subdir", FileType::Directory);
149   AssertFileInfo(infos[1], "Dir2/Subdir/File3", FileType::File, 10);
150 
151   // Read a file
152   ASSERT_RAISES_PRINT("OpenInputStream with nonexistent file", IOError,
153                       fs->OpenInputStream("zzz"));
154   ASSERT_OK_AND_ASSIGN(is, fs->OpenInputStream("File1"));
155   ASSERT_OK_AND_ASSIGN(buf, is->Read(5));
156   AssertBufferEqual(*buf, "first");
157   ASSERT_OK_AND_ASSIGN(buf, is->Read(10));
158   AssertBufferEqual(*buf, " data");
159   ASSERT_OK_AND_ASSIGN(buf, is->Read(10));
160   AssertBufferEqual(*buf, "");
161   ASSERT_OK(is->Close());
162 
163   ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile("Dir1/File2"));
164   ASSERT_OK_AND_EQ(0, file->Tell());
165   ASSERT_OK(file->Seek(7));
166   ASSERT_OK_AND_EQ(7, file->Tell());
167   ASSERT_OK_AND_ASSIGN(buf, file->Read(2));
168   AssertBufferEqual(*buf, "da");
169   ASSERT_OK_AND_EQ(9, file->Tell());
170   ASSERT_OK_AND_ASSIGN(buf, file->ReadAt(2, 4));
171   AssertBufferEqual(*buf, "cond");
172   ASSERT_OK(file->Close());
173 
174   // Copy a file
175   ASSERT_OK(fs->CopyFile("File1", "Dir2/File4"));
176   AssertFileInfo(fs.get(), "File1", FileType::File, 10);
177   AssertFileInfo(fs.get(), "Dir2/File4", FileType::File, 10);
178   AssertFileContents(fs.get(), "Dir2/File4", "first data");
179 
180   // Copy a file over itself
181   ASSERT_OK(fs->CopyFile("File1", "File1"));
182   AssertFileInfo(fs.get(), "File1", FileType::File, 10);
183   AssertFileContents(fs.get(), "File1", "first data");
184 
185   // Move a file
186   ASSERT_OK(fs->Move("Dir2/File4", "File5"));
187   AssertFileInfo(fs.get(), "Dir2/File4", FileType::NotFound);
188   AssertFileInfo(fs.get(), "File5", FileType::File, 10);
189   AssertFileContents(fs.get(), "File5", "first data");
190 
191   // Move a file over itself
192   ASSERT_OK(fs->Move("File5", "File5"));
193   AssertFileInfo(fs.get(), "File5", FileType::File, 10);
194   AssertFileContents(fs.get(), "File5", "first data");
195 }
196 
TestMain(int argc,char ** argv)197 void TestMain(int argc, char** argv) {
198   S3GlobalOptions options;
199   options.log_level = FLAGS_debug
200                           ? S3LogLevel::Debug
201                           : (FLAGS_verbose ? S3LogLevel::Warn : S3LogLevel::Fatal);
202   ASSERT_OK(InitializeS3(options));
203 
204   if (FLAGS_clear) {
205     ClearBucket(argc, argv);
206   } else if (FLAGS_test) {
207     TestBucket(argc, argv);
208   }
209 
210   ASSERT_OK(FinalizeS3());
211 }
212 
213 }  // namespace fs
214 }  // namespace arrow
215 
main(int argc,char ** argv)216 int main(int argc, char** argv) {
217   std::stringstream ss;
218   ss << "Narrative test for S3.  Needs an initialized empty bucket.\n";
219   ss << "Usage: " << argv[0];
220   gflags::SetUsageMessage(ss.str());
221   gflags::ParseCommandLineFlags(&argc, &argv, true);
222 
223   if (FLAGS_clear + FLAGS_test != 1) {
224     ARROW_LOG(ERROR) << "Need exactly one of --test and --clear";
225     return 2;
226   }
227   if (FLAGS_bucket.empty()) {
228     ARROW_LOG(ERROR) << "--bucket is mandatory";
229     return 2;
230   }
231 
232   arrow::fs::TestMain(argc, argv);
233   if (::testing::Test::HasFatalFailure() || ::testing::Test::HasNonfatalFailure()) {
234     return 1;
235   } else {
236     std::cout << "Ok" << std::endl;
237     return 0;
238   }
239 }
240