1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements.  See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership.  The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License.  You may obtain a copy of the License at
8#
9#   http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied.  See the License for the
15# specific language governing permissions and limitations
16# under the License.
17
18#' @include arrow-package.R
19#' @title FileSystem entry info
20#' @usage NULL
21#' @format NULL
22#'
23#' @section Methods:
24#'
25#' - `base_name()` : The file base name (component after the last directory
26#'    separator).
27#' - `extension()` : The file extension
28#'
29#' @section Active bindings:
30#'
31#' - `$type`: The file type
32#' - `$path`: The full file path in the filesystem
33#' - `$size`: The size in bytes, if available.  Only regular files are
34#'    guaranteed to have a size.
35#' - `$mtime`: The time of last modification, if available.
36#'
37#' @rdname FileInfo
38#' @export
FileInfo <- R6Class("FileInfo",
  inherit = ArrowObject,
  public = list(
    # The file base name (component after the last directory separator)
    base_name = function() fs___FileInfo__base_name(self),
    # The file extension
    extension = function() fs___FileInfo__extension(self)
  ),
  # Every active binding is a getter when read (`info$size`) and a setter
  # when assigned to (`info$size <- value`). All setters return invisibly.
  active = list(
    # The file type
    type = function(type) {
      if (missing(type)) {
        fs___FileInfo__type(self)
      } else {
        invisible(fs___FileInfo__set_type(self, type))
      }
    },
    # The full file path in the filesystem
    path = function(path) {
      if (missing(path)) {
        fs___FileInfo__path(self)
      } else {
        # The new value has to be forwarded to the binding; otherwise
        # `info$path <- "..."` has nothing to store.
        invisible(fs___FileInfo__set_path(self, path))
      }
    },
    # The size in bytes, if available
    size = function(size) {
      if (missing(size)) {
        fs___FileInfo__size(self)
      } else {
        invisible(fs___FileInfo__set_size(self, size))
      }
    },
    # The time of last modification, if available
    mtime = function(time) {
      if (missing(time)) {
        fs___FileInfo__mtime(self)
      } else {
        # Only a single POSIXct value is acceptable: reject anything that is
        # of another class or that is not exactly length one.
        if (!inherits(time, "POSIXct") || length(time) != 1L) {
          abort("invalid time")
        }
        invisible(fs___FileInfo__set_mtime(self, time))
      }
    }
  )
)
79
80#' @title file selector
81#' @format NULL
82#'
83#' @section Factory:
84#'
85#' The `$create()` factory method instantiates a `FileSelector` given the 3 fields
86#' described below.
87#'
88#' @section Fields:
89#'
90#' - `base_dir`: The directory in which to select files. If the path exists but
91#'    doesn't point to a directory, this should be an error.
92#' - `allow_not_found`: The behavior if `base_dir` doesn't exist in the
93#'    filesystem. If `FALSE`, an error is returned.  If `TRUE`, an empty
94#'    selection is returned
95#' - `recursive`: Whether to recurse into subdirectories.
96#'
97#' @rdname FileSelector
98#' @export
FileSelector <- R6Class(
  "FileSelector",
  inherit = ArrowObject,
  # Getter-only bindings: none of them takes a value argument
  active = list(
    # The directory in which to select files
    base_dir = function() {
      fs___FileSelector__base_dir(self)
    },
    # Whether a missing `base_dir` yields an empty selection instead of an error
    allow_not_found = function() {
      fs___FileSelector__allow_not_found(self)
    },
    # Whether to recurse into subdirectories
    recursive = function() {
      fs___FileSelector__recursive(self)
    }
  )
)
107
FileSelector$create <- function(base_dir, allow_not_found = FALSE, recursive = FALSE) {
  # Factory: separators in `base_dir` are normalised by clean_path_rel()
  # before the selector is built.
  selector_dir <- clean_path_rel(base_dir)
  fs___FileSelector__create(selector_dir, allow_not_found, recursive)
}
111
112#' @title FileSystem classes
113#' @description `FileSystem` is an abstract file system API,
114#' `LocalFileSystem` is an implementation accessing files
115#' on the local machine. `SubTreeFileSystem` is an implementation that delegates
116#' to another implementation after prepending a fixed base path
117#'
118#' @section Factory:
119#'
120#' `LocalFileSystem$create()` returns the object and takes no arguments.
121#'
122#' `SubTreeFileSystem$create()` takes the following arguments:
123#'
124#' - `base_path`, a string path
125#' - `base_fs`, a `FileSystem` object
126#'
127#' `S3FileSystem$create()` optionally takes arguments:
128#'
129#' - `anonymous`: logical, default `FALSE`. If true, will not attempt to look up
130#'    credentials using standard AWS configuration methods.
131#' - `access_key`, `secret_key`: authentication credentials. If one is provided,
132#'    the other must be as well. If both are provided, they will override any
133#'    AWS configuration set at the environment level.
134#' - `session_token`: optional string for authentication along with
135#'    `access_key` and `secret_key`
136#' - `role_arn`: string AWS ARN of an AccessRole. If provided instead of `access_key` and
137#'    `secret_key`, temporary credentials will be fetched by assuming this role.
138#' - `session_name`: optional string identifier for the assumed role session.
139#' - `external_id`: optional unique string identifier that might be required
140#'    when you assume a role in another account.
141#' - `load_frequency`: integer, frequency (in seconds) with which temporary
142#'    credentials from an assumed role session will be refreshed. Default is
143#'    900 (i.e. 15 minutes)
144#' - `region`: AWS region to connect to. If omitted, the AWS library will
145#'    provide a sensible default based on client configuration, falling back
146#'    to "us-east-1" if no other alternatives are found.
147#' - `endpoint_override`: If non-empty, override region with a connect string
148#'    such as "localhost:9000". This is useful for connecting to file systems
149#'    that emulate S3.
150#' - `scheme`: S3 connection transport (default "https")
151#' - `background_writes`: logical, whether `OutputStream` writes will be issued
152#'    in the background, without blocking (default `TRUE`)
153#'
154#' @section Methods:
155#'
156#' - `$GetFileInfo(x)`: `x` may be a [FileSelector][FileSelector] or a character
157#'    vector of paths. Returns a list of [FileInfo][FileInfo]
158#' - `$CreateDir(path, recursive = TRUE)`: Create a directory and subdirectories.
159#' - `$DeleteDir(path)`: Delete a directory and its contents, recursively.
160#' - `$DeleteDirContents(path)`: Delete a directory's contents, recursively.
161#'    Like `$DeleteDir()`,
162#'    but doesn't delete the directory itself. Passing an empty path (`""`) will
163#'    wipe the entire filesystem tree.
164#' - `$DeleteFile(path)` : Delete a file.
165#' - `$DeleteFiles(paths)` : Delete many files. The default implementation
166#'    issues individual delete operations in sequence.
167#' - `$Move(src, dest)`: Move / rename a file or directory. If the destination
168#'    exists:
169#'      if it is a non-empty directory, an error is returned
170#'      otherwise, if it has the same type as the source, it is replaced
171#'      otherwise, behavior is unspecified (implementation-dependent).
172#' - `$CopyFile(src, dest)`: Copy a file. If the destination exists and is a
173#'    directory, an error is returned. Otherwise, it is replaced.
174#' - `$OpenInputStream(path)`: Open an [input stream][InputStream] for
175#'    sequential reading.
176#' - `$OpenInputFile(path)`: Open an [input file][RandomAccessFile] for random
177#'    access reading.
178#' - `$OpenOutputStream(path)`: Open an [output stream][OutputStream] for
179#'    sequential writing.
180#' - `$OpenAppendStream(path)`: Open an [output stream][OutputStream] for
181#'    appending.
182#'
183#' @section Active bindings:
184#'
185#' - `$type_name`: string filesystem type name, such as "local", "s3", etc.
186#' - `$region`: string AWS region, for `S3FileSystem` and `SubTreeFileSystem`
187#'    containing a `S3FileSystem`
188#' - `$base_fs`: for `SubTreeFileSystem`, the `FileSystem` it contains
189#' - `$base_path`: for `SubTreeFileSystem`, the path in `$base_fs` which is considered
190#'    root in this `SubTreeFileSystem`.
191#'
192#' @usage NULL
193#' @format NULL
194#' @docType class
195#'
196#' @rdname FileSystem
197#' @name FileSystem
198#' @export
FileSystem <- R6Class(
  "FileSystem",
  inherit = ArrowObject,
  public = list(
    # `x` is either a FileSelector or a character vector of paths;
    # any other input is rejected.
    GetFileInfo = function(x) {
      if (inherits(x, "FileSelector")) {
        return(fs___FileSystem__GetTargetInfos_FileSelector(self, x))
      }
      if (!is.character(x)) {
        abort("incompatible type for FileSystem$GetFileInfo()")
      }
      fs___FileSystem__GetTargetInfos_Paths(self, clean_path_rel(x))
    },

    # Every path-taking method below passes its path(s) through
    # clean_path_rel() before handing them to the binding.
    CreateDir = function(path, recursive = TRUE) {
      dir_path <- clean_path_rel(path)
      fs___FileSystem__CreateDir(self, dir_path, isTRUE(recursive))
    },
    DeleteDir = function(path) {
      dir_path <- clean_path_rel(path)
      fs___FileSystem__DeleteDir(self, dir_path)
    },
    DeleteDirContents = function(path) {
      dir_path <- clean_path_rel(path)
      fs___FileSystem__DeleteDirContents(self, dir_path)
    },
    DeleteFile = function(path) {
      file_path <- clean_path_rel(path)
      fs___FileSystem__DeleteFile(self, file_path)
    },
    DeleteFiles = function(paths) {
      file_paths <- clean_path_rel(paths)
      fs___FileSystem__DeleteFiles(self, file_paths)
    },
    Move = function(src, dest) {
      from_path <- clean_path_rel(src)
      to_path <- clean_path_rel(dest)
      fs___FileSystem__Move(self, from_path, to_path)
    },
    CopyFile = function(src, dest) {
      from_path <- clean_path_rel(src)
      to_path <- clean_path_rel(dest)
      fs___FileSystem__CopyFile(self, from_path, to_path)
    },
    OpenInputStream = function(path) {
      file_path <- clean_path_rel(path)
      fs___FileSystem__OpenInputStream(self, file_path)
    },
    OpenInputFile = function(path) {
      file_path <- clean_path_rel(path)
      fs___FileSystem__OpenInputFile(self, file_path)
    },
    OpenOutputStream = function(path) {
      file_path <- clean_path_rel(path)
      fs___FileSystem__OpenOutputStream(self, file_path)
    },
    OpenAppendStream = function(path) {
      file_path <- clean_path_rel(path)
      fs___FileSystem__OpenAppendStream(self, file_path)
    },

    # Friendlier R user interface: `path()` and `cd()` both wrap this
    # filesystem in a SubTreeFileSystem rooted at `x`.
    path = function(x) {
      SubTreeFileSystem$create(x, self)
    },
    cd = function(x) {
      SubTreeFileSystem$create(x, self)
    },
    ls = function(path = "", ...) {
      # TODO: add full.names argument like base::dir() (default right now is TRUE)
      # TODO: see fs package for glob/regexp filtering
      # TODO: verbose method that shows other attributes as df
      # TODO: print methods for FileInfo, SubTreeFileSystem, S3FileSystem
      # `...` is forwarded to FileSelector$create(), e.g. recursive = TRUE
      entries <- self$GetFileInfo(FileSelector$create(path, ...))
      map_chr(entries, ~ .$path)
    }
  ),
  active = list(
    # String filesystem type name, such as "local" or "s3"
    type_name = function() {
      fs___FileSystem__type_name(self)
    }
  )
)
# Build a filesystem from a single URI string.
# Callers in this file read `$fs` and `$path` from the returned value.
FileSystem$from_uri <- function(uri) {
  # Exactly one string is accepted; vectors must be handled by the caller
  assert_that(is.string(uri))
  fs___FileSystemFromUri(uri)
}
266
get_paths_and_filesystem <- function(x, filesystem = NULL) {
  # Resolve `x` (paths, URIs, or a SubTreeFileSystem) into a
  # list(fs = <FileSystem>, path = <character>). Wraps FileSystem$from_uri
  # and additionally handles plain paths and an optional explicit filesystem.

  # A SubTreeFileSystem already carries both pieces
  if (inherits(x, "SubTreeFileSystem")) {
    return(list(fs = x$base_fs, path = x$base_path))
  }
  assert_that(is.character(x))

  url_mask <- are_urls(x)

  # Plain paths: use the explicit filesystem if given, else the local one
  if (!any(url_mask)) {
    fs <- filesystem %||% LocalFileSystem$create()
    path <- if (inherits(fs, "LocalFileSystem")) {
      clean_path_abs(x)
    } else {
      clean_path_rel(x)
    }
    return(list(fs = fs, path = path))
  }

  # From here on at least one element is a URI, so all of them must be
  if (!all(url_mask)) {
    stop("Vectors of mixed paths and URIs are not supported", call. = FALSE)
  }
  if (!is.null(filesystem)) {
    # Stop? Can't have URL (which yields a fs) and another fs
  }

  parsed <- lapply(x, FileSystem$from_uri)
  fs_classes <- unique(map(parsed, ~ class(.$fs)))
  if (length(fs_classes) > 1) {
    stop(
      "Vectors of URIs for different file systems are not supported",
      call. = FALSE
    )
  }

  # The element is named "path" (singular) for compatibility, even though it
  # may hold several paths
  list(
    fs = parsed[[1]]$fs,
    path = map_chr(parsed, ~ .$path)
  )
}
304
305# variant of the above function that asserts that x is either a scalar string
306# or a SubTreeFileSystem
get_path_and_filesystem <- function(x, filesystem = NULL) {
  # Scalar front end to get_paths_and_filesystem(): `x` must be one string
  # or a SubTreeFileSystem, never a vector of paths.
  assert_that(is.string(x) || inherits(x, "SubTreeFileSystem"))
  get_paths_and_filesystem(x, filesystem = filesystem)
}
311
is_url <- function(x) {
  # Scalar check: TRUE only for a single string containing "://"
  if (!is.string(x)) {
    return(FALSE)
  }
  grepl("://", x)
}
are_urls <- function(x) {
  # Vectorised check: one logical per element of a character vector,
  # or a single FALSE for any non-character input
  if (is.character(x)) {
    grepl("://", x)
  } else {
    FALSE
  }
}
314
315#' @usage NULL
316#' @format NULL
317#' @rdname FileSystem
318#' @export
LocalFileSystem <- R6Class(
  "LocalFileSystem",
  inherit = FileSystem
)
# Factory: takes no arguments
LocalFileSystem$create <- function() {
  fs___LocalFileSystem__create()
}
323
324#' @usage NULL
325#' @format NULL
326#' @rdname FileSystem
327#' @importFrom utils modifyList
328#' @export
S3FileSystem <- R6Class(
  "S3FileSystem",
  inherit = FileSystem,
  active = list(
    # String AWS region of this filesystem (getter only)
    region = function() {
      fs___S3FileSystem__region(self)
    }
  )
)
S3FileSystem$create <- function(anonymous = FALSE, ...) {
  # Factory: validate the combination of authentication options in `...`,
  # fill in defaults for everything not supplied, then call the binding.
  args <- list2(...)
  supplied <- names(args)
  # Options that only make sense together with `role_arn`
  role_options <- c("session_name", "external_id", "load_frequency")

  if (anonymous) {
    # Anonymous access is incompatible with every credential option
    credential_options <- c(
      "access_key", "secret_key", "session_token", "role_arn",
      role_options
    )
    invalid_args <- intersect(credential_options, supplied)
    if (length(invalid_args)) {
      stop(
        "Cannot specify ", oxford_paste(invalid_args), " when anonymous = TRUE",
        call. = FALSE
      )
    }
  } else {
    # Number of the two key arguments that were supplied: 0, 1 or 2
    keys_present <- sum(c("access_key", "secret_key") %in% supplied)
    if (keys_present == 1) {
      stop("Key authentication requires both access_key and secret_key", call. = FALSE)
    }
    if (keys_present != 2 && "session_token" %in% supplied) {
      stop(
        "In order to initialize a session with temporary credentials, ",
        "both secret_key and access_key must be provided ",
        "in addition to session_token.",
        call. = FALSE
      )
    }

    arn <- "role_arn" %in% supplied
    if (arn && keys_present == 2) {
      stop("Cannot provide both key authentication and role_arn", call. = FALSE)
    }
    arn_extras <- intersect(role_options, supplied)
    if (!arn && length(arn_extras) > 0) {
      stop(
        "Cannot specify ", oxford_paste(arn_extras),
        " without providing a role_arn string",
        call. = FALSE
      )
    }
  }

  # User-supplied values override the defaults; `anonymous` is appended last
  args <- c(modifyList(default_s3_options, args), anonymous = anonymous)
  exec(fs___S3FileSystem__create, !!!args)
}
373
# Baseline option values for S3FileSystem$create(): user-supplied arguments
# are laid over this list with modifyList() and the merged result is passed
# to fs___S3FileSystem__create().
# NOTE(review): empty strings presumably mean "not set" to the C++ layer --
# confirm against the binding.
default_s3_options <- list(
  access_key = "",
  secret_key = "",
  session_token = "",
  role_arn = "",
  session_name = "",
  external_id = "",
  # Refresh interval in seconds for assumed-role credentials (15 minutes)
  load_frequency = 900L,
  region = "",
  endpoint_override = "",
  # NOTE(review): the user-facing docs give "https" as the default scheme;
  # presumably "" defers to that default downstream -- confirm
  scheme = "",
  background_writes = TRUE
)
387
388#' Connect to an AWS S3 bucket
389#'
#' `s3_bucket()` is a convenience function to create an `S3FileSystem` object
#' that automatically detects the bucket's AWS region and holds onto its
#' relative path.
393#'
394#' @param bucket string S3 bucket name or path
395#' @param ... Additional connection options, passed to `S3FileSystem$create()`
396#' @return A `SubTreeFileSystem` containing an `S3FileSystem` and the bucket's
397#' relative path. Note that this function's success does not guarantee that you
398#' are authorized to access the bucket's contents.
399#' @examplesIf arrow_with_s3()
400#' bucket <- s3_bucket("ursa-labs-taxi-data")
401#' @export
s3_bucket <- function(bucket, ...) {
  assert_that(is.string(bucket))
  args <- list2(...)

  # Use FileSystemFromUri to detect the bucket's region
  if (!is_url(bucket)) {
    bucket <- paste0("s3://", bucket)
  }
  fs_and_path <- FileSystem$from_uri(bucket)
  fs <- fs_and_path$fs

  # If there are no additional S3Options, we can use that filesystem as is.
  # Otherwise a new filesystem is created from the supplied options.
  if (length(args) > 0) {
    # Fall back to the detected region only when the caller did not choose
    # one; an explicit `region` argument must not be silently overwritten.
    # `[[` is used for exact (non-partial) name matching.
    if (is.null(args[["region"]])) {
      args$region <- fs$region
    }
    fs <- exec(S3FileSystem$create, !!!args)
  }

  # Return a subtree pointing at that bucket path
  SubTreeFileSystem$create(fs_and_path$path, fs)
}
421
422#' @usage NULL
423#' @format NULL
424#' @rdname FileSystem
425#' @export
SubTreeFileSystem <- R6Class(
  "SubTreeFileSystem",
  inherit = FileSystem,
  public = list(
    # Print a one-line summary. Local and S3 base filesystems are shown as a
    # URI built from the base path; anything else prints the class name only.
    print = function(...) {
      base_fs <- self$base_fs
      uri_scheme <- if (inherits(base_fs, "LocalFileSystem")) {
        "file://"
      } else if (inherits(base_fs, "S3FileSystem")) {
        "s3://"
      } else {
        NULL
      }

      if (is.null(uri_scheme)) {
        cat("SubTreeFileSystem", "\n", sep = "")
      } else {
        cat("SubTreeFileSystem: ", uri_scheme, self$base_path, "\n", sep = "")
      }
      invisible(self)
    }
  ),
  active = list(
    # The FileSystem this subtree delegates to
    base_fs = function() {
      fs___SubTreeFileSystem__base_fs(self)
    },
    # The path in `$base_fs` that is considered root in this subtree
    base_path = function() {
      fs___SubTreeFileSystem__base_path(self)
    }
  )
)
SubTreeFileSystem$create <- function(base_path, base_fs = NULL) {
  # Factory: `base_path` is resolved together with the optional `base_fs`
  # by get_path_and_filesystem() before the subtree is built.
  resolved <- get_path_and_filesystem(base_path, base_fs)
  fs___SubTreeFileSystem__create(resolved[["path"]], resolved[["fs"]])
}
451
452#' @export
`$.SubTreeFileSystem` <- function(x, name, ...) {
  # Member lookup that delegates to the base filesystem: a member defined on
  # the subtree itself wins, then the base_fs is consulted, and NULL is
  # returned when neither has it.
  assert_that(is.string(name))

  # all.names = TRUE so that members whose names start with "." are found
  # too; plain ls() skips them and they would wrongly resolve to NULL.
  if (name %in% ls(envir = x, all.names = TRUE)) {
    return(get(name, envir = x))
  }

  # Fetch the base filesystem once and reuse it for the check and the lookup
  base_fs <- get("base_fs", envir = x)
  if (name %in% ls(envir = base_fs, all.names = TRUE)) {
    get(name, envir = base_fs)
  } else {
    NULL
  }
}
464
465#' Copy files between FileSystems
466#'
467#' @param from A string path to a local directory or file, a URI, or a
468#' `SubTreeFileSystem`. Files will be copied recursively from this path.
469#' @param to A string path to a local directory or file, a URI, or a
470#' `SubTreeFileSystem`. Directories will be created as necessary
471#' @param chunk_size The maximum size of block to read before flushing
472#' to the destination file. A larger chunk_size will use more memory while
473#' copying but may help accommodate high latency FileSystems.
474#' @return Nothing: called for side effects in the file system
475#' @export
476#' @examplesIf FALSE
477#' # Copy an S3 bucket's files to a local directory:
478#' copy_files("s3://your-bucket-name", "local-directory")
479#' # Using a FileSystem object
480#' copy_files(s3_bucket("your-bucket-name"), "local-directory")
481#' # Or go the other way, from local to S3
482#' copy_files("local-directory", s3_bucket("your-bucket-name"))
copy_files <- function(from, to, chunk_size = 1024L * 1024L) {
  # Resolve both ends into a list(fs, path)
  src <- get_path_and_filesystem(from)
  dest <- get_path_and_filesystem(to)

  # Everything under the source path is selected recursively
  src_selector <- FileSelector$create(src$path, recursive = TRUE)

  # Called for its side effects only, hence the invisible result
  invisible(
    fs___CopyFiles(
      src$fs,
      src_selector,
      dest$fs,
      dest$path,
      chunk_size,
      option_use_threads()
    )
  )
}
495
clean_path_abs <- function(path) {
  # Produce an absolute, forward-slashed path suitable for passing to Arrow.
  # mustWork = FALSE: paths that do not exist (yet) must not raise an error.
  normalizePath(
    path,
    winslash = "/",
    mustWork = FALSE
  )
}
500
clean_path_rel <- function(path) {
  # Make sure all path separators are "/", not "\" as on Windows.
  # Returns a character vector the same length as `path`.

  # .Platform$OS.type is always available, whereas Sys.info() can return
  # NULL on platforms where it is not implemented.
  on_windows <- identical(.Platform$OS.type, "windows")

  # Scalar condition, so plain if/else rather than vectorised ifelse().
  # "\\\\" is the regular expression matching one literal backslash.
  path_sep <- if (on_windows) "\\\\" else "/"

  # gsub() runs on every platform (a no-op replacement off Windows) so the
  # result is coerced to character the same way regardless of OS.
  gsub(path_sep, "/", path)
}
506