1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include "./arrow_types.h"
19 
20 #if defined(ARROW_R_WITH_ARROW)
21 
22 #include <arrow/filesystem/filesystem.h>
23 #include <arrow/filesystem/localfs.h>
24 
25 namespace fs = ::arrow::fs;
26 namespace io = ::arrow::io;
27 
28 namespace cpp11 {
29 
get(const std::shared_ptr<fs::FileSystem> & file_system)30 const char* r6_class_name<fs::FileSystem>::get(
31     const std::shared_ptr<fs::FileSystem>& file_system) {
32   auto type_name = file_system->type_name();
33 
34   if (type_name == "local") {
35     return "LocalFileSystem";
36   } else if (type_name == "s3") {
37     return "S3FileSystem";
38   } else if (type_name == "subtree") {
39     return "SubTreeFileSystem";
40   } else {
41     return "FileSystem";
42   }
43 }
44 
45 }  // namespace cpp11
46 
47 // [[arrow::export]]
fs___FileInfo__type(const std::shared_ptr<fs::FileInfo> & x)48 fs::FileType fs___FileInfo__type(const std::shared_ptr<fs::FileInfo>& x) {
49   return x->type();
50 }
51 
52 // [[arrow::export]]
fs___FileInfo__set_type(const std::shared_ptr<fs::FileInfo> & x,fs::FileType type)53 void fs___FileInfo__set_type(const std::shared_ptr<fs::FileInfo>& x, fs::FileType type) {
54   x->set_type(type);
55 }
56 
57 // [[arrow::export]]
fs___FileInfo__path(const std::shared_ptr<fs::FileInfo> & x)58 std::string fs___FileInfo__path(const std::shared_ptr<fs::FileInfo>& x) {
59   return x->path();
60 }
61 
62 // [[arrow::export]]
fs___FileInfo__set_path(const std::shared_ptr<fs::FileInfo> & x,const std::string & path)63 void fs___FileInfo__set_path(const std::shared_ptr<fs::FileInfo>& x,
64                              const std::string& path) {
65   x->set_path(path);
66 }
67 
68 // [[arrow::export]]
fs___FileInfo__size(const std::shared_ptr<fs::FileInfo> & x)69 int64_t fs___FileInfo__size(const std::shared_ptr<fs::FileInfo>& x) { return x->size(); }
70 
71 // [[arrow::export]]
fs___FileInfo__set_size(const std::shared_ptr<fs::FileInfo> & x,int64_t size)72 void fs___FileInfo__set_size(const std::shared_ptr<fs::FileInfo>& x, int64_t size) {
73   x->set_size(size);
74 }
75 
76 // [[arrow::export]]
fs___FileInfo__base_name(const std::shared_ptr<fs::FileInfo> & x)77 std::string fs___FileInfo__base_name(const std::shared_ptr<fs::FileInfo>& x) {
78   return x->base_name();
79 }
80 
81 // [[arrow::export]]
fs___FileInfo__extension(const std::shared_ptr<fs::FileInfo> & x)82 std::string fs___FileInfo__extension(const std::shared_ptr<fs::FileInfo>& x) {
83   return x->extension();
84 }
85 
86 // [[arrow::export]]
fs___FileInfo__mtime(const std::shared_ptr<fs::FileInfo> & x)87 SEXP fs___FileInfo__mtime(const std::shared_ptr<fs::FileInfo>& x) {
88   SEXP res = PROTECT(Rf_allocVector(REALSXP, 1));
89   // .mtime() gets us nanoseconds since epoch, POSIXct is seconds since epoch as a double
90   REAL(res)[0] = static_cast<double>(x->mtime().time_since_epoch().count()) / 1000000000;
91   Rf_classgets(res, arrow::r::data::classes_POSIXct);
92   UNPROTECT(1);
93   return res;
94 }
95 
96 // [[arrow::export]]
fs___FileInfo__set_mtime(const std::shared_ptr<fs::FileInfo> & x,SEXP time)97 void fs___FileInfo__set_mtime(const std::shared_ptr<fs::FileInfo>& x, SEXP time) {
98   auto nanosecs =
99       std::chrono::nanoseconds(static_cast<int64_t>(REAL(time)[0] * 1000000000));
100   x->set_mtime(fs::TimePoint(nanosecs));
101 }
102 
103 // Selector
104 
105 // [[arrow::export]]
fs___FileSelector__base_dir(const std::shared_ptr<fs::FileSelector> & selector)106 std::string fs___FileSelector__base_dir(
107     const std::shared_ptr<fs::FileSelector>& selector) {
108   return selector->base_dir;
109 }
110 
111 // [[arrow::export]]
fs___FileSelector__allow_not_found(const std::shared_ptr<fs::FileSelector> & selector)112 bool fs___FileSelector__allow_not_found(
113     const std::shared_ptr<fs::FileSelector>& selector) {
114   return selector->allow_not_found;
115 }
116 
117 // [[arrow::export]]
fs___FileSelector__recursive(const std::shared_ptr<fs::FileSelector> & selector)118 bool fs___FileSelector__recursive(const std::shared_ptr<fs::FileSelector>& selector) {
119   return selector->recursive;
120 }
121 
122 // [[arrow::export]]
fs___FileSelector__create(const std::string & base_dir,bool allow_not_found,bool recursive)123 std::shared_ptr<fs::FileSelector> fs___FileSelector__create(const std::string& base_dir,
124                                                             bool allow_not_found,
125                                                             bool recursive) {
126   auto selector = std::make_shared<fs::FileSelector>();
127   selector->base_dir = base_dir;
128   selector->allow_not_found = allow_not_found;
129   selector->recursive = recursive;
130   return selector;
131 }
132 
133 // FileSystem
134 
// Copies every element of `vec` into its own std::shared_ptr<T> and returns the
// resulting vector, preserving element order. The input is left untouched.
//
// The implementation is expressed purely in terms of T; it previously
// hard-coded fs::FileInfo inside the body, so the template could only be
// instantiated for that one type despite its generic signature.
template <typename T>
std::vector<std::shared_ptr<T>> shared_ptr_vector(const std::vector<T>& vec) {
  std::vector<std::shared_ptr<T>> res;
  res.reserve(vec.size());
  for (const T& item : vec) {
    res.push_back(std::make_shared<T>(item));
  }
  return res;
}
142 
143 // [[arrow::export]]
fs___FileSystem__GetTargetInfos_Paths(const std::shared_ptr<fs::FileSystem> & file_system,const std::vector<std::string> & paths)144 cpp11::list fs___FileSystem__GetTargetInfos_Paths(
145     const std::shared_ptr<fs::FileSystem>& file_system,
146     const std::vector<std::string>& paths) {
147   auto results = ValueOrStop(file_system->GetFileInfo(paths));
148   return arrow::r::to_r_list(shared_ptr_vector(results));
149 }
150 
151 // [[arrow::export]]
fs___FileSystem__GetTargetInfos_FileSelector(const std::shared_ptr<fs::FileSystem> & file_system,const std::shared_ptr<fs::FileSelector> & selector)152 cpp11::list fs___FileSystem__GetTargetInfos_FileSelector(
153     const std::shared_ptr<fs::FileSystem>& file_system,
154     const std::shared_ptr<fs::FileSelector>& selector) {
155   auto results = ValueOrStop(file_system->GetFileInfo(*selector));
156 
157   return arrow::r::to_r_list(shared_ptr_vector(results));
158 }
159 
160 // [[arrow::export]]
fs___FileSystem__CreateDir(const std::shared_ptr<fs::FileSystem> & file_system,const std::string & path,bool recursive)161 void fs___FileSystem__CreateDir(const std::shared_ptr<fs::FileSystem>& file_system,
162                                 const std::string& path, bool recursive) {
163   StopIfNotOk(file_system->CreateDir(path, recursive));
164 }
165 
166 // [[arrow::export]]
fs___FileSystem__DeleteDir(const std::shared_ptr<fs::FileSystem> & file_system,const std::string & path)167 void fs___FileSystem__DeleteDir(const std::shared_ptr<fs::FileSystem>& file_system,
168                                 const std::string& path) {
169   StopIfNotOk(file_system->DeleteDir(path));
170 }
171 
172 // [[arrow::export]]
fs___FileSystem__DeleteDirContents(const std::shared_ptr<fs::FileSystem> & file_system,const std::string & path)173 void fs___FileSystem__DeleteDirContents(
174     const std::shared_ptr<fs::FileSystem>& file_system, const std::string& path) {
175   StopIfNotOk(file_system->DeleteDirContents(path));
176 }
177 
178 // [[arrow::export]]
fs___FileSystem__DeleteFile(const std::shared_ptr<fs::FileSystem> & file_system,const std::string & path)179 void fs___FileSystem__DeleteFile(const std::shared_ptr<fs::FileSystem>& file_system,
180                                  const std::string& path) {
181   StopIfNotOk(file_system->DeleteFile(path));
182 }
183 
184 // [[arrow::export]]
fs___FileSystem__DeleteFiles(const std::shared_ptr<fs::FileSystem> & file_system,const std::vector<std::string> & paths)185 void fs___FileSystem__DeleteFiles(const std::shared_ptr<fs::FileSystem>& file_system,
186                                   const std::vector<std::string>& paths) {
187   StopIfNotOk(file_system->DeleteFiles(paths));
188 }
189 
190 // [[arrow::export]]
fs___FileSystem__Move(const std::shared_ptr<fs::FileSystem> & file_system,const std::string & src,const std::string & dest)191 void fs___FileSystem__Move(const std::shared_ptr<fs::FileSystem>& file_system,
192                            const std::string& src, const std::string& dest) {
193   StopIfNotOk(file_system->Move(src, dest));
194 }
195 
196 // [[arrow::export]]
fs___FileSystem__CopyFile(const std::shared_ptr<fs::FileSystem> & file_system,const std::string & src,const std::string & dest)197 void fs___FileSystem__CopyFile(const std::shared_ptr<fs::FileSystem>& file_system,
198                                const std::string& src, const std::string& dest) {
199   StopIfNotOk(file_system->CopyFile(src, dest));
200 }
201 
202 // [[arrow::export]]
fs___FileSystem__OpenInputStream(const std::shared_ptr<fs::FileSystem> & file_system,const std::string & path)203 std::shared_ptr<arrow::io::InputStream> fs___FileSystem__OpenInputStream(
204     const std::shared_ptr<fs::FileSystem>& file_system, const std::string& path) {
205   return ValueOrStop(file_system->OpenInputStream(path));
206 }
207 
208 // [[arrow::export]]
fs___FileSystem__OpenInputFile(const std::shared_ptr<fs::FileSystem> & file_system,const std::string & path)209 std::shared_ptr<arrow::io::RandomAccessFile> fs___FileSystem__OpenInputFile(
210     const std::shared_ptr<fs::FileSystem>& file_system, const std::string& path) {
211   return ValueOrStop(file_system->OpenInputFile(path));
212 }
213 
214 // [[arrow::export]]
fs___FileSystem__OpenOutputStream(const std::shared_ptr<fs::FileSystem> & file_system,const std::string & path)215 std::shared_ptr<arrow::io::OutputStream> fs___FileSystem__OpenOutputStream(
216     const std::shared_ptr<fs::FileSystem>& file_system, const std::string& path) {
217   return ValueOrStop(file_system->OpenOutputStream(path));
218 }
219 
220 // [[arrow::export]]
fs___FileSystem__OpenAppendStream(const std::shared_ptr<fs::FileSystem> & file_system,const std::string & path)221 std::shared_ptr<arrow::io::OutputStream> fs___FileSystem__OpenAppendStream(
222     const std::shared_ptr<fs::FileSystem>& file_system, const std::string& path) {
223   return ValueOrStop(file_system->OpenAppendStream(path));
224 }
225 
226 // [[arrow::export]]
fs___FileSystem__type_name(const std::shared_ptr<fs::FileSystem> & file_system)227 std::string fs___FileSystem__type_name(
228     const std::shared_ptr<fs::FileSystem>& file_system) {
229   return file_system->type_name();
230 }
231 
232 // [[arrow::export]]
fs___LocalFileSystem__create()233 std::shared_ptr<fs::LocalFileSystem> fs___LocalFileSystem__create() {
234   // Affects OpenInputFile/OpenInputStream
235   auto io_context = arrow::io::IOContext(gc_memory_pool());
236   return std::make_shared<fs::LocalFileSystem>(io_context);
237 }
238 
239 // [[arrow::export]]
fs___SubTreeFileSystem__create(const std::string & base_path,const std::shared_ptr<fs::FileSystem> & base_fs)240 std::shared_ptr<fs::SubTreeFileSystem> fs___SubTreeFileSystem__create(
241     const std::string& base_path, const std::shared_ptr<fs::FileSystem>& base_fs) {
242   return std::make_shared<fs::SubTreeFileSystem>(base_path, base_fs);
243 }
244 
245 // [[arrow::export]]
fs___SubTreeFileSystem__base_fs(const std::shared_ptr<fs::SubTreeFileSystem> & file_system)246 std::shared_ptr<fs::FileSystem> fs___SubTreeFileSystem__base_fs(
247     const std::shared_ptr<fs::SubTreeFileSystem>& file_system) {
248   return file_system->base_fs();
249 }
250 
251 // [[arrow::export]]
fs___SubTreeFileSystem__base_path(const std::shared_ptr<fs::SubTreeFileSystem> & file_system)252 std::string fs___SubTreeFileSystem__base_path(
253     const std::shared_ptr<fs::SubTreeFileSystem>& file_system) {
254   return file_system->base_path();
255 }
256 
257 // [[arrow::export]]
fs___FileSystemFromUri(const std::string & path)258 cpp11::writable::list fs___FileSystemFromUri(const std::string& path) {
259   using cpp11::literals::operator"" _nm;
260 
261   std::string out_path;
262   return cpp11::writable::list(
263       {"fs"_nm = cpp11::to_r6(ValueOrStop(fs::FileSystemFromUri(path, &out_path))),
264        "path"_nm = out_path});
265 }
266 
267 // [[arrow::export]]
fs___CopyFiles(const std::shared_ptr<fs::FileSystem> & source_fs,const std::shared_ptr<fs::FileSelector> & source_sel,const std::shared_ptr<fs::FileSystem> & destination_fs,const std::string & destination_base_dir,int64_t chunk_size=1024* 1024,bool use_threads=true)268 void fs___CopyFiles(const std::shared_ptr<fs::FileSystem>& source_fs,
269                     const std::shared_ptr<fs::FileSelector>& source_sel,
270                     const std::shared_ptr<fs::FileSystem>& destination_fs,
271                     const std::string& destination_base_dir,
272                     int64_t chunk_size = 1024 * 1024, bool use_threads = true) {
273   StopIfNotOk(fs::CopyFiles(source_fs, *source_sel, destination_fs, destination_base_dir,
274                             io::default_io_context(), chunk_size, use_threads));
275 }
276 
277 #endif
278 
279 #if defined(ARROW_R_WITH_S3)
280 
281 #include <arrow/filesystem/s3fs.h>
282 
283 // [[s3::export]]
fs___S3FileSystem__create(bool anonymous=false,std::string access_key="",std::string secret_key="",std::string session_token="",std::string role_arn="",std::string session_name="",std::string external_id="",int load_frequency=900,std::string region="",std::string endpoint_override="",std::string scheme="",bool background_writes=true)284 std::shared_ptr<fs::S3FileSystem> fs___S3FileSystem__create(
285     bool anonymous = false, std::string access_key = "", std::string secret_key = "",
286     std::string session_token = "", std::string role_arn = "",
287     std::string session_name = "", std::string external_id = "", int load_frequency = 900,
288     std::string region = "", std::string endpoint_override = "", std::string scheme = "",
289     bool background_writes = true) {
290   fs::S3Options s3_opts;
291   // Handle auth (anonymous, keys, default)
292   // (validation/internal coherence handled in R)
293   if (anonymous) {
294     s3_opts = fs::S3Options::Anonymous();
295   } else if (access_key != "" && secret_key != "") {
296     s3_opts = fs::S3Options::FromAccessKey(access_key, secret_key, session_token);
297   } else if (role_arn != "") {
298     s3_opts = fs::S3Options::FromAssumeRole(role_arn, session_name, external_id,
299                                             load_frequency);
300   } else {
301     s3_opts = fs::S3Options::Defaults();
302   }
303 
304   // Now handle the rest of the options
305   /// AWS region to connect to (default determined by AWS SDK)
306   if (region != "") {
307     s3_opts.region = region;
308   }
309   /// If non-empty, override region with a connect string such as "localhost:9000"
310   s3_opts.endpoint_override = endpoint_override;
311   /// S3 connection transport, default "https"
312   if (scheme != "") {
313     s3_opts.scheme = scheme;
314   }
315   /// Whether OutputStream writes will be issued in the background, without blocking
316   /// default true
317   s3_opts.background_writes = background_writes;
318 
319   StopIfNotOk(fs::EnsureS3Initialized());
320   auto io_context = arrow::io::IOContext(gc_memory_pool());
321   return ValueOrStop(fs::S3FileSystem::Make(s3_opts, io_context));
322 }
323 
324 // [[s3::export]]
fs___S3FileSystem__region(const std::shared_ptr<fs::S3FileSystem> & fs)325 std::string fs___S3FileSystem__region(const std::shared_ptr<fs::S3FileSystem>& fs) {
326   return fs->region();
327 }
328 
329 #endif
330