default_http_version <- function() {
  # Returns the value used for curl's `http_version` option: 2 (HTTP 1.1)
  # on macOS, 0 (let curl decide) everywhere else.
  #
  # `Sys.info()` is documented to return NULL on platforms where it is not
  # implemented. `[[` on NULL yields NULL and `identical()` is then simply
  # FALSE, so we never hand a zero-length or NA condition to `if`.
  os <- Sys.info()[["sysname"]]
  if (identical(os, "Darwin")) {
    # FIXME: when is it safe to remove this? Does it depend on the OS
    # version? The libcurl version?
    2 # HTTP 1.1
  } else {
    0 # whatever curl chooses
  }
}

#' @importFrom utils modifyList

update_async_timeouts <- function(options) {
  # Fills in the timeout related curl options (and `http_version`) in
  # `options` and returns the updated list. Other entries of `options` are
  # kept as they are.
  #
  # Each setting is looked up, in this order:
  #   1. the entry of that name in `options`,
  #   2. the R option `pkgcache_<name>`,
  #   3. the environment variable `PKGCACHE_<NAME>`,
  #   4. the built-in default below.
  # The result is always coerced with `as.integer()`.
  getopt <- function(nm) {
    if (!is.null(v <- options[[nm]])) return(v)
    anm <- paste0("pkgcache_", nm)
    if (!is.null(v <- getOption(anm))) return(v)
    if (!is.na(v <- Sys.getenv(toupper(anm), NA_character_))) return(v)
    # Nothing configured, the caller falls back to its default.
    NULL
  }

  # `default` is only evaluated if nothing is configured for `nm`.
  int_setting <- function(nm, default) {
    as.integer(getopt(nm) %||% default)
  }

  utils::modifyList(
    options,
    list(
      timeout = int_setting("timeout", 0),
      connecttimeout = int_setting("connecttimeout", 300),
      low_speed_time = int_setting("low_speed_time", 0),
      low_speed_limit = int_setting("low_speed_limit", 0),
      http_version = int_setting("http_version", default_http_version())
    )
  )
}

#' Download a file, asynchronously
#'
#' This is the asynchronous version of [utils::download.file()].
#'
#' `download_file` also has some nice improvements:
#' * It uses a temporary file, so never leaves a partial file at `destfile`.
#' * It can write the HTTP ETag from the response to a file, which can
#'   be used in [download_if_newer()], etc.
#' * It returns the HTTP response as part of the error message if the
#'   response status code indicates a client or server error.
#' * Well, it is asynchronous.
#'
#' @param url URL to download.
#' @param destfile Destination file.
#' @param etag_file If not `NULL`, and the response is successful and
#'   includes an `ETag` header, then this header is stored in this file.
#'   It can be used to cache the file, with the [download_if_newer()] or
#'   the [download_one_of()] functions.
#' @param tmp_destfile Where to store the temporary destination file.
#' @param error_on_status Whether to error for an HTTP status larger
#'   than or equal to 400. If this is `FALSE`, then an error object is
#'   returned for these status codes.
#' @param options Curl options.
#' @param ... Additional arguments are passed to [http_get()].
#' @return A [deferred] object. It resolves to a list with entries:
#'   * `url`: The URL in the request.
#'   * `destfile`: The destination file.
#'   * `response`: The response object from the curl package.
#'   * `etag`: The ETag of the response, or `NA_character_` if missing.
#'   * `etag_file`: The file the ETag was written to, or `NULL` otherwise.
#'
#' @family async HTTP tools
#' @importFrom curl parse_headers_list
#' @noRd
#' @section Examples:
#' ```
#' dest1 <- tempfile(fileext = ".jpeg")
#' dest2 <- tempfile(fileext = ".png")
#' dl <- function() {
#'   when_all(
#'     download_file("https://httpbin.org/image/jpeg", dest1),
#'     download_file("https://httpbin.org/image/png", dest2)
#'   )
#' }
#' resps <- synchronise(dl())
#' lapply(resps, function(x) x$response$status_code)
#' resps[[1]]$url
#' resps[[1]]$destfile
#' resps[[1]]$response$times
#' file.exists(dest1)
#' file.exists(dest2)
#'
#' ## HTTP errors contain the response
#' dest <- tempfile()
#' err <- tryCatch(
#'   synchronise(download_file("https://httpbin.org/status/418", dest)),
#'   error = function(e) e
#' )
#' err
#' names(err)
#' cat(rawToChar(err$response$content))
#' ```

