# Definitions used in the parallel computations of NMF
#
# - reproducible backend
# - reproducible %dopar% operator: %dorng%
#
# Author: Renaud Gaujoux
# Creation: 08-Feb-2011
###############################################################################

#' @include utils.R
#' @import foreach
#' @import doParallel
NULL

# Number of cores used by NMF computations when the user does not specify one.
#
# @param limit when TRUE, the detected number of cores is replaced by the global
# option 'cores' if set, otherwise by the NMF option 'cores' if set, otherwise
# one core is left free (only when more than 2 cores are detected).
# Whatever `limit`, the result is capped at 2 when isCHECK() is TRUE.
getMaxCores <- function(limit=TRUE){
	#ceiling(parallel::detectCores()/2)
	detected <- parallel::detectCores()
	ncores <- detected
	if( limit ){
		# the global option takes precedence over the NMF-specific one
		requested <- getOption('cores')
		if( is.null(requested) ) requested <- nmf.getOption('cores')
		if( !is.null(requested) ){
			ncores <- requested
		}else if( detected > 2 ){
			# leave one core free if possible
			ncores <- detected - 1L
		}
	}
	# forces limiting maximum number of cores to 2 during CRAN checks
	if( ncores > 2 && isCHECK() ){
		message("# NOTE - CRAN check detected: limiting maximum number of cores [2/", detected, "]")
		ncores <- 2L
	}
	ncores
}

#' Utilities and Extensions for Foreach Loops
#'
#' \code{registerDoBackend} is a unified register function for foreach backends.
#' It registers the backend described by \code{object}, runs the cleanup
#' procedure of the previously registered backend and invisibly returns that
#' previous backend (as a backend object).
#' If the registration fails, the previous backend data is restored.
#'
#' @param object specification of a foreach backend, e.g. \sQuote{SEQ},
#' \sQuote{PAR} (for doParallel), \sQuote{MPI}, etc\ldots
#' @param ... extra arguments passed to \code{ForeachBackend} when building
#' the backend object from \code{object}.
#'
#' @keywords internal
#' @rdname foreach
registerDoBackend <- function(object, ...){

	# snapshot the current backend data: it is restored if anything below fails
	previous_data <- getDoBackend()
	on.exit( setDoBackend(previous_data) )

	# backend object describing the currently registered backend
	previous <- ForeachBackend()

	# build and register the new backend
	register(ForeachBackend(object, ...))

	# success: cancel the restoration, then release the old backend's resources
	on.exit()
	doBackendCleanup(previous)

	invisible(previous)
}

#' \code{getDoBackend} returns the internal data of the currently registered foreach \%dopar\% backend.
#' @rdname foreach
#' @export
getDoBackend <- function(){
	fe_ns <- asNamespace('foreach')
	globals <- ns_get('.foreachGlobals', fe_ns)
	has_global <- function(what) exists(what, where = globals, inherits = FALSE)

	# no %dopar% function registered: nothing to describe
	if( !has_global("fun") ) return(NULL)

	# 'info' falls back on a function answering NULL to every query;
	# 'cleanup' is NULL when absent, and is then dropped by c()
	info <- if( has_global("info") ) get('info', globals, inherits=FALSE) else function(data, item) NULL
	cleanup <- if( has_global("cleanup") ) get('cleanup', globals, inherits=FALSE)

	getDoPar <- ns_get('getDoPar', fe_ns)
	# registered %dopar% function + associated data, completed with info/cleanup
	c(getDoPar(), info = info, cleanup = cleanup)
}

# Queries item `item` from a backend's info function, which is called with the
# backend's data; returns NULL (invisibly) when `x` has no info function.
getDoBackendInfo <- function(x, item){
	info <- x$info
	if( !is.function(info) ) return(invisible(NULL))
	info(x$data, item)
}

# Name of a backend as reported by its info function (see getDoBackendInfo).
getDoBackendName <- function(x){
	getDoBackendInfo(x, item = 'name')
}

#' \code{setDoBackend} is identical to \code{\link[foreach]{setDoPar}}, but
#' returns the internal data of the previously registered backend.
#'
#' @param data internal data of a foreach \%dopar\% backend.
#' @param cleanup logical that indicates if the previous
#' backend's cleanup procedure should be run, \strong{before}
#' setting the new backend.
#'
#' @export
#' @rdname foreach
setDoBackend <- function(data, cleanup=FALSE){
	
	# get old backend data
	ob <- getDoBackend()
	ofb <- ForeachBackend()
	# cleanup old backend if requested
	if( cleanup ){
		doBackendCleanup(ofb)
	}
	
	if( !is.null(data) ){
		bdata <- data
		# backend objects carry 'name'/'cleanup' elements that are not
		# arguments of foreach::setDoPar (fun, data, info)
		if( is.backend(data) ) data <- data[!names(data) %in% c('name', 'cleanup')]
		do.call('setDoPar', data)
		setBackendCleanup(bdata)
	}else{
		# unregister: reset the %dopar% function and drop any cleanup procedure
		do.call('setDoPar', list(NULL))
		fe <- ns_get('.foreachGlobals', 'foreach')
		if (exists("fun", envir = fe, inherits = FALSE))
			remove("fun", envir = fe)
		setBackendCleanup(NULL)
	}
	# return old backend
	invisible(ob)
}

# Setup the cleanup procedure for the current backend.
#
# Stores `object$cleanup` as 'cleanup' in foreach's internal globals, or removes
# any stored cleanup procedure when `object` has none.
# NB: argument `fun` is not used: it is always overwritten by `object$cleanup`.
# Returns `object` invisibly.
setBackendCleanup <- function(object, fun, verbose=FALSE){
	
	fe <- ns_get('.foreachGlobals', 'foreach')
	name <- getDoParName()
	if( !is.null(fun <- object$cleanup) ){
		if( verbose ) message("# Registering cleaning up function for '", name, "'... ", appendLF=FALSE)
		assign('cleanup', fun, fe)
		if( verbose ) message("OK")
	}else if (exists("cleanup", envir = fe, inherits = FALSE)){
		if( verbose ) message("# Removing cleaning up function for '", name, "'... ", appendLF=FALSE)
		remove("cleanup", envir = fe)
		if( verbose ) message("OK")
	}
	invisible(object)
}

