#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Functions to install Spark in case the user directly downloads SparkR
# from CRAN.

#' Download and Install Apache Spark to a Local Directory
#'
#' \code{install.spark} downloads and installs Spark to a local directory if
#' it is not found. If SPARK_HOME is set in the environment and that directory is
#' found, that directory is returned. The Spark version used is the same as the
#' SparkR version. Users can specify a desired Hadoop version, the remote mirror
#' site, and the directory where the package is installed locally.
#'
#' The full URL of the remote file is inferred from \code{mirrorUrl} and
#' \code{hadoopVersion}. \code{mirrorUrl} specifies the remote path to a Spark folder.
#' It is followed by a subfolder named after the Spark version (which corresponds to
#' the SparkR version), and then the tar filename. The filename follows the pattern
#' spark-[Spark version]-bin-[Hadoop version].tgz.
#' For example, the full path for a Spark 2.0.0 package built for Hadoop 2.7 from
#' \code{http://apache.osuosl.org} is:
#' \code{http://apache.osuosl.org/spark/spark-2.0.0/spark-2.0.0-bin-hadoop2.7.tgz}.
#' For \code{hadoopVersion = "without"}, [Hadoop version] in the filename is
#' \code{without-hadoop}.
#'
#' @param hadoopVersion Version of Hadoop to install. Default is \code{"2.7"}. It can also take
#'                      other version numbers in the form "x.y" where x and y are integers.
#'                      If \code{hadoopVersion = "without"}, the "Hadoop free" build is installed.
#'                      See
#'                      \href{http://spark.apache.org/docs/latest/hadoop-provided.html}{
#'                      "Hadoop Free" Build} for more information.
#'                      Other patched version names can also be used, e.g. \code{"cdh4"}.
#' @param mirrorUrl Base URL of the repositories to use. The directory layout should follow
#'                  \href{http://www.apache.org/dyn/closer.lua/spark/}{Apache mirrors}.
#' @param localDir A local directory where Spark is installed. The directory contains
#'                 version-specific folders of Spark packages. Default is the path to
#'                 the cache directory:
#'                 \itemize{
#'                   \item Mac OS X: \file{~/Library/Caches/spark}
#'                   \item Unix: \env{$XDG_CACHE_HOME} if defined, otherwise \file{~/.cache/spark}
#'                   \item Windows: \file{\%LOCALAPPDATA\%\\Apache\\Spark\\Cache}.
#'                 }
#' @param overwrite If \code{TRUE}, download and overwrite the existing tar file in localDir
#'                  and force Spark to be re-installed (in case the local directory or file is
#'                  corrupted).
#' @return the (invisible) local directory where Spark is found or installed
#' @rdname install.spark
#' @name install.spark
#' @aliases install.spark
#' @export
#' @examples
#'\dontrun{
#' install.spark()
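#'
#' # A sketch of an explicit call; the mirror URL below is illustrative, and
#' # any directory following the Apache mirror layout works:
#' install.spark(hadoopVersion = "2.7", mirrorUrl = "http://apache.osuosl.org/spark")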
#'}
#' @note install.spark since 2.1.0
#' @seealso See available Hadoop versions:
#'          \href{http://spark.apache.org/downloads.html}{Apache Spark}
install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
                          localDir = NULL, overwrite = FALSE) {
  sparkHome <- Sys.getenv("SPARK_HOME")
  if (isSparkRShell()) {
    stopifnot(nchar(sparkHome) > 0)
    message("Spark is already running in sparkR shell.")
    return(invisible(sparkHome))
  } else if (!is.na(file.info(sparkHome)$isdir)) {
    message("Spark package found in SPARK_HOME: ", sparkHome)
    return(invisible(sparkHome))
  }

  version <- paste0("spark-", packageVersion("SparkR"))
  hadoopVersion <- tolower(hadoopVersion)
  hadoopVersionName <- hadoopVersionName(hadoopVersion)
  packageName <- paste(version, "bin", hadoopVersionName, sep = "-")
  localDir <- ifelse(is.null(localDir), sparkCachePath(),
                     normalizePath(localDir, mustWork = FALSE))

  if (is.na(file.info(localDir)$isdir)) {
    dir.create(localDir, recursive = TRUE)
  }

  if (overwrite) {
    message(paste0("Overwrite = TRUE: download and overwrite the tar file ",
                   "and Spark package directory if they exist."))
  }

  releaseUrl <- Sys.getenv("SPARKR_RELEASE_DOWNLOAD_URL")
  if (releaseUrl != "") {
    packageName <- basenameSansExtFromUrl(releaseUrl)
  }

  packageLocalDir <- file.path(localDir, packageName)

  # can use dir.exists(packageLocalDir) under R 3.2.0 or later
  if (!is.na(file.info(packageLocalDir)$isdir) && !overwrite) {
    if (releaseUrl != "") {
      message(paste(packageName, "found, setting SPARK_HOME to", packageLocalDir))
    } else {
      fmt <- "%s for Hadoop %s found, setting SPARK_HOME to %s"
      msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
                     packageLocalDir)
      message(msg)
    }
    Sys.setenv(SPARK_HOME = packageLocalDir)
    return(invisible(packageLocalDir))
  } else {
    message("Spark not found in the cache directory. Installation will start.")
  }

  packageLocalPath <- paste0(packageLocalDir, ".tgz")
  tarExists <- file.exists(packageLocalPath)

  if (tarExists && !overwrite) {
    message("tar file found.")
  } else {
    if (releaseUrl != "") {
      message("Downloading from alternate URL:\n- ", releaseUrl)
      success <- downloadUrl(releaseUrl, packageLocalPath)
      if (!success) {
        unlink(packageLocalPath)
        stop(paste0("Fetch failed from ", releaseUrl))
      }
    } else {
      robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
    }
  }

  message(sprintf("Installing to %s", localDir))
  # There are two ways untar can fail: it may stop() on errors such as an incomplete
  # block on file, or the tar command may return a failure code.
  success <- tryCatch(untar(tarfile = packageLocalPath, exdir = localDir) == 0,
                      error = function(e) {
                        message(e)
                        message()
                        FALSE
                      },
                      warning = function(w) {
                        # Treat warning as error; add an empty line with message()
                        message(w)
                        message()
                        FALSE
                      })
  if (!tarExists || overwrite || !success) {
    unlink(packageLocalPath)
  }
  if (!success) stop("Extraction of the Spark package archive failed.")
  message("DONE.")
  Sys.setenv(SPARK_HOME = packageLocalDir)
  message(paste("SPARK_HOME set to", packageLocalDir))
  invisible(packageLocalDir)
}

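# Try to download the Spark tarball from, in order: the user-provided mirror,
# the preferred mirror suggested by the Apache website, and the default backup
# mirror. Stops with an error if all of the attempts fail.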
robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
  # step 1: use the user-provided URL
  if (!is.null(mirrorUrl)) {
    message("Using user-provided mirror site: ", mirrorUrl)
    success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                                 packageName, packageLocalPath)
    if (success) {
      return()
    } else {
      message("Unable to download from mirrorUrl: ", mirrorUrl)
    }
  } else {
    message("MirrorUrl not provided.")
  }

  # step 2: use the URL suggested by the Apache website
  message("Looking for a preferred mirror site on the Apache website...")
  mirrorUrl <- getPreferredMirror(version, packageName)
  if (!is.null(mirrorUrl)) {
    success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                                 packageName, packageLocalPath)
    if (success) return()
    message("Unable to download from preferred mirror site: ", mirrorUrl)
  } else {
    message("Unable to find a preferred mirror site.")
  }

  # step 3: use the backup option
  message("Trying the backup site...")
  mirrorUrl <- defaultMirrorUrl()
  success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                               packageName, packageLocalPath)
  if (success) {
    return()
  } else {
    # remove any partially downloaded file
    unlink(packageLocalPath)
    message("Unable to download from default mirror site: ", mirrorUrl)
    msg <- sprintf(paste("Unable to download Spark %s for Hadoop %s.",
                         "Please check network connection, Hadoop version,",
                         "or provide other mirror sites."),
                   version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion))
    stop(msg)
  }
}

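# Ask the Apache closer.cgi service (JSON response) which mirror it prefers for
# the given package, and return that mirror's base URL for the spark directory,
# or NULL if no preferred mirror could be parsed from the response.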
getPreferredMirror <- function(version, packageName) {
  jsonUrl <- paste0("http://www.apache.org/dyn/closer.cgi?path=",
                    file.path("spark", version, packageName),
                    ".tgz&as_json=1")
  textLines <- readLines(jsonUrl, warn = FALSE)
  rowNum <- grep("\"preferred\"", textLines)
  linePreferred <- textLines[rowNum]
  # match a quoted URL; the character class follows the URI scheme grammar,
  # with "-" placed last so it is treated literally
  matchInfo <- regexpr("\"[A-Za-z][A-Za-z0-9+.-]*://.+\"", linePreferred)
  if (matchInfo != -1) {
    startPos <- matchInfo + 1
    endPos <- matchInfo + attr(matchInfo, "match.length") - 2
    mirrorPreferred <- base::substr(linePreferred, startPos, endPos)
    mirrorPreferred <- paste0(mirrorPreferred, "spark")
    message(sprintf("Preferred mirror site found: %s", mirrorPreferred))
  } else {
    mirrorPreferred <- NULL
  }
  mirrorPreferred
}

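# Construct the full remote path of the tarball from the mirror URL, Spark
# version, and package name, then download it; returns TRUE on success.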
directDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
  packageRemotePath <- paste0(file.path(mirrorUrl, version, packageName), ".tgz")
  fmt <- "Downloading %s for Hadoop %s from:\n- %s"
  msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
                 packageRemotePath)
  message(msg)
  downloadUrl(packageRemotePath, packageLocalPath)
}

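# Download remotePath to localPath with download.file, treating both errors and
# warnings as failures; returns TRUE on success, FALSE otherwise.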
downloadUrl <- function(remotePath, localPath) {
  isFail <- tryCatch(download.file(remotePath, localPath),
                     error = function(e) {
                       message(e)
                       message()
                       TRUE
                     },
                     warning = function(w) {
                       # Treat warning as error; add an empty line with message()
                       message(w)
                       message()
                       TRUE
                     })
  !isFail
}

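# Backup mirror used when neither the user-provided nor the preferred Apache
# mirror works.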
defaultMirrorUrl <- function() {
  "http://www-us.apache.org/dist/spark"
}

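# Map the user-facing Hadoop version to the suffix used in release filenames,
# e.g. "2.7" -> "hadoop2.7" and "without" -> "without-hadoop"; other values
# (e.g. "cdh4") are passed through unchanged.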
hadoopVersionName <- function(hadoopVersion) {
  if (hadoopVersion == "without") {
    "without-hadoop"
  } else if (grepl("^[0-9]+\\.[0-9]+$", hadoopVersion, perl = TRUE)) {
    paste0("hadoop", hadoopVersion)
  } else {
    hadoopVersion
  }
}

# Return the platform-specific cache directory for Spark packages. The
# implementation follows the appdirs package (https://pypi.python.org/pypi/appdirs),
# adapted to the Spark context.
sparkCachePath <- function() {
  if (.Platform$OS.type == "windows") {
    winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA)
    if (is.na(winAppPath)) {
      stop(paste("%LOCALAPPDATA% not found.",
                 "Please define the environment variable",
                 "or restart and enter an installation path in localDir."))
    } else {
      path <- file.path(winAppPath, "Apache", "Spark", "Cache")
    }
  } else if (.Platform$OS.type == "unix") {
    if (Sys.info()["sysname"] == "Darwin") {
      path <- file.path(Sys.getenv("HOME"), "Library/Caches", "spark")
    } else {
      path <- file.path(
        Sys.getenv("XDG_CACHE_HOME", file.path(Sys.getenv("HOME"), ".cache")), "spark")
    }
  } else {
    stop(sprintf("Unknown OS: %s", .Platform$OS.type))
  }
  normalizePath(path, mustWork = FALSE)
}

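# Return a human-readable instruction message for the given mode; currently
# only the "remote" mode is supported.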
installInstruction <- function(mode) {
  if (mode == "remote") {
    paste0("Connecting to a remote Spark master. ",
           "Please make sure the Spark package is also installed on this machine.\n",
           "- If there is one, set the path in the sparkHome parameter or ",
           "the environment variable SPARK_HOME.\n",
           "- If not, you may run the install.spark function to do the job. ",
           "Please make sure the Spark and Hadoop versions ",
           "match the versions on the cluster. ",
           "The SparkR package is compatible with Spark ", packageVersion("SparkR"), ". ",
           "If you need further help, ",
           "contact the administrators of the cluster.")
  } else {
    stop(paste0("No instruction found for ", mode, " mode."))
  }
}