1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
// "Narrative" test for S3. This must be run manually against an S3 endpoint.
// The test bucket must exist and be empty (you can use --clear to delete its
// contents).
21
22 #include <iostream>
23 #include <sstream>
24 #include <string>
25
26 #include <gflags/gflags.h>
27
28 #include "arrow/filesystem/s3fs.h"
29 #include "arrow/filesystem/test_util.h"
30 #include "arrow/io/interfaces.h"
31 #include "arrow/result.h"
32 #include "arrow/status.h"
33 #include "arrow/testing/gtest_util.h"
34 #include "arrow/util/logging.h"
35
// Mode selection: main() requires exactly one of --clear and --test.
DEFINE_bool(clear, false, "delete all bucket contents");
DEFINE_bool(test, false, "run narrative test against bucket");

// Verbosity. TestMain() maps these to the S3 log level (--debug wins over
// --verbose); --verbose additionally enables the PrintError() printouts.
DEFINE_bool(verbose, false, "be more verbose (includes AWS warnings)");
DEFINE_bool(debug, false, "be very verbose (includes AWS debug logs)");

// Credentials. When --access_key is empty, MakeFileSystem() falls back to
// S3Options::Defaults() and --secret_key is not used.
DEFINE_string(access_key, "", "S3 access key");
DEFINE_string(secret_key, "", "S3 secret key");