# Run the cleanup procedure of a given backend object.
#
# @param object backend object (or NULL), whose element `cleanup`, if any, is
# called without arguments. Errors raised by it are caught.
# @param ... not used.
# @param run not used (kept for backward compatibility).
# @param verbose logical: if TRUE, report progress and outcome (once).
# @return `object` invisibly, with its `cleanup` element removed when the
# cleanup function returned TRUE (so that it is not run again).
doBackendCleanup <- function(object, ..., run=TRUE, verbose=FALSE){
	
	name <- object$name
	if( !is.null(fun <- object$cleanup) ){
		if( verbose ) message("# Cleaning up '", name, "'... ", appendLF=FALSE)
		res <- try(fun(), silent=TRUE)
		# a cleanup returning TRUE is done: drop it so that it is not re-run
		if( isTRUE(res) ) object$cleanup <- NULL
		# report the outcome exactly once (previously 'OK' could be printed
		# a second time after 'OK'/'ERROR')
		if( verbose ){
			if( is(res, 'try-error') ){
				message('ERROR [', trimws(as.character(res)), ']')
			}else{
				message('OK')
			}
		}
	}
	invisible(object)
}

#' \code{register} is a generic function that registers objects.
#' It is used as a unified interface to register foreach backends.
#' Methods are dispatched on the S3 class of the backend object, e.g.
#' \code{doParallel_backend} or \code{doMPI_backend}, falling back on the
#' method for class \code{foreach_backend}.
#'
#' @param x specification of a foreach backend
#'
#' @rdname foreach
#' @export
register <- function(x, ...){
	UseMethod('register', x)
}
#' @export
register.foreach_backend <- function(x, ...){
	
	be <- x$name
	# For everything except doSEQ:
	# require definition package (it is safer to re-check)
	if( be != 'doSEQ' ){
		if( !require.quiet(be, character.only=TRUE) )
			stop("Package '", be, "' is required to use foreach backend '", be, "'")
	}
	
	regfun <- .foreach_regfun(x$name)
	# registration functions that take arguments receive the backend data
	# followed by any extra argument; argument-less ones are called as is
	res <- 
			if( length(formals(regfun)) > 0L ) do.call(regfun, c(x$data, ...))
			else regfun()
	# throw an error if not successful (foreach::setDoPar do not throw errors!!)
	if( is(res, 'simpleError') ) stop(res)
	# set cleanup procedure if any
	setBackendCleanup(x)
	# return result
	invisible(res)
}

#' \code{ForeachBackend} is a factory method for foreach backend objects.
#' 
#' @export
#' @inline
#' @rdname foreach
setGeneric('ForeachBackend', function(object, ...) standardGeneric('ForeachBackend'))
#' Default method: returns backend objects as is (replacing their data with
#' the extra arguments, if any were passed), forwards objects that inherit from
#' S3 class \code{cluster} to the \code{cluster} method, and otherwise throws an
#' informative error message.
setMethod('ForeachBackend', 'ANY', 
		function(object, ...){
			if( is.backend(object) ){
				# update arg list if necessary
				if( nargs() > 1L )	object$data <- list(...)
				object
			}else if( is(object, 'cluster') )
				# explicit forwarding for S3 cluster objects whose class
				# vector has 'cluster' only as a secondary class
				selectMethod('ForeachBackend', 'cluster')(object, ...)
			else
				stop("Could not create foreach backend object with a specification of class '", class(object)[1L], "'")
		}
)

# Normalises a backend specification into a foreach backend name 'do<Name>'.
# Numeric values map to 'doParallel'; character values not already starting
# with 'do' are upper-cased (with 'PAR'/'PARALLEL' mapped to 'Parallel') and
# prefixed with 'do'. Any other value yields ''.
# E.g. 'seq' -> 'doSEQ', 'par' -> 'doParallel', 'mpi' -> 'doMPI', 'doMPI' -> 'doMPI'.
formatDoName <- function(x){
	
	# numeric values are resolved as doParallel
	if( is.numeric(x) ) x <- 'PAR'
	if( is.character(x) ){
		# use upper case if not already specified as 'do*'
		if( !grepl("^do", x) ){
			x <- toupper(x)
			# special treatment for doParallel
			if( x %in% c('PAR', 'PARALLEL') ) x <- 'Parallel'
		}
		# stick prefix 'do' (removing leading 'do' if necessary)
		str_c('do', sub('^do', '', x))
	}else 
		''
}
#' Creates a foreach backend object based on its name.
#' The object is an S3 list with elements \code{name} and \code{data}
#' (the extra arguments), of class \code{c('<name>_backend', 'foreach_backend')}.
setMethod('ForeachBackend', 'character', 
		function(object, ...){
			
			object <- formatDoName(object)
			
			# build S3 class name
			s3class <- str_c(object, "_backend")
			
			# create empty S3 object
			obj <- structure(list(name=object, data=list(...))
							, class=c(s3class, 'foreach_backend'))
			
			# give a chance to a backend-specific ForeachBackend factory method
			# => this will generally fill the object with the elements suitable
			# to be used in a call to foreach::setDoPar: fun, data, info
			# and possibly change the name or the object class, e.g. to allow 
			# subsequent argument-dependent dispatch.
			obj <- ForeachBackend(obj, ...)
			
			# check the registration routine is available
			# (called for its error side effect; the result is not kept)
			.foreach_regfun(obj$name)
			
			# set data slot if not already set by the backend-specific method
			if( is.null(obj$data) || (length(obj$data) == 0L && nargs()>1L) )
				obj$data <- list(...)
			
			# return object
			obj
		}
)
#' Creates a foreach backend object for the currently registered backend.
#' The registered backend data, if any, is passed as a single extra argument,
#' and the registered cleanup procedure, if any, is attached to the result.
setMethod('ForeachBackend', 'missing', 
		function(object, ...){
			be <- getDoParName()
			data <- getDoBackend()
			bdata <- data$data
			res <- if( !is.null(bdata) ) do.call(ForeachBackend, c(list(be, bdata), ...))
					else ForeachBackend(be, ...)
			if( !is.null(data$cleanup) ) res$cleanup <- data$cleanup
			res
		}
)
#' Dummy method that returns \code{NULL}, defined for correct dispatch.
setMethod('ForeachBackend', 'NULL', function(object, ...){ NULL })

