# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

#' @include arrow-package.R
#' @title FileSystem entry info
#' @usage NULL
#' @format NULL
#'
#' @section Methods:
#'
#' - `base_name()` : The file base name (component after the last directory
#'    separator).
#' - `extension()` : The file extension
#'
#' @section Active bindings:
#'
#' - `$type`: The file type
#' - `$path`: The full file path in the filesystem
#' - `$size`: The size in bytes, if available. Only regular files are
#'    guaranteed to have a size.
#' - `$mtime`: The time of last modification, if available.
#'
#' @rdname FileInfo
#' @export
FileInfo <- R6Class("FileInfo",
  inherit = ArrowObject,
  public = list(
    # The file base name (component after the last directory separator)
    base_name = function() fs___FileInfo__base_name(self),
    # The file extension
    extension = function() fs___FileInfo__extension(self)
  ),
  active = list(
    # Every binding below acts as a getter when accessed (`info$type`) and as
    # a setter when assigned to (`info$type <- value`). Setters return
    # invisibly so that assignment never prints anything.
    type = function(type) {
      if (missing(type)) {
        fs___FileInfo__type(self)
      } else {
        invisible(fs___FileInfo__set_type(self, type))
      }
    },
    path = function(path) {
      if (missing(path)) {
        fs___FileInfo__path(self)
      } else {
        # The new value must be forwarded, exactly like the other setters do
        invisible(fs___FileInfo__set_path(self, path))
      }
    },
    size = function(size) {
      if (missing(size)) {
        fs___FileInfo__size(self)
      } else {
        invisible(fs___FileInfo__set_size(self, size))
      }
    },
    mtime = function(time) {
      if (missing(time)) {
        fs___FileInfo__mtime(self)
      } else {
        # Accept exactly one POSIXct timestamp; reject anything that is not a
        # POSIXct as well as POSIXct vectors of any other length
        if (!inherits(time, "POSIXct") || length(time) != 1L) {
          abort("invalid time")
        }
        invisible(fs___FileInfo__set_mtime(self, time))
      }
    }
  )
)

#' @title file selector
#' @format NULL
#'
#' @section Factory:
#'
#' The `$create()` factory method instantiates a `FileSelector` given the 3 fields
#' described below.
#'
#' @section Fields:
#'
#' - `base_dir`: The directory in which to select files. If the path exists but
#'    doesn't point to a directory, this should be an error.
#' - `allow_not_found`: The behavior if `base_dir` doesn't exist in the
#'    filesystem. If `FALSE`, an error is returned. If `TRUE`, an empty
#'    selection is returned
#' - `recursive`: Whether to recurse into subdirectories.
#'
#' @rdname FileSelector
#' @export
FileSelector <- R6Class("FileSelector",
  inherit = ArrowObject,
  active = list(
    # Read-only views onto the three fields of the underlying selector
    base_dir = function() {
      fs___FileSelector__base_dir(self)
    },
    allow_not_found = function() {
      fs___FileSelector__allow_not_found(self)
    },
    recursive = function() {
      fs___FileSelector__recursive(self)
    }
  )
)

# Factory: `base_dir` is passed through clean_path_rel() before the selector
# is built; the two flags are forwarded untouched.
FileSelector$create <- function(base_dir, allow_not_found = FALSE, recursive = FALSE) {
  normalized_dir <- clean_path_rel(base_dir)
  fs___FileSelector__create(normalized_dir, allow_not_found, recursive)
}

