#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Functions to install Spark in case the user directly downloads SparkR
# from CRAN.

#' Download and Install Apache Spark to a Local Directory
#'
#' \code{install.spark} downloads and installs Spark to a local directory if
#' it is not found. If SPARK_HOME is set in the environment, and that directory is found, that is
#' returned. The Spark version we use is the same as the SparkR version. Users can specify a desired
#' Hadoop version, the remote mirror site, and the directory where the package is installed locally.
#'
#' The full URL of the remote file is inferred from \code{mirrorUrl} and \code{hadoopVersion}.
#' \code{mirrorUrl} specifies the remote path to a Spark folder. It is followed by a subfolder
#' named after the Spark version (that corresponds to SparkR), and then the tar filename.
#' The filename is composed of four parts, i.e. [Spark version]-bin-[Hadoop version].tgz.
#' For example, the full path for a Spark 2.0.0 package for Hadoop 2.7 from
#' \code{http://apache.osuosl.org} has path:
#' \code{http://apache.osuosl.org/spark/spark-2.0.0/spark-2.0.0-bin-hadoop2.7.tgz}.
#' For \code{hadoopVersion = "without"}, [Hadoop version] in the filename is then
#' \code{without-hadoop}.
#'
#' @param hadoopVersion Version of Hadoop to install. Default is \code{"2.7"}. It can take other
#'                      version numbers in the format of "x.y" where x and y are integers.
#'                      If \code{hadoopVersion = "without"}, "Hadoop free" build is installed.
#'                      See
#'                      \href{http://spark.apache.org/docs/latest/hadoop-provided.html}{
#'                      "Hadoop Free" Build} for more information.
#'                      Other patched version names can also be used, e.g. \code{"cdh4"}
#' @param mirrorUrl base URL of the repositories to use. The directory layout should follow
#'                  \href{http://www.apache.org/dyn/closer.lua/spark/}{Apache mirrors}.
#' @param localDir a local directory where Spark is installed. The directory contains
#'                 version-specific folders of Spark packages. Default is path to
#'                 the cache directory:
#'                 \itemize{
#'                   \item Mac OS X: \file{~/Library/Caches/spark}
#'                   \item Unix: \env{$XDG_CACHE_HOME} if defined, otherwise \file{~/.cache/spark}
#'                   \item Windows: \file{\%LOCALAPPDATA\%\\Apache\\Spark\\Cache}.
#' }
#' @param overwrite If \code{TRUE}, download and overwrite the existing tar file in localDir
#'                  and force re-install Spark (in case the local directory or file is corrupted)
#' @return the (invisible) local directory where Spark is found or installed
#' @rdname install.spark
#' @name install.spark
#' @aliases install.spark
#' @export
#' @examples
#'\dontrun{
#' install.spark()
#'}
#' @note install.spark since 2.1.0
#' @seealso See available Hadoop versions:
#' \href{http://spark.apache.org/downloads.html}{Apache Spark}
install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
                          localDir = NULL, overwrite = FALSE) {
  sparkHome <- Sys.getenv("SPARK_HOME")
  if (isSparkRShell()) {
    # In the sparkR shell SPARK_HOME is guaranteed to be set by the launcher.
    stopifnot(nchar(sparkHome) > 0)
    message("Spark is already running in sparkR shell.")
    return(invisible(sparkHome))
  } else if (!is.na(file.info(sparkHome)$isdir)) {
    # SPARK_HOME points at an existing directory; reuse it.
    message("Spark package found in SPARK_HOME: ", sparkHome)
    return(invisible(sparkHome))
  }

  version <- paste0("spark-", packageVersion("SparkR"))
  hadoopVersion <- tolower(hadoopVersion)
  hadoopVersionName <- hadoopVersionName(hadoopVersion)
  packageName <- paste(version, "bin", hadoopVersionName, sep = "-")
  # Use if/else rather than ifelse(): the condition is scalar, and if/else only
  # evaluates the branch that is taken.
  localDir <- if (is.null(localDir)) {
    sparkCachePath()
  } else {
    normalizePath(localDir, mustWork = FALSE)
  }

  if (is.na(file.info(localDir)$isdir)) {
    dir.create(localDir, recursive = TRUE)
  }

  if (overwrite) {
    # Note trailing space inside the first piece so the joined message reads
    # "...tar file and Spark package directory..." (the original was missing it).
    message(paste0("Overwrite = TRUE: download and overwrite the tar file ",
                   "and Spark package directory if they exist."))
  }

  # An explicit release URL (e.g. an RC build) overrides the mirror logic.
  releaseUrl <- Sys.getenv("SPARKR_RELEASE_DOWNLOAD_URL")
  if (releaseUrl != "") {
    packageName <- basenameSansExtFromUrl(releaseUrl)
  }

  packageLocalDir <- file.path(localDir, packageName)

  # can use dir.exists(packageLocalDir) under R 3.2.0 or later
  if (!is.na(file.info(packageLocalDir)$isdir) && !overwrite) {
    if (releaseUrl != "") {
      message(paste(packageName, "found, setting SPARK_HOME to", packageLocalDir))
    } else {
      fmt <- "%s for Hadoop %s found, setting SPARK_HOME to %s"
      msg <- sprintf(fmt, version,
                     if (hadoopVersion == "without") "Free build" else hadoopVersion,
                     packageLocalDir)
      message(msg)
    }
    Sys.setenv(SPARK_HOME = packageLocalDir)
    return(invisible(packageLocalDir))
  } else {
    message("Spark not found in the cache directory. Installation will start.")
  }

  packageLocalPath <- paste0(packageLocalDir, ".tgz")
  tarExists <- file.exists(packageLocalPath)

  if (tarExists && !overwrite) {
    message("tar file found.")
  } else {
    if (releaseUrl != "") {
      message("Downloading from alternate URL:\n- ", releaseUrl)
      success <- downloadUrl(releaseUrl, packageLocalPath)
      if (!success) {
        # Remove a partially downloaded file before erroring out.
        unlink(packageLocalPath)
        stop(paste0("Fetch failed from ", releaseUrl))
      }
    } else {
      robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
    }
  }

  message(sprintf("Installing to %s", localDir))
  # There are two ways untar can fail - untar could stop() on errors like incomplete block on file
  # or, tar command can return failure code
  success <- tryCatch(untar(tarfile = packageLocalPath, exdir = localDir) == 0,
                      error = function(e) {
                        message(e)
                        message()
                        FALSE
                      },
                      warning = function(w) {
                        # Treat warning as error, add an empty line with message()
                        message(w)
                        message()
                        FALSE
                      })
  # Drop the tar file when it was freshly downloaded, forced, or extraction failed,
  # so a later retry starts from a clean state.
  if (!tarExists || overwrite || !success) {
    unlink(packageLocalPath)
  }
  if (!success) stop("Extract archive failed.")
  message("DONE.")
  Sys.setenv(SPARK_HOME = packageLocalDir)
  message(paste("SPARK_HOME set to", packageLocalDir))
  invisible(packageLocalDir)
}