setOldClass('cluster')
#' Creates a doParallel foreach backend that uses the cluster described in
#' \code{object}. Extra arguments are ignored.
setMethod('ForeachBackend', 'cluster', 
		function(object, ...){
			ForeachBackend('doParallel', cl=object)
		}
)
#' Creates a doParallel foreach backend with \code{object} processes.
#' Only the first element of \code{object} is used; it must be positive.
setMethod('ForeachBackend', 'numeric', 
		function(object, ...){
			# check numeric specification
			if( length(object) == 0L )
				stop("invalid number of cores specified as a backend [empty]")
			object <- object[1]
			if( object <= 0 )
				stop("invalid negative number of cores [", object, "] specified for backend 'doParallel'")
			
			ForeachBackend('doParallel', cl=object, ...)
		}
)
###############
# doParallel
###############
setOldClass('doParallel_backend')
#' doParallel-specific backend factory
#' 
#' @param cl cluster specification: a cluster object or a numeric that indicates the 
#' number of nodes to use.
#' @param type type of cluster, See \code{\link[parallel]{makeCluster}}.
setMethod('ForeachBackend', 'doParallel_backend',
	function(object, cl, type=NULL){

		# record the cluster type only when one is explicitly given
		if( !is.null(type) ){
			object$data$type <- type
		}

		# doParallel:::info was removed from doParallel in version 1.0.7
		# (reported in Issue #7): look up the info function that matches the
		# cluster flavour (mc or snow) of this specification instead.
		object$info <- getDoParallelInfo(object)
		object
	}
)

setOldClass('doParallelMC_backend')
#' doParallel-specific backend factory for multicore (fork) clusters
#'
#' This method is needed since version 1.0.7 of \pkg{doParallel}, which removed 
#' internal function \code{info} and defined separate backend names for mc and snow clusters.
setMethod('ForeachBackend', 'doParallelMC_backend',
	function(object, ...){
		# normalise to the generic doParallel backend, with the mc info function
		object[['info']] <- getDoParallelInfo('mc')
		object[['name']] <- 'doParallel'
		object
	}
)

setOldClass('doParallelSNOW_backend')
#' doParallel-specific backend factory for SNOW clusters.
#'
#' This method is needed since version 1.0.7 of \pkg{doParallel}, which removed 
#' internal function \code{info} and defined separate backend names for mc and snow clusters.
setMethod('ForeachBackend', 'doParallelSNOW_backend',
	function(object, ...){
		# normalise to the generic doParallel backend, with the snow info function
		object$info <- getDoParallelInfo('snow')
		object$name <- 'doParallel'
		# return object
		object
	}
)

# Infers which flavour of doParallel backend ('mc' or 'snow') the backend
# specification `x` corresponds to.
#
# The cluster specification is x$data$cl, or the first element of x$data when
# that element is unnamed. A NULL or numeric specification gives 'mc', except
# on Windows or when a cluster `type` other than 'FORK' is requested; any other
# specification (e.g. a cluster object) gives 'snow'.
# `...` is ignored: it is accepted so that getDoParallelInfo can forward its own.
getDoParallelType <- function(x, ...){
	
	cl <- x$data[['cl']]
	# fall back on the first element when it is unnamed
	if( is.null(cl) && length(x$data) && (is.null(names(x$data)) || names(x$data)[[1L]] == '') )
		cl <- x$data[[1L]]
	if ( is.null(cl) || is.numeric(cl) ) {
		if (.Platform$OS.type == "windows" || (!is.null(x$data$type) && !identical(x$data$type, 'FORK')) ) 'snow'
		else 'mc'
	}
	else 'snow'
}

# Returns doParallel's internal info function for a backend type: `x` is either
# a type string ('mc' or 'snow') or a backend specification whose type is
# inferred by getDoParallelType.
getDoParallelInfo <- function(x, ...){
	type <- if( isString(x) ) x else getDoParallelType(x, ...)
	ns <- asNamespace('doParallel')
	if( type == 'mc' ) get('mcinfo', ns)
	else get('snowinfo', ns)
}

######################################################
# doPSOCK
# Default snow-like cluster from parallel on Windows
# but works on Unix as well
######################################################

setOldClass('doPSOCK_backend')
#' doPSOCK-specific backend factory: creates an equivalent doParallel backend
#' that uses a PSOCK cluster.
setMethod('ForeachBackend', 'doPSOCK_backend',
	function(object, cl){
		
		# use all available cores if not otherwise specified
		if( missing(cl) ) cl <- getMaxCores()
		
		# return equivalent doParallel object
		ForeachBackend('doParallel', cl, type='PSOCK')
	}
)

# Stops the cluster stored in variable `gvar` of environment `envir`, if any,
# and removes that variable. Returns TRUE if the variable existed, NULL otherwise.
.cl_cleanup <- function(gvar, envir=.GlobalEnv){
	# only look in `envir` itself: rm() below can only remove from there
	# (inheriting would find objects on the search path and then fail to remove them)
	if( !exists(gvar, envir = envir, inherits = FALSE) ) return()
	cl <- get(gvar, envir = envir, inherits = FALSE)
	try( parallel::stopCluster(cl), silent=TRUE)
	rm(list=gvar, envir = envir)
	TRUE
}

# Builds the cleanup function for a backend `x` that started cluster `cl`:
# the returned function stops `cl` with `stopFun` (parallel::stopCluster by
# default) and returns TRUE.
cleanupCluster <- function(x, cl, stopFun=NULL){
	
	function(){
		
		if( is(x, 'doParallel_backend') ){
			
			# On non-Windows machines registerDoParallel(numeric) will use
			# parallel::mclapply with `object` cores (no cleanup required).
			# On Windows doParallel::registerDoParallel(numeric) will create a 
			# SOCKcluster with `object` cores.
			# => Windows needs a cleanup function that will stop the cluster 
			# when another backend is registered.
			# Fortunately doParallel::registerDoParallel assigns the cluster object
			# to the global variable `.revoDoParCluster`
			if( .Platform$OS.type == "windows" ){
				.cl_cleanup(".revoDoParCluster")
			}
		}
		
		if( is.null(stopFun) ) stopFun <- parallel::stopCluster
		# stop cluster
		stopFun(cl)
		TRUE
	}
}