download_file <- function(url, destfile, etag_file = NULL,
                          tmp_destfile = paste0(destfile, ".tmp"),
                          error_on_status = TRUE,
                          options = list(), ...) {
  "!DEBUG downloading `url`"
  assert_that(
    is_string(url),
    is_path(destfile),
    is_path(tmp_destfile),
    is_path_or_null(etag_file),
    is_flag(error_on_status),
    is.list(options)
  )
  # Evaluate the `...` arguments now, not later inside a callback.
  force(list(...))

  options <- update_async_timeouts(options)
  destfile <- normalizePath(destfile, mustWork = FALSE)
  tmp_destfile <- normalizePath(tmp_destfile, mustWork = FALSE)
  mkdirp(dirname(tmp_destfile))

  # Runs once the response passed `http_stop_for_status`: move the
  # temporary file into place and record the ETag, if requested.
  on_success <- function(resp) {
    "!DEBUG downloaded `url`"
    # NOTE(review): the result of `file.rename()` is not checked, a failed
    # rename only produces a warning here -- confirm this is intended.
    file.rename(tmp_destfile, destfile)
    etag <- parse_headers_list(resp$headers)[["etag"]] %||% NA_character_
    if (!is.null(etag_file) && !is.na(etag[1])) {
      mkdirp(dirname(etag_file))
      writeLines(etag, etag_file)
    }
    list(
      url = url,
      destfile = destfile,
      response = resp,
      etag = etag,
      etag_file = etag_file
    )
  }

  # Annotates the error with the request details, then either re-throws it
  # or hands it back as the resolved value, depending on `error_on_status`.
  on_error <- function(err) {
    "!DEBUG downloading `url` failed"
    err$destfile <- destfile
    err$url <- url
    if (error_on_status) stop(err) else err
  }

  http_get(url, file = tmp_destfile, options = options, ...)$
    then(http_stop_for_status)$
    then(on_success)$
    catch(error = on_error)
}

read_etag <- function(etag_file) {
  # First line of `etag_file`, or `NA` if it cannot be read. Warnings from
  # reading are suppressed.
  tryCatch(
    suppressWarnings(read_lines(etag_file, n = 1, warn = FALSE)[1]),
    error = function(e) NA
  )
}

get_etag_header_from_file <- function(destfile, etag_file) {
  # Builds the `If-None-Match` request header from a stored ETag.
  # Returns NULL unless an ETag file was given, an ETag could be read from
  # it, and `destfile` already exists.
  if (is.null(etag_file)) return(NULL)

  etag_old <- read_etag(etag_file)
  if (file.exists(destfile) && !is.na(etag_old)) {
    c("If-None-Match" = etag_old)
  } else {
    NULL
  }
}

#' Download a file, if it is newer than a local file
#'
#' A version of [download_file()] that only downloads if the file at the
#' specified URL is different from the local one.
#'
#' @inheritParams download_file
#' @param etag_file If not `NULL` then the path to a file that may contain
#'   the ETag of a previous request to this URL.
#'   If `destfile` exists, and `etag_file` exists and it is not empty,
#'   then the `If-None-Match` HTTP header is used with this ETag to avoid
#'   downloading the file if it has not changed. If the file at `url` has
#'   changed, then it is downloaded, and the new ETag is stored in
#'   `etag_file`.
#' @param headers HTTP headers to add to the request, a named character
#'   vector.
#' @inherit download_file return
#'
#' @family async HTTP tools
#' @noRd
#' @section Examples:
#' ```
#' dest <- tempfile(fileext = ".jpeg")
#' etag <- tempfile()
#' dl <- function() {
#'   ## This URL will respond with an ETag
#'   download_if_newer("https://httpbin.org/etag/test", dest,
#'                     etag_file = etag)
#' }
#' file.exists(dest)
#' file.exists(etag)
#'
#' res1 <- synchronise(dl())
#'
#' ## Downloaded the file, and also created the etag file
#' file.exists(dest)
#' file.exists(etag)
#' readLines(etag)
#' res1$response$status_code
#'
#' ## This will not download the file again, as the ETag matches
#' ## The status code is 304 Not Modified
#' res2 <- synchronise(dl())
#' res2$response$status_code
#'
#' ## HTTP errors contain the response
#' dest <- tempfile()
#' etag <- tempfile()
#' err <- tryCatch(
#'   synchronise(download_if_newer("https://httpbin.org/status/418",
#'                                 dest, etag)),
#'   error = function(e) e
#' )
#' err
#' names(err)
#' cat(rawToChar(err$response$content))
#' ```