#' @title FileSystem classes
#' @description `FileSystem` is an abstract file system API,
#' `LocalFileSystem` is an implementation accessing files
#' on the local machine. `SubTreeFileSystem` is an implementation that delegates
#' to another implementation after prepending a fixed base path
#'
#' @section Factory:
#'
#' `LocalFileSystem$create()` returns the object and takes no arguments.
#'
#' `SubTreeFileSystem$create()` takes the following arguments:
#'
#' - `base_path`, a string path
#' - `base_fs`, a `FileSystem` object
#'
#' `S3FileSystem$create()` optionally takes arguments:
#'
#' - `anonymous`: logical, default `FALSE`. If true, will not attempt to look up
#'    credentials using standard AWS configuration methods.
#' - `access_key`, `secret_key`: authentication credentials. If one is provided,
#'    the other must be as well. If both are provided, they will override any
#'    AWS configuration set at the environment level.
#' - `session_token`: optional string for authentication along with
#'    `access_key` and `secret_key`
#' - `role_arn`: string AWS ARN of an AccessRole. If provided instead of `access_key` and
#'    `secret_key`, temporary credentials will be fetched by assuming this role.
#' - `session_name`: optional string identifier for the assumed role session.
#' - `external_id`: optional unique string identifier that might be required
#'    when you assume a role in another account.
#' - `load_frequency`: integer, frequency (in seconds) with which temporary
#'    credentials from an assumed role session will be refreshed. Default is
#'    900 (i.e. 15 minutes)
#' - `region`: AWS region to connect to. If omitted, the AWS library will
#'    provide a sensible default based on client configuration, falling back
#'    to "us-east-1" if no other alternatives are found.
#' - `endpoint_override`: If non-empty, override region with a connect string
#'    such as "localhost:9000". This is useful for connecting to file systems
#'    that emulate S3.
#' - `scheme`: S3 connection transport (default "https")
#' - `background_writes`: logical, whether `OutputStream` writes will be issued
#'    in the background, without blocking (default `TRUE`)
#'
#' @section Methods:
#'
#' - `$GetFileInfo(x)`: `x` may be a [FileSelector][FileSelector] or a character
#'    vector of paths. Returns a list of [FileInfo][FileInfo]
#' - `$CreateDir(path, recursive = TRUE)`: Create a directory and subdirectories.
#' - `$DeleteDir(path)`: Delete a directory and its contents, recursively.
#' - `$DeleteDirContents(path)`: Delete a directory's contents, recursively.
#'    Like `$DeleteDir()`,
#'    but doesn't delete the directory itself. Passing an empty path (`""`) will
#'    wipe the entire filesystem tree.
#' - `$DeleteFile(path)` : Delete a file.
#' - `$DeleteFiles(paths)` : Delete many files. The default implementation
#'    issues individual delete operations in sequence.
#' - `$Move(src, dest)`: Move / rename a file or directory. If the destination
#'    exists:
#'      if it is a non-empty directory, an error is returned
#'      otherwise, if it has the same type as the source, it is replaced
#'      otherwise, behavior is unspecified (implementation-dependent).
#' - `$CopyFile(src, dest)`: Copy a file.
#'    If the destination exists and is a
#'    directory, an error is returned. Otherwise, it is replaced.
#' - `$OpenInputStream(path)`: Open an [input stream][InputStream] for
#'    sequential reading.
#' - `$OpenInputFile(path)`: Open an [input file][RandomAccessFile] for random
#'    access reading.
#' - `$OpenOutputStream(path)`: Open an [output stream][OutputStream] for
#'    sequential writing.
#' - `$OpenAppendStream(path)`: Open an [output stream][OutputStream] for
#'    appending.
#'
#' @section Active bindings:
#'
#' - `$type_name`: string filesystem type name, such as "local", "s3", etc.
#' - `$region`: string AWS region, for `S3FileSystem` and `SubTreeFileSystem`
#'    containing a `S3FileSystem`
#' - `$base_fs`: for `SubTreeFileSystem`, the `FileSystem` it contains
#' - `$base_path`: for `SubTreeFileSystem`, the path in `$base_fs` which is considered
#'    root in this `SubTreeFileSystem`.
#'
#' @usage NULL
#' @format NULL
#' @docType class
#'
#' @rdname FileSystem
#' @name FileSystem
#' @export
FileSystem <- R6Class("FileSystem",
  inherit = ArrowObject,
  public = list(
    # Accepts either a FileSelector or a character vector of paths; anything
    # else is rejected. Character paths go through clean_path_rel() first.
    GetFileInfo = function(x) {
      if (inherits(x, "FileSelector")) {
        return(fs___FileSystem__GetTargetInfos_FileSelector(self, x))
      }
      if (!is.character(x)) {
        abort("incompatible type for FileSystem$GetFileInfo()")
      }
      fs___FileSystem__GetTargetInfos_Paths(self, clean_path_rel(x))
    },

    # All path-taking methods below pass their paths through clean_path_rel()
    # before calling into the underlying filesystem.
    CreateDir = function(path, recursive = TRUE) {
      dir_path <- clean_path_rel(path)
      fs___FileSystem__CreateDir(self, dir_path, isTRUE(recursive))
    },
    DeleteDir = function(path) {
      dir_path <- clean_path_rel(path)
      fs___FileSystem__DeleteDir(self, dir_path)
    },
    DeleteDirContents = function(path) {
      dir_path <- clean_path_rel(path)
      fs___FileSystem__DeleteDirContents(self, dir_path)
    },
    DeleteFile = function(path) {
      file_path <- clean_path_rel(path)
      fs___FileSystem__DeleteFile(self, file_path)
    },
    DeleteFiles = function(paths) {
      file_paths <- clean_path_rel(paths)
      fs___FileSystem__DeleteFiles(self, file_paths)
    },
    Move = function(src, dest) {
      from_path <- clean_path_rel(src)
      to_path <- clean_path_rel(dest)
      fs___FileSystem__Move(self, from_path, to_path)
    },
    CopyFile = function(src, dest) {
      from_path <- clean_path_rel(src)
      to_path <- clean_path_rel(dest)
      fs___FileSystem__CopyFile(self, from_path, to_path)
    },
    OpenInputStream = function(path) {
      file_path <- clean_path_rel(path)
      fs___FileSystem__OpenInputStream(self, file_path)
    },
    OpenInputFile = function(path) {
      file_path <- clean_path_rel(path)
      fs___FileSystem__OpenInputFile(self, file_path)
    },
    OpenOutputStream = function(path) {
      file_path <- clean_path_rel(path)
      fs___FileSystem__OpenOutputStream(self, file_path)
    },
    OpenAppendStream = function(path) {
      file_path <- clean_path_rel(path)
      fs___FileSystem__OpenAppendStream(self, file_path)
    },

    # Friendlier R user interface
    # `path()` and `cd()` are two names for the same operation: wrap this
    # filesystem in a SubTreeFileSystem rooted at `x`.
    path = function(x) {
      SubTreeFileSystem$create(x, self)
    },
    cd = function(x) {
      SubTreeFileSystem$create(x, self)
    },
    ls = function(path = "", ...) {
      # `...` is forwarded to FileSelector$create(), e.g. recursive = TRUE
      listing <- self$GetFileInfo(FileSelector$create(path, ...))
      map_chr(listing, function(info) info$path)
      # TODO: add full.names argument like base::dir() (default right now is TRUE)
      # TODO: see fs package for glob/regexp filtering
      # TODO: verbose method that shows other attributes as df
      # TODO: print methods for FileInfo, SubTreeFileSystem, S3FileSystem
    }
  ),
  active = list(
    type_name = function() {
      fs___FileSystem__type_name(self)
    }
  )
)