#' @export
register.doParallel_backend <- function(x, ...){
	
	# start cluster if numeric specification and type is defined.
	# NB: guard against an empty specification (e.g. ForeachBackend('PAR')),
	# for which x$data[[1]] fails with 'subscript out of bounds'.
	cl <- if( length(x$data) ) x$data[[1L]]
	if( is.numeric(cl) && (.Platform$OS.type == 'windows' || !is.null(x$data$type)) ){
		# the number of nodes is passed as makeCluster's `spec` argument
		names(x$data)[1L] <- 'spec'
		# start cluster
		clObj <- do.call(parallel::makeCluster, x$data)
		x$data <- list(clObj)
		# setup cleanup procedure
		x$cleanup <- cleanupCluster(x, clObj)
	}
	# register
	register.foreach_backend(x, ...)
}

###############
# doMPI
###############

# Tells if a backend specification (or the currently registered backend when
# `x` is missing) is an MPI backend: either named 'doMPI' or using an MPI
# cluster object as first data element.
isMPIBackend <- function(x, ...){
	b <- if( missing(x) ) ForeachBackend(...) else ForeachBackend(object=x, ...)
	if( is.null(b) ) FALSE
	else if( identical(b$name, 'doMPI') ) TRUE
	else if( length(b$data) ){
		is(b$data[[1]], 'MPIcluster') || is(b$data[[1]], 'mpicluster')
	}else FALSE
}

#' @export
register.doMPI_backend <- function(x, ...){
	
	# a single number specifies the size of the MPI cluster to start
	if( length(x$data) && isNumber(cl <- x$data[[1]]) ){
		clObj <- doMPI::startMPIcluster(cl)
		x$data[[1]] <- clObj
		# setup cleanup procedure
		x$cleanup <- cleanupCluster(x, clObj, doMPI::closeCluster)
	}
	# register
	register.foreach_backend(x, ...)
}

setOldClass('mpicluster')
#' Creates a doMPI foreach backend that uses the MPI cluster described in 
#' \code{object}.
setMethod('ForeachBackend', 'mpicluster', 
	function(object, ...){
		ForeachBackend('doMPI', cl=object)
	}
)

setOldClass('doMPI_backend')
#' doMPI-specific backend factory
setMethod('ForeachBackend', 'doMPI_backend',
	function(object, cl){
		
		# use all available cores if not otherwise specified.
		# NB: the default must be stored in the object data, otherwise
		# register.doMPI_backend has no cluster size to start a cluster with and
		# the registration function would be called without any cluster.
		# Only an empty specification is filled in: this method is also reached
		# when re-dispatching on an existing doMPI backend object, whose data
		# must be left untouched.
		if( missing(cl) && !length(object$data) ) object$data <- list(getMaxCores())
		
		# required registration data
		object$fun <- doMPI:::doMPI
		object$info <- doMPI:::info
		
		# return object
		object
	}
)

#as.foreach_backend <- function(x, ...){
#
#	args <- list(...)
#	if( is.backend(x) ){
#		# update arg list if necessary
#		if( length(args) > 0L )	x$args <- args
#		return(x)
#	}
#
#	be <-
#	if( is.null(x) ){
#		getDoParName()
#	} else if( is(x, 'cluster') || is.numeric(x) ){
#		# check numeric specification
#		if( is.numeric(x) ){
#			if( length(x) == 0L )
#				stop("invalid number of cores specified as a backend [empty]")
#			x <- x[1]
#			if( x <= 0 )
#				stop("invalid negative number of cores [", x, "] specified for backend 'doParallel'")
#		}
#
#		args$spec <- x
#		'Parallel'
#	} else if( is(x, 'mpicluster') ){
#		args$spec <- x
#		'MPI'
#	} else if( is.character(x) ){
#		toupper(x)
#	} else
#		stop("invalid backend specification: must be NULL, a valid backend name, a numeric value or a cluster object [", class(x)[1L], "]")
#
#	if( be %in% c('PAR', 'PARALLEL') ) be <- 'Parallel'
#	# remove leading 'do'
#	be <- str_c('do', sub('^do', '', be))
#	# build S3 class name
#	s3class <- str_c(be, "_backend")
#
#	# check the registration routine is available
#	regfun <- .foreach_regfun(be)
#
#	structure(list(name=be, args=args), class=c(s3class, 'foreach_backend'))
#}

# Tells if `x` is a foreach backend object.
is.backend <- function(x) is(x, 'foreach_backend')

#' @export
print.foreach_backend <- function(x, ...){
	cat("<foreach backend:", x$name, ">\n", sep='')
	if( length(x$data) ){
		cat("Specifications:\n")
		str(x$data)
	}
	# print methods return their argument invisibly
	invisible(x)
}

# Returns the registration function of foreach backend `name`: registerDoSEQ
# for 'doSEQ', else function 'register<Name>' from the backend's package
# namespace, else S3 method register.<name>_backend. Throws an error if the
# backend package cannot be loaded or no registration routine is found.
.foreach_regfun <- function(name){
	
	# early exit for doSEQ
	if( name == 'doSEQ' ) return( registerDoSEQ )
	
	# build name of registration function
	s <- str_c(toupper(substring(name, 1,1)), substring(name, 2))
	funname <- str_c('register', s)
	s3class <- str_c(name, "_backend")
	
	# require definition package
	if( !require.quiet(name, character.only=TRUE) )
		stop("could not find package for foreach backend '", name, "'")
	# check for registering function or generic
	if( is.null(regfun <- getFunction(funname, mustFind=FALSE, where=asNamespace(name))) ){
		if( is.null(regfun <- getS3method('register', s3class, optional=TRUE)) )
			stop("could not find registration routine for foreach backend '", name, "'")
		#			stop("backend '", name,"' is not supported: function "
		#							,"`", regfun, "` and S3 method `register.", s3class, "` not found.")
	}
	regfun
}