# Download the Spark tarball, trying in order: the user-provided mirror, the
# preferred mirror suggested by apache.org, and finally the default backup
# mirror. stop()s if all three fail; removes any partial download on final failure.
robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
  # step 1: use user-provided url
  if (!is.null(mirrorUrl)) {
    message("Use user-provided mirror site: ", mirrorUrl)
    success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                                 packageName, packageLocalPath)
    if (success) {
      return()
    } else {
      message(paste0("Unable to download from mirrorUrl: ", mirrorUrl))
    }
  } else {
    message("MirrorUrl not provided.")
  }

  # step 2: use url suggested from apache website
  message("Looking for preferred site from apache website...")
  mirrorUrl <- getPreferredMirror(version, packageName)
  if (!is.null(mirrorUrl)) {
    success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                                 packageName, packageLocalPath)
    if (success) return()
    # Fix: report the failed download here, where mirrorUrl is known. The
    # original emitted this message in the branch where mirrorUrl was NULL,
    # printing no mirror and staying silent on an actual download failure.
    message("Unable to download from preferred mirror site: ", mirrorUrl)
  } else {
    message("Unable to find preferred mirror site.")
  }

  # step 3: use backup option
  message("To use backup site...")
  mirrorUrl <- defaultMirrorUrl()
  success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                               packageName, packageLocalPath)
  if (success) {
    return()
  } else {
    # remove any partially downloaded file
    unlink(packageLocalPath)
    message("Unable to download from default mirror site: ", mirrorUrl)
    msg <- sprintf(paste("Unable to download Spark %s for Hadoop %s.",
                         "Please check network connection, Hadoop version,",
                         "or provide other mirror sites."),
                   version,
                   if (hadoopVersion == "without") "Free build" else hadoopVersion)
    stop(msg)
  }
}