# Build a filesystem from a single URI string. The result is used elsewhere
# in this file as a list with `$fs` and `$path` elements.
FileSystem$from_uri <- function(uri) {
  assert_that(is.string(uri))
  fs___FileSystemFromUri(uri)
}

# Wrapper around FileSystem$from_uri that handles local paths
# and an optional explicit filesystem.
#
# `x` is either a SubTreeFileSystem or a character vector that is entirely
# URIs or entirely plain paths. Returns `list(fs = , path = )`; the element is
# called "path" (singular) for compatibility even when it holds several paths.
get_paths_and_filesystem <- function(x, filesystem = NULL) {
  if (inherits(x, "SubTreeFileSystem")) {
    return(list(fs = x$base_fs, path = x$base_path))
  }
  assert_that(is.character(x))

  url_mask <- are_urls(x)

  if (!any(url_mask)) {
    # Plain paths: local filesystems get absolute paths, others only get
    # their separators normalized
    fs <- filesystem %||% LocalFileSystem$create()
    cleaner <- if (inherits(fs, "LocalFileSystem")) clean_path_abs else clean_path_rel
    return(list(fs = fs, path = cleaner(x)))
  }

  if (!all(url_mask)) {
    stop("Vectors of mixed paths and URIs are not supported", call. = FALSE)
  }
  if (!is.null(filesystem)) {
    # Stop? Can't have URL (which yields a fs) and another fs
  }

  resolved <- lapply(x, FileSystem$from_uri)
  fs_classes <- unique(map(resolved, function(entry) class(entry$fs)))
  if (length(fs_classes) > 1) {
    stop(
      "Vectors of URIs for different file systems are not supported",
      call. = FALSE
    )
  }

  # The filesystem of the first URI stands in for all of them
  list(
    fs = resolved[[1]]$fs,
    path = map_chr(resolved, function(entry) entry$path)
  )
}