#' \code{getDoParHosts} is a generic function that returns the hostname of the worker nodes used by a backend.
#' 
#' @export
#' @rdname foreach
#' @inline
setGeneric('getDoParHosts', function(object, ...) standardGeneric('getDoParHosts'))
setOldClass('foreach_backend')
#' Default method that tries to heuristically infer the number of hosts and, as a last
#' resort, temporarily registers the backend and performs a foreach loop, to retrieve the
#' nodename from each worker.
setMethod('getDoParHosts', 'ANY',
	function(object, ...){
		
		be <- if( missing(object) ) ForeachBackend(...) else ForeachBackend(object, ...)
		# delegate to a backend-specific method when one is defined.
		# NB: dispatch on the resolved backend `be`: calling the generic again on
		# `object` would select this 'ANY' method again and recurse without end
		# (and fail outright when `object` is missing).
		if( existsMethod('getDoParHosts', class(be)[1L]) ) return( getDoParHosts(be) )
		
		# default behaviour
		nodename <- setNames(Sys.info()['nodename'], NULL)
		
		if( is.null(be) || is.null(be$data) ) return( NULL )
		# doSEQ 
		if( be$name == 'doSEQ' ) 
			return( nodename )
		if( isNumber(be$data) ) 
			return( rep(nodename, be$data) )
		if( length(be$data) && isNumber(be$data[[1]]) ) 
			return( rep(nodename, be$data[[1]]) )
		# doParallel with a cluster object: one host entry per node
		if( length(be$data) && be$name == 'doParallel' )
			return( sapply(be$data[[1L]], '[[', 'host') )
		
		if( !missing(object) ){ # backend passed: register temporarily
			ob <- getDoBackend()
			on.exit( setDoBackend(ob) )
			registerDoBackend(be)
		}
		setNames(unlist(times(getDoParWorkers()) %dopar% { Sys.info()['nodename'] }), NULL)
	}
)

#' \code{getDoParNHosts} returns the number of hosts used by a backend.
#' 
#' @export
#' @rdname foreach
getDoParNHosts <- function(object){
	if( missing(object) ) foreach::getDoParWorkers()
	else{
		length(getDoParHosts(object))
	}
}

# add new option: limit.cores indicates if the number of cores used in parallel
# computation can exceed the detected number of CPUs on the host. 
#.OPTIONS$newOptions(limit.cores=TRUE)

#' Computational Setup Functions
#' 
#' @description
#' Functions used internally to setup the computational environment.
#' 
#' \code{setupBackend} sets up a foreach backend given some specifications.
#' 
#' @param spec target parallel specification: either \code{TRUE} or \code{FALSE},
#' or a single numeric value that specifies the number of cores to setup.
#' @param backend value from argument \code{.pbackend} of \code{nmf}.
#' @param optional a logical that indicates if the specification must be fully 
#' satisfied, throwing an error if it is not, or if one can switch back to 
#' sequential, only outputting a verbose message.
#' @param verbose logical or integer level of verbosity for message outputs.
#' 
#' @return Returns \code{FALSE} if no foreach backend is to be used, \code{NA} if the currently 
#' registered backend is to be used, or, if this function call registered a new backend, 
#' the internal data of the previously registered backend (as returned by
#' \code{getDoBackend}), so that it can be restored after the computation is over.
#' @keywords internal
#' @rdname setup
setupBackend <- function(spec, backend, optional=FALSE, verbose=FALSE){
	
	pbackend <- backend
	str_backend <- quick_str(pbackend)
	# early exit: FALSE specification or NA backend means not using foreach at all
	if( isFALSE(spec) || is_NA(pbackend) ) return(FALSE)
	# use doParallel with number of cores if specified in backend
	if( is.numeric(pbackend) ){
		spec <- pbackend
		pbackend <- 'PAR'
	}
	# identify doSEQ calls
	doSEQ <- formatDoName(pbackend) == 'doSEQ'
	
	# custom error function: builds a condition handler that either aborts
	# (non-optional setup, or stop=TRUE) or reports a NOTE and returns `value`
	pcomp <- is.numeric(spec) && !identical(spec[1], 1)
	errorFun <- function(value=FALSE, stop=FALSE, level=1){
		function(e, ...){
			if( !is(e, 'error') ) e <- list(message=str_c(e, ...))
			
			pref <- if( pcomp ) "Parallel" else "Foreach"
			if( !optional || stop ){
				if( verbose >= level ) message('ERROR')
				stop(pref, " computation aborted: ", e$message, call.=FALSE)
			}else if( verbose >= level ){
				message('NOTE')
				message("# NOTE: ", pref, " computation disabled: ", e$message)
			}
			value
		}
	}
	
	# check current backend if backend is NULL
	if( is.null(pbackend) ){
		if( verbose > 1 ){
			message("# Using current backend ... ", appendLF=FALSE)
		}
		ok <- tryCatch({
			if( is.null(parname <- getDoParName()) )
				stop("argument '.pbackend' is NULL but there is no registered backend")
			if( verbose > 1 ) message('OK [', parname, ']')
			TRUE
		}, error = errorFun())
		if( !ok ) return(FALSE)
		# exit now since there is nothing to setup, nothing should change
		# return NA so that the backend is not restored on.exit of the parent call.
		return(NA)
	}
	##
	
	# test if requested number of cores is actually available
	NCORES <- getMaxCores(limit=FALSE)
	if( verbose > 2 ) message("# Check available cores ... [", NCORES, ']')
	if( verbose > 2 ) message("# Check requested cores ... ", appendLF=FALSE)
	ncores <- if( doSEQ ) 1L
	else{
		ncores <- tryCatch({
			if( is.numeric(spec) ){
				if( length(spec) == 0L )
					stop("no number of cores specified for backend '", str_backend, "'")
				spec <- spec[1]
				if( spec <= 0L )
					stop("invalid negative number of cores [", spec, "] specified for backend '", str_backend, "'")
				spec
			}else # by default use the 'cores' option or half the number of cores
				getMaxCores() #getOption('cores', ceiling(NCORES/2))
		}, error = errorFun(stop=TRUE))
		if( isFALSE(ncores) ) return(FALSE)
		ncores
	}
	if( verbose > 2 ) message('[', ncores, ']')
	
	# create backend object
	if( verbose > 2 ) message("# Loading backend for specification `", str_backend, "` ... ", appendLF=FALSE)
	newBackend <- tryCatch({
		# NB: limit to the number of cores available on the host
		if( !doSEQ ) ForeachBackend(pbackend, min(ncores, NCORES))
		else ForeachBackend(pbackend)
	}, error = errorFun(level=3))
	if( isFALSE(newBackend) ) return(FALSE)
	if( verbose > 2 ) message('OK')
	
	if( verbose > 1 ) message("# Check host compatibility ... ", appendLF=FALSE)
	ok <- tryCatch({
		# check if we're not running on MAC from GUI
		if( is.Mac(check.gui=TRUE) && (newBackend$name == 'doMC' || (newBackend$name == 'doParallel' && is.numeric(newBackend$data[[1]]))) ){
			# error only if the parallel computation was explicitly asked by the user
			stop("multicore parallel computations are not safe from R.app on Mac OS X."
				, "\n -> Use a terminal session, starting R from the command line.")
		}
		TRUE
	}, error = errorFun())
	if( !ok ) return(FALSE)
	if( verbose > 1 ) message('OK')
	
	if( verbose > 1 ) message("# Registering backend `", newBackend$name, "` ... ", appendLF=FALSE)
	# try registering the backend
	oldBackend <- getDoBackend()
	# setup restoration of backend in case of an error
	# NB: the new backend cleanup will happen only
	# if registration succeeds, since the cleanup routine is
	# setup after the registration by the suitable register S3 method.
	on.exit( setDoBackend(oldBackend, cleanup=TRUE) )
	
	ov <- lverbose(verbose)
	ok <- tryCatch({
		registerDoBackend(newBackend)
		TRUE
	}, error = function(e){
		# NB: the handler must be a function: tryCatch evaluates its handler
		# arguments before running the expression, so the former
		# `error = { lverbose(ov); errorFun() }` reset the verbosity level
		# before the registration even started.
		lverbose(ov)
		errorFun()(e)
	})
	lverbose(ov)
	if( !ok ) return(FALSE)
	if( verbose > 1 ) message('OK')
	
	# check allocated cores if not doSEQ backend
	if( newBackend$name != 'doSEQ' ){
		# test allocated number of cores
		if( verbose > 2 ) message("# Check allocated cores ... ", appendLF=FALSE)
		wcores <- getDoParWorkers()
		if( ncores > 0L && wcores < ncores ){
			if( !optional ){
				errorFun(level=3)("only ", wcores, " core(s) available [requested ", ncores ," core(s)]")
			}else if( verbose > 2 ){
				message('NOTE [', wcores, '/', ncores, ']')
				message("# NOTE: using only ", wcores,
						" core(s) [requested ", ncores ," core(s)]")
			}
		}
		else if( verbose > 2 ){
			message('OK [', wcores, '/', ncores
					, if(ncores != NCORES ) str_c(' out of ', NCORES)
					, ']')
		}
	}
	
	# cancel backend restoration
	on.exit()
	# return old backend
	oldBackend
}