download_if_newer <- function(url, destfile, etag_file = NULL,
                              headers = NULL,
                              tmp_destfile = paste0(destfile, ".tmp"),
                              error_on_status = TRUE,
                              options = list(), ...) {
  "!DEBUG download if newer `url`"
  headers <- headers %||% structure(character(), names = character())
  assert_that(
    is_string(url),
    is_path(destfile),
    is_path(tmp_destfile),
    is_path_or_null(etag_file),
    is.character(headers), all_named(headers),
    is_flag(error_on_status),
    is.list(options)
  )
  # Evaluate the `...` arguments now, not later inside a callback.
  force(list(...))

  options <- update_async_timeouts(options)

  # `etag_old` is either NULL or a named `If-None-Match` header; appending
  # NULL leaves `headers` unchanged.
  etag_old <- get_etag_header_from_file(destfile, etag_file)
  headers <- c(headers, etag_old)

  destfile <- normalizePath(destfile, mustWork = FALSE)
  tmp_destfile <- normalizePath(tmp_destfile, mustWork = FALSE)
  mkdirp(dirname(tmp_destfile))

  # Runs once the response passed `http_stop_for_status`. Only 304 and 200
  # are accepted, any other status is turned into an error.
  on_response <- function(resp) {
    if (resp$status_code == 304) {
      "!DEBUG download not needed, `url` current"
      # Nothing was downloaded, report the ETag we sent.
      etag <- unname(etag_old)
    } else if (resp$status_code == 200) {
      "!DEBUG downloaded `url`"
      file.rename(tmp_destfile, destfile)
      etag <- parse_headers_list(resp$headers)[["etag"]] %||% NA_character_
      if (!is.null(etag_file) && !is.na(etag[1])) {
        mkdirp(dirname(etag_file))
        writeLines(etag, etag_file)
      }
    } else {
      err <- structure(
        list(response = resp, message = "Unknown HTTP response"),
        class = c("error", "condition")
      )
      stop(err)
    }
    list(
      url = url,
      destfile = destfile,
      response = resp,
      etag = etag,
      etag_file = etag_file
    )
  }

  # Annotates the error with the request details, then either re-throws it
  # or hands it back as the resolved value, depending on `error_on_status`.
  on_error <- function(err) {
    "!DEBUG downloading `url` failed"
    err$destfile <- destfile
    err$url <- url
    if (error_on_status) stop(err) else err
  }

  http_get(url, file = tmp_destfile, headers = headers,
           options = options, ...)$
    then(http_stop_for_status)$
    then(on_response)$
    catch(error = on_error)
}

#' Download a file from multiple candidate URLs
#'
#' Uses [download_if_newer()] to start downloads in parallel, and the
#' download that completes first is kept. (The others will be cancelled.)
#' Download errors are ignored, as long as at least one download completes
#' successfully.
#'
#' It also uses ETags, so if the destination file already exists, and one
#' of the URLs contains the same file (and this request completes first),
#' the file is not downloaded again.
#'
#' If all URLs fail, then `download_one_of` throws an error of class
#' `download_one_of_error`. The error object contains all errors from
#' the underlying [download_if_newer()] calls, in a list, in the
#' `errors` member.
#'
#' @inheritParams download_if_newer
#' @param urls A non-empty character vector of alternative URLs to try.
#' @inherit download_if_newer return
#'
#' @family async HTTP tools
#' @noRd
#' @section Examples:
#' ```
#' dest <- tempfile()
#' ## The first URL answers after a 1s delay,
#' ## the second after a 10s delay,
#' ## the third throws an error immediately, so it will be ignored.
#' ## Once the first URL responds, the second is cancelled, so the call
#' ## will finish well before the 10s are over.
#' dl <- function() {
#'   download_one_of(
#'     c("https://httpbin.org/delay/1",
#'       "https://httpbin.org/delay/10",
#'       "https://httpbin.org/status/404"),
#'     dest)
#' }
#' system.time(res <- synchronise(dl()))
#' file.exists(dest)
#' readLines(dest)
#'
#' ## Which URL responded
#' res$response$url
#'
#' ## If all URLs fail
#' dl2 <- function() {
#'   download_one_of(
#'     c("https://httpbin.org/status/418",
#'       "https://httpbin.org/status/401"),
#'     tempfile()
#'   )
#' }
#' res <- tryCatch(synchronise(dl2()), error = function(e) e)
#' res
#' res$errors
#' cat(rawToChar(res$errors[[1]]$response$content))
#' ```