# variant of the above function that asserts that x is either a scalar string
# or a SubTreeFileSystem
get_path_and_filesystem <- function(x, filesystem = NULL) {
  assert_that(is.string(x) || inherits(x, "SubTreeFileSystem"))
  get_paths_and_filesystem(x, filesystem)
}

# Scalar test: is `x` a single string containing "://"?
is_url <- function(x) {
  is.string(x) && grepl("://", x)
}

# Vectorized test: one logical per element of a character vector; a single
# FALSE for any non-character input.
are_urls <- function(x) {
  if (is.character(x)) {
    grepl("://", x)
  } else {
    FALSE
  }
}

#' @usage NULL
#' @format NULL
#' @rdname FileSystem
#' @export
LocalFileSystem <- R6Class("LocalFileSystem", inherit = FileSystem)
LocalFileSystem$create <- function() {
  fs___LocalFileSystem__create()
}

#' @usage NULL
#' @format NULL
#' @rdname FileSystem
#' @importFrom utils modifyList
#' @export
S3FileSystem <- R6Class("S3FileSystem",
  inherit = FileSystem,
  active = list(
    region = function() {
      fs___S3FileSystem__region(self)
    }
  )
)

# Validate the combination of authentication options, fill in defaults from
# `default_s3_options`, and create the filesystem.
S3FileSystem$create <- function(anonymous = FALSE, ...) {
  args <- list2(...)
  supplied <- names(args)

  if (anonymous) {
    # No credential-related option makes sense for anonymous access
    credential_opts <- c(
      "access_key", "secret_key", "session_token", "role_arn", "session_name",
      "external_id", "load_frequency"
    )
    invalid_args <- intersect(credential_opts, supplied)
    if (length(invalid_args) > 0) {
      stop("Cannot specify ", oxford_paste(invalid_args), " when anonymous = TRUE", call. = FALSE)
    }
  } else {
    n_keys <- length(intersect(c("access_key", "secret_key"), supplied))
    has_both_keys <- n_keys == 2
    if (n_keys == 1) {
      stop("Key authentication requires both access_key and secret_key", call. = FALSE)
    }
    if ("session_token" %in% supplied && !has_both_keys) {
      stop(
        "In order to initialize a session with temporary credentials, ",
        "both secret_key and access_key must be provided ",
        "in addition to session_token.",
        call. = FALSE
      )
    }

    has_role <- "role_arn" %in% supplied
    if (has_both_keys && has_role) {
      stop("Cannot provide both key authentication and role_arn", call. = FALSE)
    }
    # These options only have meaning together with a role_arn
    arn_extras <- intersect(c("session_name", "external_id", "load_frequency"), supplied)
    if (!has_role && length(arn_extras) > 0) {
      stop("Cannot specify ", oxford_paste(arn_extras), " without providing a role_arn string", call. = FALSE)
    }
  }

  # User-supplied values override the defaults; `anonymous` is appended last
  create_args <- c(modifyList(default_s3_options, args), anonymous = anonymous)
  exec(fs___S3FileSystem__create, !!!create_args)
}

# Defaults merged underneath the user-supplied options in S3FileSystem$create()
default_s3_options <- list(
  access_key = "",
  secret_key = "",
  session_token = "",
  role_arn = "",
  session_name = "",
  external_id = "",
  load_frequency = 900L,
  region = "",
  endpoint_override = "",
  scheme = "",
  background_writes = TRUE
)

#' Connect to an AWS S3 bucket
#'
#' `s3_bucket()` is a convenience function to create an `S3FileSystem` object
#' that automatically detects the bucket's AWS region and holds onto its
#' relative path.
#'
#' @param bucket string S3 bucket name or path
#' @param ...
#'   Additional connection options, passed to `S3FileSystem$create()`
#' @return A `SubTreeFileSystem` containing an `S3FileSystem` and the bucket's
#' relative path. Note that this function's success does not guarantee that you
#' are authorized to access the bucket's contents.
#' @examplesIf arrow_with_s3()
#' bucket <- s3_bucket("ursa-labs-taxi-data")
#' @export
s3_bucket <- function(bucket, ...) {
  assert_that(is.string(bucket))
  extra_opts <- list2(...)

  # Use FileSystemFromUri to detect the bucket's region; bare bucket names
  # are turned into an s3:// URI first
  uri <- if (is_url(bucket)) bucket else paste0("s3://", bucket)
  resolved <- FileSystem$from_uri(uri)
  s3_fs <- resolved$fs

  # With no additional S3 options the detected filesystem is used as-is.
  # Otherwise a new one is created from the options plus the detected region.
  if (length(extra_opts) > 0) {
    extra_opts$region <- s3_fs$region
    s3_fs <- exec(S3FileSystem$create, !!!extra_opts)
  }

  # Return a subtree pointing at that bucket path
  SubTreeFileSystem$create(resolved$path, s3_fs)
}