# add extra package bigmemory and synchronicity on Unix platforms
if( .Platform$OS.type != 'windows' ){
	setPackageExtra('install.packages', 'bigmemory', pkgs='bigmemory')
	setPackageExtra('install.packages', 'synchronicity', pkgs='synchronicity')
}
# add new option: shared.memory that indicates if one should try using shared memory
# to speed-up parallel computations.
.OPTIONS$newOptions(shared.memory = (.Platform$OS.type != 'windows' && !is.Mac()))


#' \code{setupSharedMemory} checks if one can use the packages \emph{bigmemory} and \emph{synchronicity}
#' to speed-up parallel computations when not keeping all the fits.
#' When both these packages are available, only one result per host is written on disk,
#' with its achieved deviance stored in shared memory, that is accessible to all cores on
#' a same host.
#' It returns \code{TRUE} if both packages are available and NMF option \code{'shared.memory'} is
#' toggled on; it always returns \code{FALSE} on Windows and with MPI backends.
#' 
#' @rdname setup
setupSharedMemory <- function(verbose){
	
	if( verbose > 1 ) message("# Check shared memory capability ... ", appendLF=FALSE)
	# early exit if option shared is off
	if( !nmf.getOption('shared.memory') ){
		if( verbose > 1 ) message('SKIP [disabled]')
		return(FALSE)
	}
	# early exit if foreach backend is doMPI: it is not working, not sure why
	if( isMPIBackend() ){
		if( verbose > 1 ) message('SKIP [MPI cluster]')
		return(FALSE)
	}
	# not on Windows
	if( .Platform$OS.type == 'windows' ){
		if( verbose > 1 ) message('SKIP [Windows OS]')
		return(FALSE)
	}
	
	if( !require.quiet('bigmemory', character.only=TRUE) ){
		if( verbose > 1 ){
			message('NO', if( verbose > 2 ) ' [Package `bigmemory` required]')
		}
		return(FALSE)
	}
	if( !require.quiet('synchronicity', character.only=TRUE) ){
		if( verbose > 1 ){
			message('NO', if( verbose > 2 ) ' [Package `synchronicity` required]')
		}
		return(FALSE)
	}
	if( verbose > 1 ) message('YES', if( verbose > 2 ) ' [synchronicity]')
	TRUE
}

# Tells if the current foreach backend is sequential (doSEQ or none registered).
is.doSEQ <- function(){
	dn <- getDoParName()
	is.null(dn) || dn == 'doSEQ'
}

#' \code{setupTempDirectory} creates a temporary directory to store the best fits computed on each host.
#' It ensures each worker process has access to it.
#' 
#' @rdname setup
setupTempDirectory <- function(verbose){
	
	# - Create a temporary directory to store the best fits computed on each host
	NMF_TMPDIR <- tempfile('NMF_', getwd())
	if( verbose > 2 ) message("# Setup temporary directory: '", NMF_TMPDIR, "' ... ", appendLF=FALSE)
	dir.create(NMF_TMPDIR)
	if( !is.dir(NMF_TMPDIR) ){
		if( verbose > 2 ) message('ERROR')
		nmf_stop('nmf', "could not create temporary result directory '", NMF_TMPDIR, "'")
	}
	
	# remove the directory if any of the checks below fails
	on.exit( unlink(NMF_TMPDIR, recursive=TRUE) )
	# ensure that all workers can see the temporary directory
	wd <- times(getDoParWorkers()) %dopar% {
		if( !file_test('-d', NMF_TMPDIR) )
			dir.create(NMF_TMPDIR, recursive=TRUE)
		file_test('-d', NMF_TMPDIR)
	}
	# check it worked
	if( any(!wd) ){
		if( verbose > 2 ) message('ERROR')
		nmf_stop('nmf', "could not create/see temporary result directory '", NMF_TMPDIR, "' on worker nodes ", str_out(which(!wd), Inf))
	}
	if( verbose > 2 ) message('OK')
	on.exit()
	NMF_TMPDIR
}

#' Utilities for Parallel Computations
#' 
#' 
#' @rdname parallel
#' @name parallel-NMF
NULL