# Query apache.org's closer.cgi JSON endpoint for the preferred mirror of the
# given Spark package. Returns the mirror base URL (with "spark" appended), or
# NULL when no preferred entry can be parsed from the response.
getPreferredMirror <- function(version, packageName) {
  jsonUrl <- paste0("http://www.apache.org/dyn/closer.cgi?path=",
                    file.path("spark", version, packageName),
                    ".tgz&as_json=1")
  textLines <- readLines(jsonUrl, warn = FALSE)
  rowNum <- grep("\"preferred\"", textLines)
  linePreferred <- textLines[rowNum]
  # Extract the quoted URL value on the "preferred" line.
  matchInfo <- regexpr("\"[A-Za-z][A-Za-z0-9+-.]*://.+\"", linePreferred)
  if (matchInfo != -1) {
    # Strip the surrounding double quotes from the match.
    startPos <- matchInfo + 1
    endPos <- matchInfo + attr(matchInfo, "match.length") - 2
    mirrorPreferred <- base::substr(linePreferred, startPos, endPos)
    mirrorPreferred <- paste0(mirrorPreferred, "spark")
    message(sprintf("Preferred mirror site found: %s", mirrorPreferred))
  } else {
    mirrorPreferred <- NULL
  }
  mirrorPreferred
}

# Download the package tarball from a single mirror. Returns TRUE on success,
# FALSE on failure (see downloadUrl).
directDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
  packageRemotePath <- paste0(file.path(mirrorUrl, version, packageName), ".tgz")
  fmt <- "Downloading %s for Hadoop %s from:\n- %s"
  msg <- sprintf(fmt, version,
                 if (hadoopVersion == "without") "Free build" else hadoopVersion,
                 packageRemotePath)
  message(msg)
  downloadUrl(packageRemotePath, packageLocalPath)
}

# Fetch remotePath into localPath with download.file. Any error or warning is
# treated as failure. Returns TRUE on success, FALSE otherwise.
downloadUrl <- function(remotePath, localPath) {
  isFail <- tryCatch(download.file(remotePath, localPath),
                     error = function(e) {
                       message(e)
                       message()
                       TRUE
                     },
                     warning = function(w) {
                       # Treat warning as error, add an empty line with message()
                       message(w)
                       message()
                       TRUE
                     })
  !isFail
}

# Backup mirror used when both the user-provided and preferred mirrors fail.
defaultMirrorUrl <- function() {
  "http://www-us.apache.org/dist/spark"
}

# Map a user-supplied Hadoop version to the suffix used in Spark tarball names:
# "without" -> "without-hadoop", "x.y" -> "hadoopx.y", anything else (e.g.
# vendor builds like "cdh4") is passed through unchanged.
hadoopVersionName <- function(hadoopVersion) {
  if (hadoopVersion == "without") {
    "without-hadoop"
  } else if (grepl("^[0-9]+\\.[0-9]+$", hadoopVersion, perl = TRUE)) {
    paste0("hadoop", hadoopVersion)
  } else {
    hadoopVersion
  }
}

# The implementation refers to appdirs package: https://pypi.python.org/pypi/appdirs and
# adapt to Spark context
sparkCachePath <- function() {
  if (.Platform$OS.type == "windows") {
    winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA)
    if (is.na(winAppPath)) {
      stop(paste("%LOCALAPPDATA% not found.",
                 "Please define the environment variable",
                 "or restart and enter an installation path in localDir."))
    } else {
      path <- file.path(winAppPath, "Apache", "Spark", "Cache")
    }
  } else if (.Platform$OS.type == "unix") {
    if (Sys.info()["sysname"] == "Darwin") {
      path <- file.path(Sys.getenv("HOME"), "Library/Caches", "spark")
    } else {
      # XDG Base Directory spec: fall back to ~/.cache when XDG_CACHE_HOME unset.
      path <- file.path(
        Sys.getenv("XDG_CACHE_HOME", file.path(Sys.getenv("HOME"), ".cache")), "spark")
    }
  } else {
    stop(sprintf("Unknown OS: %s", .Platform$OS.type))
  }
  normalizePath(path, mustWork = FALSE)
}


# Return a human-readable instruction string for installing Spark in the given
# mode; currently only "remote" is supported, any other mode stop()s.
installInstruction <- function(mode) {
  if (mode == "remote") {
    paste0("Connecting to a remote Spark master. ",
           "Please make sure Spark package is also installed in this machine.\n",
           "- If there is one, set the path in sparkHome parameter or ",
           "environment variable SPARK_HOME.\n",
           "- If not, you may run install.spark function to do the job. ",
           "Please make sure the Spark and the Hadoop versions ",
           "match the versions on the cluster. ",
           # Fix: ". " (with space) so the message does not read "x.y.z.If you need".
           "SparkR package is compatible with Spark ", packageVersion("SparkR"), ". ",
           "If you need further help, ",
           "contact the administrators of the cluster.")
  } else {
    stop(paste0("No instruction found for ", mode, " mode."))
  }
}