#' @usage NULL
#' @format NULL
#' @rdname FileSystem
#' @export
SubTreeFileSystem <- R6Class("SubTreeFileSystem",
  inherit = FileSystem,
  public = list(
    # Prints a URI-like summary for local and S3 base filesystems, and just
    # the class name for anything else. Returns the object invisibly.
    print = function(...) {
      wrapped_fs <- self$base_fs
      scheme <- if (inherits(wrapped_fs, "LocalFileSystem")) {
        "file://"
      } else if (inherits(wrapped_fs, "S3FileSystem")) {
        "s3://"
      } else {
        NULL
      }

      if (is.null(scheme)) {
        cat("SubTreeFileSystem", "\n", sep = "")
      } else {
        cat("SubTreeFileSystem: ", scheme, self$base_path, "\n", sep = "")
      }
      invisible(self)
    }
  ),
  active = list(
    base_fs = function() {
      fs___SubTreeFileSystem__base_fs(self)
    },
    base_path = function() {
      fs___SubTreeFileSystem__base_path(self)
    }
  )
)

# `base_path` may be a string or an existing SubTreeFileSystem; it is resolved
# by get_path_and_filesystem() into a (path, filesystem) pair first.
SubTreeFileSystem$create <- function(base_path, base_fs = NULL) {
  target <- get_path_and_filesystem(base_path, base_fs)
  fs___SubTreeFileSystem__create(target$path, target$fs)
}

#' @export
`$.SubTreeFileSystem` <- function(x, name, ...) {
  # This is to allow delegating methods/properties to the base_fs:
  # members of the subtree itself win, then members of the wrapped
  # filesystem, and unknown names yield NULL.
  assert_that(is.string(name))
  if (name %in% ls(envir = x)) {
    return(get(name, x))
  }
  delegate <- x$base_fs
  if (name %in% ls(envir = delegate)) {
    return(get(name, delegate))
  }
  NULL
}

#' Copy files between FileSystems
#'
#' @param from A string path to a local directory or file, a URI, or a
#' `SubTreeFileSystem`. Files will be copied recursively from this path.
#' @param to A string path to a local directory or file, a URI, or a
#' `SubTreeFileSystem`. Directories will be created as necessary
#' @param chunk_size The maximum size of block to read before flushing
#' to the destination file. A larger chunk_size will use more memory while
#' copying but may help accommodate high latency FileSystems.
#' @return Nothing: called for side effects in the file system
#' @export
#' @examplesIf FALSE
#' # Copy an S3 bucket's files to a local directory:
#' copy_files("s3://your-bucket-name", "local-directory")
#' # Using a FileSystem object
#' copy_files(s3_bucket("your-bucket-name"), "local-directory")
#' # Or go the other way, from local to S3
#' copy_files("local-directory", s3_bucket("your-bucket-name"))
copy_files <- function(from, to, chunk_size = 1024L * 1024L) {
  # Resolve both ends into a (filesystem, path) pair
  origin <- get_path_and_filesystem(from)
  destination <- get_path_and_filesystem(to)

  # Everything below the source path is selected recursively
  selector <- FileSelector$create(origin$path, recursive = TRUE)
  outcome <- fs___CopyFiles(
    origin$fs,
    selector,
    destination$fs,
    destination$path,
    chunk_size,
    option_use_threads()
  )
  invisible(outcome)
}

# Make sure we have a valid, absolute, forward-slashed path for passing to Arrow.
# Paths that do not exist are allowed (mustWork = FALSE).
clean_path_abs <- function(path) {
  normalizePath(path, mustWork = FALSE, winslash = "/")
}

# Make sure all path separators are "/", not "\" as on Windows.
# On other systems the substitution replaces "/" with itself, so the input
# comes back unchanged (as a character vector).
clean_path_rel <- function(path) {
  on_windows <- tolower(Sys.info()[["sysname"]]) == "windows"
  separator_pattern <- if (on_windows) "\\\\" else "/"
  gsub(separator_pattern, "/", path)
}