// Target bucket (rejected by main() when empty) and connection settings
// copied into the S3Options by MakeFileSystem().
DEFINE_string(bucket, "", "bucket name");
DEFINE_string(region, arrow::fs::kS3DefaultRegion, "AWS region");
DEFINE_string(endpoint, "", "Endpoint override (e.g. '127.0.0.1:9000')");
DEFINE_string(scheme, "https", "Connection scheme");
49
50 namespace arrow {
51 namespace fs {
52
// Evaluates `expr` exactly once, checks the outcome with
// ASSERT_RAISES(error_type, ...) and then hands it to PrintError() together
// with `context_msg`. The do / while (0) wrapper makes the expansion usable
// as a single statement.
#define ASSERT_RAISES_PRINT(context_msg, error_type, expr) \
  do {                                                     \
    auto _raised_outcome = (expr);                         \
    ASSERT_RAISES(error_type, _raised_outcome);            \
    PrintError(context_msg, _raised_outcome);              \
  } while (0)
59
MakeFileSystem()60 std::shared_ptr<FileSystem> MakeFileSystem() {
61 std::shared_ptr<S3FileSystem> s3fs;
62 S3Options options;
63 if (!FLAGS_access_key.empty()) {
64 options = S3Options::FromAccessKey(FLAGS_access_key, FLAGS_secret_key);
65 } else {
66 options = S3Options::Defaults();
67 }
68 options.endpoint_override = FLAGS_endpoint;
69 options.scheme = FLAGS_scheme;
70 options.region = FLAGS_region;
71 s3fs = S3FileSystem::Make(options).ValueOrDie();
72 return std::make_shared<SubTreeFileSystem>(FLAGS_bucket, s3fs);
73 }
74
PrintError(const std::string & context_msg,const Status & st)75 void PrintError(const std::string& context_msg, const Status& st) {
76 if (FLAGS_verbose) {
77 std::cout << "-- Error printout (" << context_msg << ") --\n"
78 << st.ToString() << std::endl;
79 }
80 }
81
82 template <typename T>
PrintError(const std::string & context_msg,const Result<T> & result)83 void PrintError(const std::string& context_msg, const Result<T>& result) {
84 PrintError(context_msg, result.status());
85 }
86
ClearBucket(int argc,char ** argv)87 void ClearBucket(int argc, char** argv) {
88 auto fs = MakeFileSystem();
89
90 ASSERT_OK(fs->DeleteDirContents(""));
91 }
92
TestBucket(int argc,char ** argv)93 void TestBucket(int argc, char** argv) {
94 auto fs = MakeFileSystem();
95 std::vector<FileInfo> infos;
96 FileSelector select;
97 std::shared_ptr<io::InputStream> is;
98 std::shared_ptr<io::RandomAccessFile> file;
99 std::shared_ptr<Buffer> buf;
100 Status status;
101
102 // Check bucket exists and is empty
103 select.base_dir = "";
104 select.allow_not_found = false;
105 select.recursive = false;
106 ASSERT_OK_AND_ASSIGN(infos, fs->GetFileInfo(select));
107 ASSERT_EQ(infos.size(), 0) << "Bucket should be empty, perhaps use --clear?";
108
109 // Create directory structure
110 ASSERT_OK(fs->CreateDir("EmptyDir", /*recursive=*/false));
111 ASSERT_OK(fs->CreateDir("Dir1", /*recursive=*/false));
112 ASSERT_OK(fs->CreateDir("Dir1/Subdir", /*recursive=*/false));
113 ASSERT_RAISES_PRINT("CreateDir in nonexistent parent", IOError,
114 fs->CreateDir("Dir2/Subdir", /*recursive=*/false));
115 ASSERT_OK(fs->CreateDir("Dir2/Subdir", /*recursive=*/true));
116 CreateFile(fs.get(), "File1", "first data");
117 CreateFile(fs.get(), "Dir1/File2", "second data");
118 CreateFile(fs.get(), "Dir2/Subdir/File3", "third data");
119
120 // GetFileInfo(Selector)
121 select.base_dir = "";
122 ASSERT_OK_AND_ASSIGN(infos, fs->GetFileInfo(select));
123 ASSERT_EQ(infos.size(), 4);
124 SortInfos(&infos);
125 AssertFileInfo(infos[0], "Dir1", FileType::Directory);
126 AssertFileInfo(infos[1], "Dir2", FileType::Directory);
127 AssertFileInfo(infos[2], "EmptyDir", FileType::Directory);
128 AssertFileInfo(infos[3], "File1", FileType::File, 10);
129
130 select.base_dir = "zzzz";
131 ASSERT_RAISES_PRINT("GetFileInfo(Selector) with nonexisting base_dir", IOError,
132 fs->GetFileInfo(select));
133 select.allow_not_found = true;
134 ASSERT_OK_AND_ASSIGN(infos, fs->GetFileInfo(select));
135 ASSERT_EQ(infos.size(), 0);
136
137 select.base_dir = "Dir1";
138 select.allow_not_found = false;
139 ASSERT_OK_AND_ASSIGN(infos, fs->GetFileInfo(select));
140 ASSERT_EQ(infos.size(), 2);
141 AssertFileInfo(infos[0], "Dir1/File2", FileType::File, 11);
142 AssertFileInfo(infos[1], "Dir1/Subdir", FileType::Directory);
143
144 select.base_dir = "Dir2";
145 select.recursive = true;
146 ASSERT_OK_AND_ASSIGN(infos, fs->GetFileInfo(select));
147 ASSERT_EQ(infos.size(), 2);
148 AssertFileInfo(infos[0], "Dir2/Subdir", FileType::Directory);
149 AssertFileInfo(infos[1], "Dir2/Subdir/File3", FileType::File, 10);
150
151 // Read a file
152 ASSERT_RAISES_PRINT("OpenInputStream with nonexistent file", IOError,
153 fs->OpenInputStream("zzz"));
154 ASSERT_OK_AND_ASSIGN(is, fs->OpenInputStream("File1"));
155 ASSERT_OK_AND_ASSIGN(buf, is->Read(5));
156 AssertBufferEqual(*buf, "first");
157 ASSERT_OK_AND_ASSIGN(buf, is->Read(10));
158 AssertBufferEqual(*buf, " data");
159 ASSERT_OK_AND_ASSIGN(buf, is->Read(10));
160 AssertBufferEqual(*buf, "");
161 ASSERT_OK(is->Close());
162
163 ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile("Dir1/File2"));
164 ASSERT_OK_AND_EQ(0, file->Tell());
165 ASSERT_OK(file->Seek(7));
166 ASSERT_OK_AND_EQ(7, file->Tell());
167 ASSERT_OK_AND_ASSIGN(buf, file->Read(2));
168 AssertBufferEqual(*buf, "da");
169 ASSERT_OK_AND_EQ(9, file->Tell());
170 ASSERT_OK_AND_ASSIGN(buf, file->ReadAt(2, 4));
171 AssertBufferEqual(*buf, "cond");
172 ASSERT_OK(file->Close());
173
174 // Copy a file
175 ASSERT_OK(fs->CopyFile("File1", "Dir2/File4"));
176 AssertFileInfo(fs.get(), "File1", FileType::File, 10);
177 AssertFileInfo(fs.get(), "Dir2/File4", FileType::File, 10);
178 AssertFileContents(fs.get(), "Dir2/File4", "first data");
179
180 // Copy a file over itself
181 ASSERT_OK(fs->CopyFile("File1", "File1"));
182 AssertFileInfo(fs.get(), "File1", FileType::File, 10);
183 AssertFileContents(fs.get(), "File1", "first data");
184
185 // Move a file
186 ASSERT_OK(fs->Move("Dir2/File4", "File5"));
187 AssertFileInfo(fs.get(), "Dir2/File4", FileType::NotFound);
188 AssertFileInfo(fs.get(), "File5", FileType::File, 10);
189 AssertFileContents(fs.get(), "File5", "first data");
190
191 // Move a file over itself
192 ASSERT_OK(fs->Move("File5", "File5"));
193 AssertFileInfo(fs.get(), "File5", FileType::File, 10);
194 AssertFileContents(fs.get(), "File5", "first data");
195 }
196
TestMain(int argc,char ** argv)197 void TestMain(int argc, char** argv) {
198 S3GlobalOptions options;
199 options.log_level = FLAGS_debug
200 ? S3LogLevel::Debug
201 : (FLAGS_verbose ? S3LogLevel::Warn : S3LogLevel::Fatal);
202 ASSERT_OK(InitializeS3(options));
203
204 if (FLAGS_clear) {
205 ClearBucket(argc, argv);
206 } else if (FLAGS_test) {
207 TestBucket(argc, argv);
208 }
209
210 ASSERT_OK(FinalizeS3());
211 }
212
213 } // namespace fs
214 } // namespace arrow
215
main(int argc,char ** argv)216 int main(int argc, char** argv) {
217 std::stringstream ss;
218 ss << "Narrative test for S3. Needs an initialized empty bucket.\n";
219 ss << "Usage: " << argv[0];
220 gflags::SetUsageMessage(ss.str());
221 gflags::ParseCommandLineFlags(&argc, &argv, true);
222
223 if (FLAGS_clear + FLAGS_test != 1) {
224 ARROW_LOG(ERROR) << "Need exactly one of --test and --clear";
225 return 2;
226 }
227 if (FLAGS_bucket.empty()) {
228 ARROW_LOG(ERROR) << "--bucket is mandatory";
229 return 2;
230 }
231
232 arrow::fs::TestMain(argc, argv);
233 if (::testing::Test::HasFatalFailure() || ::testing::Test::HasNonfatalFailure()) {
234 return 1;
235 } else {
236 std::cout << "Ok" << std::endl;
237 return 0;
238 }
239 }
240