#' \code{ts_eval} generates a thread safe version of \code{\link{eval}}.
#' It uses boost mutexes provided by the \code{\link[synchronicity]{synchronicity}}
#' package.
#' The generated function has arguments \code{expr} and \code{envir}, which are passed
#' to \code{\link{eval}}. It returns \code{NULL} invisibly, not the value of \code{expr}.
#' 
#' @param mutex a mutex or a mutex descriptor.
#' If missing, a new mutex is created via the function \code{\link[synchronicity]{boost.mutex}}.
#' @param verbose a logical that indicates if messages should be printed when
#' locking and unlocking the mutex.
#' 
#' @rdname parallel
#' @export
ts_eval <- function(mutex = synchronicity::boost.mutex(), verbose=FALSE){

	# load the shared-memory packages before the `mutex` default is evaluated
	requireNamespace('bigmemory')
	requireNamespace('synchronicity')

	# mutex objects are replaced by their description, which is what
	# synchronicity::attach.mutex() receives below; other values are used as is
	mutex_desc <- mutex
	if( is(mutex, 'boost.mutex') ) mutex_desc <- synchronicity::describe(mutex)

	pending_load <- TRUE
	function(expr, envir=parent.frame()){

		# load packages on the first call of the generated function only
		if( pending_load ){
			requireNamespace('bigmemory')
			requireNamespace('synchronicity')
			pending_load <<- FALSE
		}

		handle <- synchronicity::attach.mutex(mutex_desc)
		synchronicity::lock(handle)
		if( verbose ){
			message('#', Sys.getpid(), " - START mutex: ", mutex_desc@description$shared.name)
		}
		# prefix of the closing message: it only survives if evaluating `expr` fails
		end_prefix <- "### <Error in mutex expression> ###\n"
		on.exit({
			if( verbose ){
				message(end_prefix, '#', Sys.getpid(), " - END mutex: ", mutex_desc@description$shared.name)
			}
			synchronicity::unlock(handle)
		})

		eval(expr, envir=envir)

		# success: drop the error prefix. NB: as the last expression, this makes
		# the generated function return NULL invisibly (not the value of `expr`)
		end_prefix <- NULL
	}
}

#' \code{ts_tempfile} generates a \emph{unique} temporary filename
#' that includes the name of the host machine and/or the caller's process id,
#' so that it is thread safe.
#' 
#' @inheritParams base::tempfile
#' @param ... extra arguments passed to \code{\link[base]{tempfile}}.
#' @param host logical that indicates if the host machine name should 
#' appear in the filename.
#' @param pid logical that indicates if the current process id should
#' appear in the filename.
#' 
#' @rdname parallel
#' @export
ts_tempfile <- function(pattern = "file", ..., host=TRUE, pid=TRUE){
	# NULL components (disabled host/pid) simply vanish from c()
	parts <- c(pattern, if( host ) Sys.info()['nodename'], if( pid ) Sys.getpid())
	tempfile(paste(parts, collapse='_'), ...)
}

#' \code{hostfile} generates a temporary filename composed with
#' the name of the host machine and/or the current process id.
#' 
#' @inheritParams base::tempfile
#' @inheritParams ts_tempfile
#' 
#' @rdname parallel
#' @export
hostfile <- function(pattern = "file", tmpdir=tempdir(), fileext='', host=TRUE, pid=TRUE){
	# same components as ts_tempfile, but '.'-separated and without random suffix
	parts <- c(pattern, if( host ) Sys.info()['nodename'], if( pid ) Sys.getpid())
	file.path(tmpdir, str_c(paste(parts, collapse='.'), fileext))
}

#' \code{gVariable} generates a function that accesses a global static variable,
#' possibly in shared memory (only for numeric matrix-coercible data in this case).
#' It is used primarily in parallel computations, to preserve data across
#' computations that are performed by the same process.
#' 
#' @param init initial value
#' @param shared a logical that indicates if the variable should be stored in shared 
#' memory or in a local environment.
#'
#' @return a function \code{f(value)}: called without argument, it returns the
#' current value (set to \code{init} whenever the stored value is \code{NULL},
#' in particular on first read); called with \code{value}, it stores that value.
#'
#' @rdname parallel
#' @export
gVariable <- function(init, shared=FALSE){

	if( shared ){ # use bigmemory shared matrices
		if( !is.matrix(init) )
			init <- as.matrix(init)
		requireNamespace('bigmemory')
		DATA <- bigmemory::as.big.matrix(init, type='double', shared=TRUE)
		# the closure re-attaches the shared matrix from this descriptor on each call
		DATA_DESC <- bigmemory::describe(DATA)
	}
	# non-shared mode: the value is kept in this function's closure (.VALUE)

	.VALUE <- NULL
	.loadpkg <- TRUE
	function(value){

		# load packages once
		if( shared && .loadpkg ){
			requireNamespace('bigmemory')
			.loadpkg <<- FALSE
		}

		# if shared: attach bigmemory matrix from its descriptor object
		if( shared ){
			DATA <- bigmemory::attach.big.matrix(DATA_DESC)
		}

		if( missing(value) ){# READ ACCESS
			if( !shared ){
				# (re)initialise lazily while no value is stored
				if( is.null(.VALUE) ) .VALUE <<- init
				.VALUE
			}else
				DATA[]

		}else{# WRITE ACCESS
			if( !shared ) .VALUE <<- value
			else DATA[] <- value
		}
	}
}

#' \code{setupLibPaths} adds the path to the NMF package to each worker's libPaths.
#'
#' @param pkg package name whose path should be exported to the workers.
#'
#' @rdname setup
setupLibPaths <- function(pkg='NMF', verbose=FALSE){

	# Returns character() in sequential mode, NULL if `pkg` is not loaded,
	# `pkg` after updating the workers' libPaths (non-dev package), or the
	# names c('bigmemory', 'rngtools') after load_all() on the workers (dev
	# package); NULL otherwise.

	# do nothing in sequential mode
	if( is.doSEQ() ) return( character() )

	if( verbose ){
		message("# Setting up libpath on workers for package(s) "
			, str_out(pkg, Inf), ' ... ', appendLF=FALSE)
	}
	# without quiet=TRUE, path.package() signals an error (it never returns
	# NULL) when the package is not loaded; with it, it returns character(0)
	p <- path.package(pkg, quiet=TRUE)
	if( length(p) == 0L ) return()

	if( !isDevNamespace(pkg) ){ # not a dev package
		# append the library directory containing the package on each worker;
		# .libPaths(new) returns the resulting library paths
		plibs <- dirname(p)
		libs <- times(getDoParWorkers()) %dopar% {
			.libPaths(c(.libPaths(), plibs))
		}
		libs <- unique(unlist(libs))
		if( verbose ){
			message("OK\n# libPaths:\n", paste(' ', libs, collapse="\n"))
		}
		pkg
	}else if( getDoParName() != 'doParallel' || !isNumber(getDoBackend()$data) ){
		# devmode: load the package + depends
		if( verbose ){ message("[devtools::load_all] ", appendLF=FALSE) }
		times(getDoParWorkers()) %dopar% {
			capture.output({
				suppressMessages({
					requireNamespace('devtools')
					requireNamespace('bigmemory')
					devtools::load_all(p)
				})
			})
		}
		if( verbose ){ message("OK") }
		c('bigmemory', 'rngtools')
	}else if( verbose ){
		# NOTE(review): dev package on a doParallel backend whose data is a
		# number -- presumably forked workers that already share the dev
		# namespace, so nothing is done; confirm.
		message("OK")
	}
}

