1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "./arrow_types.h"
19
20 #if defined(ARROW_R_WITH_ARROW)
21
22 #include <arrow/filesystem/filesystem.h>
23 #include <arrow/filesystem/localfs.h>
24
25 namespace fs = ::arrow::fs;
26 namespace io = ::arrow::io;
27
28 namespace cpp11 {
29
get(const std::shared_ptr<fs::FileSystem> & file_system)30 const char* r6_class_name<fs::FileSystem>::get(
31 const std::shared_ptr<fs::FileSystem>& file_system) {
32 auto type_name = file_system->type_name();
33
34 if (type_name == "local") {
35 return "LocalFileSystem";
36 } else if (type_name == "s3") {
37 return "S3FileSystem";
38 } else if (type_name == "subtree") {
39 return "SubTreeFileSystem";
40 } else {
41 return "FileSystem";
42 }
43 }
44
45 } // namespace cpp11
46
47 // [[arrow::export]]
fs___FileInfo__type(const std::shared_ptr<fs::FileInfo> & x)48 fs::FileType fs___FileInfo__type(const std::shared_ptr<fs::FileInfo>& x) {
49 return x->type();
50 }
51
52 // [[arrow::export]]
fs___FileInfo__set_type(const std::shared_ptr<fs::FileInfo> & x,fs::FileType type)53 void fs___FileInfo__set_type(const std::shared_ptr<fs::FileInfo>& x, fs::FileType type) {
54 x->set_type(type);
55 }
56
57 // [[arrow::export]]
fs___FileInfo__path(const std::shared_ptr<fs::FileInfo> & x)58 std::string fs___FileInfo__path(const std::shared_ptr<fs::FileInfo>& x) {
59 return x->path();
60 }
61
62 // [[arrow::export]]
fs___FileInfo__set_path(const std::shared_ptr<fs::FileInfo> & x,const std::string & path)63 void fs___FileInfo__set_path(const std::shared_ptr<fs::FileInfo>& x,
64 const std::string& path) {
65 x->set_path(path);
66 }
67
68 // [[arrow::export]]
fs___FileInfo__size(const std::shared_ptr<fs::FileInfo> & x)69 int64_t fs___FileInfo__size(const std::shared_ptr<fs::FileInfo>& x) { return x->size(); }
70
71 // [[arrow::export]]
fs___FileInfo__set_size(const std::shared_ptr<fs::FileInfo> & x,int64_t size)72 void fs___FileInfo__set_size(const std::shared_ptr<fs::FileInfo>& x, int64_t size) {
73 x->set_size(size);
74 }
75
76 // [[arrow::export]]
fs___FileInfo__base_name(const std::shared_ptr<fs::FileInfo> & x)77 std::string fs___FileInfo__base_name(const std::shared_ptr<fs::FileInfo>& x) {
78 return x->base_name();
79 }
80
81 // [[arrow::export]]
fs___FileInfo__extension(const std::shared_ptr<fs::FileInfo> & x)82 std::string fs___FileInfo__extension(const std::shared_ptr<fs::FileInfo>& x) {
83 return x->extension();
84 }
85
86 // [[arrow::export]]
fs___FileInfo__mtime(const std::shared_ptr<fs::FileInfo> & x)87 SEXP fs___FileInfo__mtime(const std::shared_ptr<fs::FileInfo>& x) {
88 SEXP res = PROTECT(Rf_allocVector(REALSXP, 1));
89 // .mtime() gets us nanoseconds since epoch, POSIXct is seconds since epoch as a double
90 REAL(res)[0] = static_cast<double>(x->mtime().time_since_epoch().count()) / 1000000000;
91 Rf_classgets(res, arrow::r::data::classes_POSIXct);
92 UNPROTECT(1);
93 return res;
94 }
95
96 // [[arrow::export]]
fs___FileInfo__set_mtime(const std::shared_ptr<fs::FileInfo> & x,SEXP time)97 void fs___FileInfo__set_mtime(const std::shared_ptr<fs::FileInfo>& x, SEXP time) {
98 auto nanosecs =
99 std::chrono::nanoseconds(static_cast<int64_t>(REAL(time)[0] * 1000000000));
100 x->set_mtime(fs::TimePoint(nanosecs));
101 }
102
103 // Selector
104
105 // [[arrow::export]]
fs___FileSelector__base_dir(const std::shared_ptr<fs::FileSelector> & selector)106 std::string fs___FileSelector__base_dir(
107 const std::shared_ptr<fs::FileSelector>& selector) {
108 return selector->base_dir;
109 }
110
111 // [[arrow::export]]
fs___FileSelector__allow_not_found(const std::shared_ptr<fs::FileSelector> & selector)112 bool fs___FileSelector__allow_not_found(
113 const std::shared_ptr<fs::FileSelector>& selector) {
114 return selector->allow_not_found;
115 }
116
117 // [[arrow::export]]
fs___FileSelector__recursive(const std::shared_ptr<fs::FileSelector> & selector)118 bool fs___FileSelector__recursive(const std::shared_ptr<fs::FileSelector>& selector) {
119 return selector->recursive;
120 }
121
122 // [[arrow::export]]
fs___FileSelector__create(const std::string & base_dir,bool allow_not_found,bool recursive)123 std::shared_ptr<fs::FileSelector> fs___FileSelector__create(const std::string& base_dir,
124 bool allow_not_found,
125 bool recursive) {
126 auto selector = std::make_shared<fs::FileSelector>();
127 selector->base_dir = base_dir;
128 selector->allow_not_found = allow_not_found;
129 selector->recursive = recursive;
130 return selector;
131 }
132
133 // FileSystem
134
// Copies every element of `vec` into its own heap allocation and returns the
// owning pointers in the same order as the input.
//
// The element type is taken from the template parameter `T` throughout, so
// the helper works for any copy-constructible `T` rather than only for
// fs::FileInfo. An empty input yields an empty result.
template <typename T>
std::vector<std::shared_ptr<T>> shared_ptr_vector(const std::vector<T>& vec) {
  std::vector<std::shared_ptr<T>> res(vec.size());
  std::transform(vec.begin(), vec.end(), res.begin(),
                 [](const T& x) { return std::make_shared<T>(x); });
  return res;
}
142
143 // [[arrow::export]]
fs___FileSystem__GetTargetInfos_Paths(const std::shared_ptr<fs::FileSystem> & file_system,const std::vector<std::string> & paths)144 cpp11::list fs___FileSystem__GetTargetInfos_Paths(
145 const std::shared_ptr<fs::FileSystem>& file_system,
146 const std::vector<std::string>& paths) {
147 auto results = ValueOrStop(file_system->GetFileInfo(paths));
148 return arrow::r::to_r_list(shared_ptr_vector(results));
149 }
150
151 // [[arrow::export]]
fs___FileSystem__GetTargetInfos_FileSelector(const std::shared_ptr<fs::FileSystem> & file_system,const std::shared_ptr<fs::FileSelector> & selector)152 cpp11::list fs___FileSystem__GetTargetInfos_FileSelector(
153 const std::shared_ptr<fs::FileSystem>& file_system,
154 const std::shared_ptr<fs::FileSelector>& selector) {
155 auto results = ValueOrStop(file_system->GetFileInfo(*selector));
156
157 return arrow::r::to_r_list(shared_ptr_vector(results));
158 }
159
160 // [[arrow::export]]
fs___FileSystem__CreateDir(const std::shared_ptr<fs::FileSystem> & file_system,const std::string & path,bool recursive)161 void fs___FileSystem__CreateDir(const std::shared_ptr<fs::FileSystem>& file_system,
162 const std::string& path, bool recursive) {
163 StopIfNotOk(file_system->CreateDir(path, recursive));
164 }
165
166 // [[arrow::export]]
fs___FileSystem__DeleteDir(const std::shared_ptr<fs::FileSystem> & file_system,const std::string & path)167 void fs___FileSystem__DeleteDir(const std::shared_ptr<fs::FileSystem>& file_system,
168 const std::string& path) {
169 StopIfNotOk(file_system->DeleteDir(path));
170 }
171
172 // [[arrow::export]]
fs___FileSystem__DeleteDirContents(const std::shared_ptr<fs::FileSystem> & file_system,const std::string & path)173 void fs___FileSystem__DeleteDirContents(
174 const std::shared_ptr<fs::FileSystem>& file_system, const std::string& path) {
175 StopIfNotOk(file_system->DeleteDirContents(path));
176 }
177
178 // [[arrow::export]]
fs___FileSystem__DeleteFile(const std::shared_ptr<fs::FileSystem> & file_system,const std::string & path)179 void fs___FileSystem__DeleteFile(const std::shared_ptr<fs::FileSystem>& file_system,
180 const std::string& path) {
181 StopIfNotOk(file_system->DeleteFile(path));
182 }
183
184 // [[arrow::export]]
fs___FileSystem__DeleteFiles(const std::shared_ptr<fs::FileSystem> & file_system,const std::vector<std::string> & paths)185 void fs___FileSystem__DeleteFiles(const std::shared_ptr<fs::FileSystem>& file_system,
186 const std::vector<std::string>& paths) {
187 StopIfNotOk(file_system->DeleteFiles(paths));
188 }
189
190 // [[arrow::export]]
fs___FileSystem__Move(const std::shared_ptr<fs::FileSystem> & file_system,const std::string & src,const std::string & dest)191 void fs___FileSystem__Move(const std::shared_ptr<fs::FileSystem>& file_system,
192 const std::string& src, const std::string& dest) {
193 StopIfNotOk(file_system->Move(src, dest));
194 }
195
196 // [[arrow::export]]
fs___FileSystem__CopyFile(const std::shared_ptr<fs::FileSystem> & file_system,const std::string & src,const std::string & dest)197 void fs___FileSystem__CopyFile(const std::shared_ptr<fs::FileSystem>& file_system,
198 const std::string& src, const std::string& dest) {
199 StopIfNotOk(file_system->CopyFile(src, dest));
200 }
201
202 // [[arrow::export]]
fs___FileSystem__OpenInputStream(const std::shared_ptr<fs::FileSystem> & file_system,const std::string & path)203 std::shared_ptr<arrow::io::InputStream> fs___FileSystem__OpenInputStream(
204 const std::shared_ptr<fs::FileSystem>& file_system, const std::string& path) {
205 return ValueOrStop(file_system->OpenInputStream(path));
206 }
207
208 // [[arrow::export]]
fs___FileSystem__OpenInputFile(const std::shared_ptr<fs::FileSystem> & file_system,const std::string & path)209 std::shared_ptr<arrow::io::RandomAccessFile> fs___FileSystem__OpenInputFile(
210 const std::shared_ptr<fs::FileSystem>& file_system, const std::string& path) {
211 return ValueOrStop(file_system->OpenInputFile(path));
212 }
213
214 // [[arrow::export]]
fs___FileSystem__OpenOutputStream(const std::shared_ptr<fs::FileSystem> & file_system,const std::string & path)215 std::shared_ptr<arrow::io::OutputStream> fs___FileSystem__OpenOutputStream(
216 const std::shared_ptr<fs::FileSystem>& file_system, const std::string& path) {
217 return ValueOrStop(file_system->OpenOutputStream(path));
218 }
219
220 // [[arrow::export]]
fs___FileSystem__OpenAppendStream(const std::shared_ptr<fs::FileSystem> & file_system,const std::string & path)221 std::shared_ptr<arrow::io::OutputStream> fs___FileSystem__OpenAppendStream(
222 const std::shared_ptr<fs::FileSystem>& file_system, const std::string& path) {
223 return ValueOrStop(file_system->OpenAppendStream(path));
224 }
225
226 // [[arrow::export]]
fs___FileSystem__type_name(const std::shared_ptr<fs::FileSystem> & file_system)227 std::string fs___FileSystem__type_name(
228 const std::shared_ptr<fs::FileSystem>& file_system) {
229 return file_system->type_name();
230 }
231
232 // [[arrow::export]]
fs___LocalFileSystem__create()233 std::shared_ptr<fs::LocalFileSystem> fs___LocalFileSystem__create() {
234 // Affects OpenInputFile/OpenInputStream
235 auto io_context = arrow::io::IOContext(gc_memory_pool());
236 return std::make_shared<fs::LocalFileSystem>(io_context);
237 }
238
239 // [[arrow::export]]
fs___SubTreeFileSystem__create(const std::string & base_path,const std::shared_ptr<fs::FileSystem> & base_fs)240 std::shared_ptr<fs::SubTreeFileSystem> fs___SubTreeFileSystem__create(
241 const std::string& base_path, const std::shared_ptr<fs::FileSystem>& base_fs) {
242 return std::make_shared<fs::SubTreeFileSystem>(base_path, base_fs);
243 }
244
245 // [[arrow::export]]
fs___SubTreeFileSystem__base_fs(const std::shared_ptr<fs::SubTreeFileSystem> & file_system)246 std::shared_ptr<fs::FileSystem> fs___SubTreeFileSystem__base_fs(
247 const std::shared_ptr<fs::SubTreeFileSystem>& file_system) {
248 return file_system->base_fs();
249 }
250
251 // [[arrow::export]]
fs___SubTreeFileSystem__base_path(const std::shared_ptr<fs::SubTreeFileSystem> & file_system)252 std::string fs___SubTreeFileSystem__base_path(
253 const std::shared_ptr<fs::SubTreeFileSystem>& file_system) {
254 return file_system->base_path();
255 }
256
257 // [[arrow::export]]
fs___FileSystemFromUri(const std::string & path)258 cpp11::writable::list fs___FileSystemFromUri(const std::string& path) {
259 using cpp11::literals::operator"" _nm;
260
261 std::string out_path;
262 return cpp11::writable::list(
263 {"fs"_nm = cpp11::to_r6(ValueOrStop(fs::FileSystemFromUri(path, &out_path))),
264 "path"_nm = out_path});
265 }
266
267 // [[arrow::export]]
fs___CopyFiles(const std::shared_ptr<fs::FileSystem> & source_fs,const std::shared_ptr<fs::FileSelector> & source_sel,const std::shared_ptr<fs::FileSystem> & destination_fs,const std::string & destination_base_dir,int64_t chunk_size=1024* 1024,bool use_threads=true)268 void fs___CopyFiles(const std::shared_ptr<fs::FileSystem>& source_fs,
269 const std::shared_ptr<fs::FileSelector>& source_sel,
270 const std::shared_ptr<fs::FileSystem>& destination_fs,
271 const std::string& destination_base_dir,
272 int64_t chunk_size = 1024 * 1024, bool use_threads = true) {
273 StopIfNotOk(fs::CopyFiles(source_fs, *source_sel, destination_fs, destination_base_dir,
274 io::default_io_context(), chunk_size, use_threads));
275 }
276
277 #endif
278
279 #if defined(ARROW_R_WITH_S3)
280
281 #include <arrow/filesystem/s3fs.h>
282
283 // [[s3::export]]
fs___S3FileSystem__create(bool anonymous=false,std::string access_key="",std::string secret_key="",std::string session_token="",std::string role_arn="",std::string session_name="",std::string external_id="",int load_frequency=900,std::string region="",std::string endpoint_override="",std::string scheme="",bool background_writes=true)284 std::shared_ptr<fs::S3FileSystem> fs___S3FileSystem__create(
285 bool anonymous = false, std::string access_key = "", std::string secret_key = "",
286 std::string session_token = "", std::string role_arn = "",
287 std::string session_name = "", std::string external_id = "", int load_frequency = 900,
288 std::string region = "", std::string endpoint_override = "", std::string scheme = "",
289 bool background_writes = true) {
290 fs::S3Options s3_opts;
291 // Handle auth (anonymous, keys, default)
292 // (validation/internal coherence handled in R)
293 if (anonymous) {
294 s3_opts = fs::S3Options::Anonymous();
295 } else if (access_key != "" && secret_key != "") {
296 s3_opts = fs::S3Options::FromAccessKey(access_key, secret_key, session_token);
297 } else if (role_arn != "") {
298 s3_opts = fs::S3Options::FromAssumeRole(role_arn, session_name, external_id,
299 load_frequency);
300 } else {
301 s3_opts = fs::S3Options::Defaults();
302 }
303
304 // Now handle the rest of the options
305 /// AWS region to connect to (default determined by AWS SDK)
306 if (region != "") {
307 s3_opts.region = region;
308 }
309 /// If non-empty, override region with a connect string such as "localhost:9000"
310 s3_opts.endpoint_override = endpoint_override;
311 /// S3 connection transport, default "https"
312 if (scheme != "") {
313 s3_opts.scheme = scheme;
314 }
315 /// Whether OutputStream writes will be issued in the background, without blocking
316 /// default true
317 s3_opts.background_writes = background_writes;
318
319 StopIfNotOk(fs::EnsureS3Initialized());
320 auto io_context = arrow::io::IOContext(gc_memory_pool());
321 return ValueOrStop(fs::S3FileSystem::Make(s3_opts, io_context));
322 }
323
324 // [[s3::export]]
fs___S3FileSystem__region(const std::shared_ptr<fs::S3FileSystem> & fs)325 std::string fs___S3FileSystem__region(const std::shared_ptr<fs::S3FileSystem>& fs) {
326 return fs->region();
327 }
328
329 #endif
330