download_one_of <- function(urls, destfile, etag_file = NULL,
                            headers = NULL, error_on_status = TRUE,
                            options = list(), ...) {
  "!DEBUG trying multiple URLs"
  headers <- headers %||% structure(character(), names = character())
  assert_that(
    is_character(urls), length(urls) >= 1,
    is_path(destfile),
    is_path_or_null(etag_file),
    is.character(headers), all_named(headers),
    is_flag(error_on_status),
    is.list(options)
  )
  # Evaluate the `...` arguments now, not later inside a callback.
  force(list(...))

  options <- update_async_timeouts(options)

  # One temporary file per candidate URL; all candidates share `destfile`.
  tmps <- paste0(destfile, ".tmp.", seq_along(urls))
  shared_args <- list(
    destfile = destfile, etag_file = etag_file,
    headers = headers, options = options, ...
  )
  dls <- mapply(
    download_if_newer, url = urls, tmp_destfile = tmps,
    MoreArgs = shared_args,
    SIMPLIFY = FALSE
  )

  # Relabels the error from `when_any`, then either re-throws it or hands
  # it back as the resolved value, depending on `error_on_status`.
  on_all_failed <- function(err) {
    err$message <- "All URLs failed"
    class(err) <- c("download_one_of_error", class(err))
    if (error_on_status) stop(err) else err
  }

  when_any(.list = dls)$
    catch(error = on_all_failed)
}

download_files <- function(data, error_on_status = TRUE,
                           options = list(), ...) {
  # Starts one `download_if_newer()` per row of `data` and returns a
  # deferred value for all of them (via `when_all()`).
  #
  # Columns read from `data`: `url`, `path` (destination file), `etag`
  # (passed as the ETag file), and optionally `fallback_url` and `mayfail`.
  # Stops immediately if two rows have the same `path`.

  if (any(dup <- duplicated(data$path))) {
    stop("Duplicate target paths in download_files: ",
         paste0("`", unique(data$path[dup]), "`", collapse = ", "), ".")
  }

  options <- update_async_timeouts(options)
  bar <- create_progress_bar(data)
  prog_cb <- function(upd) update_progress_bar_progress(bar, upd)

  download_row <- function(idx) {
    row <- data[idx, ]
    dx <- download_if_newer(
      row$url, row$path, row$etag,
      on_progress = prog_cb,
      error_on_status = error_on_status,
      options = options, ...
    )

    # If the primary URL fails, retry with the fallback URL, if there is one.
    if ("fallback_url" %in% names(row) && !is.na(row$fallback_url)) {
      dx <- dx$catch(error = function(err) {
        download_if_newer(
          row$fallback_url, row$path, row$etag,
          error_on_status = error_on_status,
          options = options, ...
        )
      })
    }

    dx <- dx$
      then(function(result) {
        status_code <- result$response$status_code
        # With `error_on_status = FALSE` the resolved value can be an error
        # object that has no `response`, so `status_code` may be NULL.
        # `isTRUE()` keeps `if` from failing on a zero-length condition.
        if (isTRUE(status_code == 304)) {
          update_progress_bar_uptodate(bar, row$url)
        } else {
          update_progress_bar_done(bar, row$url)
        }
        result
      })

    if (isTRUE(row$mayfail)) {
      # Rows that may fail resolve to the error object instead of failing.
      # `cat("", append = TRUE)` creates `path` if it does not exist and
      # leaves existing contents alone.
      dx$catch(error = function(err) {
        cat("", file = row$path, append = TRUE)
        err
      })
    } else {
      dx
    }
  }

  dls <- lapply(seq_len(nrow(data)), download_row)

  # `ok` is flipped by the success callback, so the `finally` handler can
  # tell the progress bar whether everything succeeded.
  ok <- FALSE
  when_all(.list = dls)$
    then(function(result) { ok <<- TRUE; result })$
    finally(function() finish_progress_bar(ok, bar))
}