#StaticWorkspace <- function(..., .SHARED=FALSE){
#
#	# create environment
#	e <- new.env(parent=.GlobalEnv)
#	# fill with initial data
#	vars <- list(...)
#	if( .SHARED ){
#		lapply(names(vars), function(x){
#			bm <- bigmemory::as.big.matrix(vars[[x]], type='double', shared=TRUE)
#			e[[x]] <- bigmemory::describe(bm)
#		})
#	}else
#		list2env(vars, envir=e)
#
#	structure(e, shared=.SHARED, class=c("static_wsp", 'environment'))
#}
#
#`[[.static_wsp` <- function(x, ..., exact = TRUE){
#	if( attr(x, 'shared') ){
#		var <- bigmemory::attach.big.matrix(NextMethod())
#		var[]
#	}else
#		NextMethod()
#}
#
#`[[.static_wsp<-` <- function(x, i, value){
#
#	if( attr(x, 'shared') ){
#		var <- bigmemory::attach.big.matrix(x[[i]])
#		var[] <- value
#	}else
#		x[[i]] <- value
#	x
#}


# Tests whether `x` is an RNG seed specification: either a numeric vector,
# or an unnamed list whose elements are all numeric (setupRNG uses such a
# list as one seed per run).
isRNGseed <- function(x){
	# vapply (not sapply) so that an empty list yields logical(0), hence TRUE,
	# instead of all(list()) raising an error
	is.numeric(x) ||
		( is.list(x)
		&& is.null(names(x))
		&& all(vapply(x, is.numeric, logical(1L))) )
}

#' \code{setupRNG} sets up the RNG for use by the function nmf.
#'
#' For multiple runs (\code{n > 1}), it returns a list of \code{n} RNG seeds:
#' \code{seed} itself if it is an unnamed list of numeric seeds, otherwise an
#' RNGstream sequence generated from \code{seed} when it is numeric, or from a
#' random seed.
#' For a single run with a numeric vector \code{seed}, the RNG is set directly:
#' a 1-length seed is passed to \code{set.seed}, a 6-length seed initialises an
#' RNGstream, and any other length is used as a full \code{.Random.seed};
#' the resulting RNG settings are returned.
#' In all other cases the current RNG is left untouched and \code{NULL} is returned.
#'
#' @param seed initial RNG seed specification
#' @param n number of RNG seeds to generate
#'
#' @rdname setup
setupRNG <- function(seed, n, verbose=FALSE){

	if( verbose == 2 ){
		message("# Setting up RNG ... ", appendLF=FALSE)
		on.exit( if( verbose == 2 ) message("OK") )
	}else if( verbose > 2 ) message("# Setting up RNG ... ")

	if( verbose > 3 ){
		message("# ** Original RNG settings:")
		showRNG()
	}

	# for multiple runs one always uses RNGstreams
	if( n > 1 ){

		# seeding with numeric values only
		if( is.list(seed) && isRNGseed(seed) ){
			if( length(seed) != n )
				stop("Invalid list of RNG seeds: must be of length ", n)

			if( verbose > 2 ) message("# Using supplied list of RNG seeds")
			return(seed)

		}else if( is.numeric(seed) ){

			if( verbose > 2 ){
				message("# Generate RNGStream sequence using seed ("
						, RNGstr(seed), ") ... "
						, appendLF=FALSE)
			}
			res <- RNGseq(n, seed)
			if( verbose > 2 ) message("OK")
			return(res)

		}else{ # create a sequence of RNGstream using a random seed
			if( verbose > 2 ){
				message("# Generate RNGStream sequence using a random seed ... "
						, appendLF=FALSE)
			}
			res <- RNGseq(n, NULL)
			if( verbose > 2 ) message("OK")
			return(res)
		}
	}else if( is.numeric(seed) ){
		# for single runs: 1-length seeds are used to set the current RNG
		# 6-length seeds are used to set RNGstream

		if( !is.vector(seed) ){
			# only terminate the progress line when one was started
			if( verbose >= 2 ) message('ERROR')
			stop("NMF::nmf - Invalid numeric seed: expects a numeric vector.")
		}

		# convert to an integer vector
		seed <- as.integer(seed)
		# immediately setup the RNG in the standard way
		if( length(seed) == 1L ){
			if( verbose > 2 ){
				message("# RNG setup: standard [seeding current RNG]")
				message("# Seeding current RNG with seed (", seed, ") ... "
						, appendLF=FALSE)
			}
			set.seed(seed)
			if( verbose > 2 ) message("OK")
			return( getRNG() )
		}else if( length(seed) == 6L ){
			if( verbose > 2 ){
				message("# RNG setup: reproducible [using RNGstream]")
				message("# Generate RNGStream sequence using seed ("
						, RNGstr(seed), ") ... "
						, appendLF=FALSE)
			}
			res <- RNGseq(1, seed)
			setRNG(res)
			if( verbose > 2 ) message("OK")
			return( res )
		}else{
			if( verbose > 2 ){
				message("# RNG setup: directly setting RNG")
				message("# Setting RNG with .Random.seed= ("
						, RNGstr(seed), ") ... "
						, appendLF=FALSE)
			}
			setRNG(seed, verbose > 2)
			if( verbose > 2 ) message("OK")
			return( getRNG() )
		}
	}else{
		# NOTE(review): with n <= 1, a list seed (even a valid one) ends up here
		# and is ignored -- confirm this is intended.
		if( verbose > 2 ) message("# RNG setup: standard [using current RNG]")
		NULL
	}
}

##################################################################
## END
##################################################################