1# Definitions used in the parallel computations of NMF
2#
3# - reproducible backend
4# - reproducible %dopar% operator: %dorng%
5#
6# Author: Renaud Gaujoux
7# Creation: 08-Feb-2011
8###############################################################################
9
10#' @include utils.R
11#' @import foreach
12#' @import doParallel
13NULL
14
15# returns the number of cores to use in all NMF computation when no number is
16# specified by the user
17getMaxCores <- function(limit=TRUE){
18	#ceiling(parallel::detectCores()/2)
19	nt <- n <- parallel::detectCores()
20	# limit to number of cores specified in options if asked for
21	if( limit ){
22		if( !is.null(nc <- getOption('cores')) ) n <- nc # global option
23		else if( !is.null(nc <- nmf.getOption('cores')) ) n <- nc # NMF-specific option
24		else if( n > 2 ) n <- n - 1L # leave one core free if possible
25	}
26	# forces limiting maximum number of cores to 2 during CRAN checks
27	if( n > 2 && isCHECK() ){
28		message("# NOTE - CRAN check detected: limiting maximum number of cores [2/", nt, "]")
29		n <- 2L
30	}
31	n
32}
33
34#' Utilities and Extensions for Foreach Loops
35#'
36#' \code{registerDoBackend} is a unified register function for foreach backends.
37#'
38#' @param object specification of a foreach backend, e.g. \sQuote{SEQ},
39#' \sQuote{PAR} (for doParallel), \sQuote{MPI}, etc\ldots
40#' @param ... extra arguments passed to the backend own registration function.
41#'
42#' @keywords internal
43#' @rdname foreach
44registerDoBackend <- function(object, ...){
45
46	# restore old backend data in case of an error
47	old <- getDoBackend()
48	on.exit( setDoBackend(old) )
49
50	# get old foreach backend object
51	ob <- ForeachBackend()
52
53	# register new backend: call the register method
54	b <- ForeachBackend(object, ...)
55	res <- register(b)
56
57	# cancel backend restoration
58	on.exit()
59	# call old backend cleanup method
60	doBackendCleanup(ob)
61
62	# return old backend
63	invisible(ob)
64}
65
66#' \code{getDoBackend} returns the internal data of the currently registered foreach \%dopar\% backend.
67#' @rdname foreach
68#' @export
69getDoBackend <- function(){
70    fe_ns <- asNamespace('foreach')
71	fe <- ns_get('.foreachGlobals', fe_ns)
72	if( !exists("fun", where = fe, inherits = FALSE) )
73		return(NULL)
74
75    getDoPar <- ns_get('getDoPar', fe_ns)
76	c(getDoPar() # this returns the registered %dopar% function + associated data
77		# -> add info function from foreach internal environment
78		, info= if( exists("info", where = fe, inherits = FALSE) ){
79					get('info', fe, inherits=FALSE)
80				}else{
81					function(data, item) NULL
82				}
83		, cleanup = if( exists("cleanup", where = fe, inherits = FALSE) ){
84			get('cleanup', fe, inherits=FALSE)
85		}
86	)
87}
88
89getDoBackendInfo <- function(x, item){
90    if( is.function(x$info) ) x$info(x$data, item)
91}
92getDoBackendName <- function(x){
93    getDoBackendInfo(x, 'name')
94}
95
96#' \code{setDoBackend} is identical to \code{\link[foreach]{setDoPar}}, but
97#' returns the internal of the previously registered backend.
98#'
99#' @param data internal data of a foreach \%dopar\% backend.
100#' @param cleanup logical that indicates if the previous
101#' backend's cleanup procedure should be run, \strong{before}
102#' setting the new backend.
103#'
104#' @export
105#' @rdname foreach
106setDoBackend <- function(data, cleanup=FALSE){
107
108	# get old backend data
109	ob <- getDoBackend()
110	ofb <- ForeachBackend()
111	# cleanup old backend if requested
112	if( cleanup ){
113		doBackendCleanup(ofb)
114	}
115
116	if( !is.null(data) ){
117		bdata <- data
118		if( is.backend(data) ) data <- data[!names(data) %in% c('name', 'cleanup')]
119		do.call('setDoPar', data)
120		setBackendCleanup(bdata)
121	}else{
122		do.call('setDoPar', list(NULL))
123		fe <- ns_get('.foreachGlobals', 'foreach')
124		if (exists("fun", envir = fe, inherits = FALSE))
125			remove("fun", envir = fe)
126		setBackendCleanup(NULL)
127	}
128	# return old backend
129	invisible(ob)
130}
131
132# setup cleanup procedure for the current backend
133setBackendCleanup <- function(object, fun, verbose=FALSE){
134
135	fe <- ns_get('.foreachGlobals', 'foreach')
136	name <- getDoParName()
137	if( !is.null(fun <- object$cleanup) ){
138		if( verbose ) message("# Registering cleaning up function for '", name, "'... ", appendLF=FALSE)
139		assign('cleanup', fun, fe)
140		if( verbose ) message("OK")
141	}else if (exists("cleanup", envir = fe, inherits = FALSE)){
142		if( verbose ) message("# Removing cleaning up function for '", name, "'... ", appendLF=FALSE)
143		remove("cleanup", envir = fe)
144		if( verbose ) message("OK")
145	}
146	invisible(object)
147}
148
149# run cleanup procedure for a given backend object
150doBackendCleanup <- function(object, ..., run=TRUE, verbose=FALSE){
151
152	name <- object$name
153	if( !is.null(fun <- object$cleanup) ){
154		if( verbose ) message("# Cleaning up '", name, "'... ", appendLF=FALSE)
155		res <- try(fun(), silent=TRUE)
156		if( verbose ) message(if( is(res, 'try-error') ) 'ERROR' else 'OK')
157		if( isTRUE(res) ) object$cleanup <- NULL
158		if( verbose ) message('OK', if( !is.null(res) ) str_c(' [', res,']'))
159	}
160	invisible(object)
161}
162
163#' \code{register} is a generic function that register objects.
164#' It is used to as a unified interface to register foreach backends.
165#'
166#' @param x specification of a foreach backend
167#'
168#' @rdname foreach
169#' @export
170register <- function(x, ...){
171	UseMethod('register', x)
172}
173#' @export
174register.foreach_backend <- function(x, ...){
175
176	be <- x$name
177	# For everything except doSEQ:
178	# require definition package (it is safer to re-check)
179	if( be != 'doSEQ' ){
180		if( !require.quiet(be, character.only=TRUE) )
181			stop("Package '", be, "' is required to use foreach backend '", be, "'")
182	}
183
184	regfun <- .foreach_regfun(x$name)
185	res <-
186	if( length(formals(regfun)) > 0L ) do.call(regfun, c(x$data, ...))
187	else regfun()
188	# throw an error if not successful (foreach::setDoPar do not throw errors!!)
189	if( is(res, 'simpleError') ) stop(res)
190	# set cleanup procedure if any
191	setBackendCleanup(x)
192	# return result
193	invisible(res)
194}
195
196#' \code{ForeachBackend} is a factory method for foreach backend objects.
197#'
198#' @export
199#' @inline
200#' @rdname foreach
201setGeneric('ForeachBackend', function(object, ...) standardGeneric('ForeachBackend'))
202#' Default method defined to throw an informative error message, when no other
203#' method was found.
204setMethod('ForeachBackend', 'ANY',
205	function(object, ...){
206		if( is.backend(object) ){
207			# update arg list if necessary
208			if( nargs() > 1L )	object$data <- list(...)
209			object
210		}else if( is(object, 'cluster') )
211			selectMethod('ForeachBackend', 'cluster')(object, ...)
212		else
213			stop("Could not create foreach backend object with a specification of class '", class(object)[1L], "'")
214	}
215)
216
217formatDoName <- function(x){
218
219	# numeric values are resolved as doParallel
220	if( is.numeric(x) ) x <- 'PAR'
221	if( is.character(x) ){
222		# use upper case if not already specified as 'do*'
223		if( !grepl("^do", x) ){
224			x <- toupper(x)
225			# special treatment for doParallel
226			if( x %in% c('PAR', 'PARALLEL') ) x <- 'Parallel'
227		}
228		# stick prefix 'do' (removing leading 'do' if necessary)
229		str_c('do', sub('^do', '', x))
230	}else
231		''
232}
233#' Creates a foreach backend object based on its name.
234setMethod('ForeachBackend', 'character',
235	function(object, ...){
236
237		object <- formatDoName(object)
238
239		# build S3 class name
240		s3class <- str_c(object, "_backend")
241
242		# create empty S3 object
243		obj <- structure(list(name=object, data=list(...))
244						, class=c(s3class, 'foreach_backend'))
245
246		# give a chance to a backend-specific ForeachBackend factory method
247		# => this will generally fill the object with the elements suitable
248		# to be used in a call to foreach::setDoPar: fun, data, info
249		# and possibly change the name or the object class, e.g. to allow
250		# subsequent argument-dependent dispatch.
251		obj <- ForeachBackend(obj, ...)
252
253		# check the registration routine is available
254		.foreach_regfun(obj$name)
255
256		# set data slot if not already set by the backend-specific method
257		if( is.null(obj$data) || (length(obj$data) == 0L && nargs()>1L) )
258			obj$data <- list(...)
259
260		# return object
261		obj
262	}
263)
264#' Creates a foreach backend object for the currently registered backend.
265setMethod('ForeachBackend', 'missing',
266	function(object, ...){
267		be <- getDoParName()
268		data <- getDoBackend()
269		bdata <- data$data
270		res <- if( !is.null(bdata) ) do.call(ForeachBackend, c(list(be, bdata), ...))
271		else ForeachBackend(be, ...)
272		if( !is.null(data$cleanup) ) res$cleanup <- data$cleanup
273		res
274	}
275)
276#' Dummy method that returns \code{NULL}, defined for correct dispatch.
277setMethod('ForeachBackend', 'NULL', function(object, ...){ NULL })
278
279setOldClass('cluster')
280#' Creates a doParallel foreach backend that uses the cluster described in
281#' \code{object}.
282setMethod('ForeachBackend', 'cluster',
283	function(object, ...){
284		ForeachBackend('doParallel', cl=object)
285	}
286)
287#' Creates a doParallel foreach backend with \code{object} processes.
288setMethod('ForeachBackend', 'numeric',
289	function(object, ...){
290		# check numeric specification
291		if( length(object) == 0L )
292			stop("invalid number of cores specified as a backend [empty]")
293		object <- object[1]
294		if( object <= 0 )
295			stop("invalid negative number of cores [", object, "] specified for backend 'doParallel'")
296
297		ForeachBackend('doParallel', cl=object, ...)
298	}
299)
300###############
301# doParallel
302###############
303setOldClass('doParallel_backend')
304#' doParallel-specific backend factory
305#'
306#' @param cl cluster specification: a cluster object or a numeric that indicates the
307#' number of nodes to use.
308#' @param type type of cluster, See \code{\link[parallel]{makeCluster}}.
309setMethod('ForeachBackend', 'doParallel_backend',
310	function(object, cl, type=NULL){
311
312		# set type of cluster if explicitly provided
313		if( !is.null(type) ) object$data$type <- type
314
315        # required registration data
316		# NB: a function doParallel:::doParallel should exist and do the same
317		# thing as parallel::registerDoParallel without registering the backend
318		#object$fun <- doParallel:::doParallel
319#		object$info <- doParallel:::info
320        # doParallel:::info has been removed from doParallel since version 1.0.7
321		# Reported in Issue #7
322        object$info <- getDoParallelInfo(object)
323
324		# return object
325		object
326	}
327)
328
329setOldClass('doParallelMC_backend')
330#' doParallel-specific backend factory for multicore (fork) clusters
331#'
332#' This method is needed since version 1.0.7 of \pkg{doParallel}, which removed
333#' internal function \code{info} and defined separate backend names for mc and snow clusters.
334setMethod('ForeachBackend', 'doParallelMC_backend',
335    function(object, ...){
336
337        object$info <- getDoParallelInfo('mc')
338        object$name <- 'doParallel'
339        # return object
340        object
341    }
342)
343
344setOldClass('doParallelSNOW_backend')
345#' doParallel-specific backend factory for SNOW clusters.
346#'
347#' This method is needed since version 1.0.7 of \pkg{doParallel}, which removed
348#' internal function \code{info} and defined separate backend names for mc and snow clusters.
349setMethod('ForeachBackend', 'doParallelSNOW_backend',
350    function(object, ...){
351
352        object$info <- getDoParallelInfo('snow')
353        object$name <- 'doParallel'
354        # return object
355        object
356    }
357)
358
359getDoParallelType <- function(x){
360
361
362    cl <- x$data[['cl']]
363    if( is.null(cl) && length(x$data) && (is.null(names(x$data)) || names(x$data)[[1L]] == '') )
364        cl <- x$data[[1L]]
365    if ( is.null(cl) || is.numeric(cl) ) {
366        if (.Platform$OS.type == "windows" || (!is.null(x$data$type) && !identical(x$data$type, 'FORK')) ) 'snow'
367        else 'mc'
368    }
369    else 'snow'
370
371}
372
373getDoParallelInfo <- function(x, ...){
374    t <- if( isString(x) ) x else getDoParallelType(x, ...)
375#    str(t)
376    ns <- asNamespace('doParallel')
377    if( t == 'mc' ) get('mcinfo', ns)
378    else get('snowinfo', ns)
379}
380
381######################################################
382# doPSOCK
383# Default snow-like cluster from parallel on Windows
384# but works on Unix as well
385######################################################
386
387setOldClass('doPSOCK_backend')
388#' doSNOW-specific backend factory
389setMethod('ForeachBackend', 'doPSOCK_backend',
390		function(object, cl){
391
392			# use all available cores if not otherwise specified
393			if( missing(cl) ) cl <- getMaxCores()
394
395			# return equivalent doParallel object
396			ForeachBackend('doParallel', cl, type='PSOCK')
397		}
398)
399
400.cl_cleanup <- function(gvar, envir=.GlobalEnv){
401	if( !exists(gvar, envir = envir) ) return()
402	cl <- get(gvar, envir = envir)
403	try( parallel::stopCluster(cl), silent=TRUE)
404	rm(list=gvar, envir = envir)
405	TRUE
406}
407
408cleanupCluster <- function(x, cl, stopFun=NULL){
409
410	function(){
411
412		if( is(x, 'doParallel_backend') ){
413
414            # On non-Windows machines registerDoParallel(numeric) will use
415            # parallel::mclapply with `object` cores (no cleanup required).
416            # On Windows doParallel::registerDoParallel(numeric) will create a
417            # SOCKcluster with `object` cores.
418            # => Windows needs a cleanup function that will stop the cluster
419            # when another backend is registered.
420            # Fortunately doParallel::registerDoParallel assign the cluster object
421            # to the global variable `.revoDoParCluster`
422            if( .Platform$OS.type == "windows" ){
423				.cl_cleanup(".revoDoParCluster")
424			}
425		}
426
427		if( is.null(stopFun) ) stopFun <- parallel::stopCluster
428		# stop cluster
429		stopFun(cl)
430		TRUE
431	}
432}
433
434#' @export
435register.doParallel_backend <- function(x, ...){
436
437	# start cluster if numeric specification and type is defined
438	cl <- x$data[[1]]
439  if( is.numeric(cl) && (.Platform$OS.type == 'windows' || !is.null(x$data$type)) ){
440		names(x$data)[1L] <- 'spec'
441		# start cluster
442		clObj <- do.call(parallel::makeCluster, x$data)
443		x$data <- list(clObj)
444		# setup cleanup procedure
445		x$cleanup <- cleanupCluster(x, clObj)
446	}
447	# register
448	register.foreach_backend(x, ...)
449}
450
451###############
452# doMPI
453###############
454
455isMPIBackend <- function(x, ...){
456	b <- if( missing(x) ) ForeachBackend(...) else ForeachBackend(object=x, ...)
457	if( is.null(b) ) FALSE
458	else if( identical(b$name, 'doMPI') ) TRUE
459	else if( length(b$data) ){
460		is(b$data[[1]], 'MPIcluster') || is(b$data[[1]], 'mpicluster')
461	}else FALSE
462}
463
464#' @export
465register.doMPI_backend <- function(x, ...){
466
467	if( length(x$data) && isNumber(cl <- x$data[[1]]) ){
468		clObj <- doMPI::startMPIcluster(cl)
469		x$data[[1]] <- clObj
470		# setup cleanup procedure
471		x$cleanup <- cleanupCluster(x, clObj, doMPI::closeCluster)
472	}
473	# register
474	register.foreach_backend(x, ...)
475}
476
477setOldClass('mpicluster')
478#' Creates a doMPI foreach backend that uses the MPI cluster described in
479#' \code{object}.
480setMethod('ForeachBackend', 'mpicluster',
481	function(object, ...){
482		ForeachBackend('doMPI', cl=object)
483	}
484)
485
486setOldClass('doMPI_backend')
487#' doMPI-specific backend factory
488setMethod('ForeachBackend', 'doMPI_backend',
489	function(object, cl){
490
491		# use all available cores if not otherwise specified
492		if( missing(cl) ) cl <- getMaxCores()
493
494		# required registration data
495		object$fun <- doMPI:::doMPI
496		object$info <- doMPI:::info
497
498		# return object
499		object
500	}
501)
502
503#as.foreach_backend <- function(x, ...){
504#
505#	args <- list(...)
506#	if( is.backend(x) ){
507#		# update arg list if necessary
508#		if( length(args) > 0L )	x$args <- args
509#		return(x)
510#	}
511#
512#	be <-
513#	if( is.null(x) ){
514#		getDoParName()
515#	} else if( is(x, 'cluster') || is.numeric(x) ){
516#		# check numeric specification
517#		if( is.numeric(x) ){
518#			if( length(x) == 0L )
519#				stop("invalid number of cores specified as a backend [empty]")
520#			x <- x[1]
521#			if( x <= 0 )
522#				stop("invalid negative number of cores [", x, "] specified for backend 'doParallel'")
523#		}
524#
525#		args$spec <- x
526#		'Parallel'
527#	} else if( is(x, 'mpicluster') ){
528#		args$spec <- x
529#		'MPI'
530#	} else if( is.character(x) ){
531#		toupper(x)
532#	} else
533#		stop("invalid backend specification: must be NULL, a valid backend name, a numeric value or a cluster object [", class(x)[1L], "]")
534#
535#	if( be %in% c('PAR', 'PARALLEL') ) be <- 'Parallel'
536#	# remove leading 'do'
537#	be <- str_c('do', sub('^do', '', be))
538#	# build S3 class name
539#	s3class <- str_c(be, "_backend")
540#
541#	# check the registration routine is available
542#	regfun <- .foreach_regfun(be)
543#
544#	structure(list(name=be, args=args), class=c(s3class, 'foreach_backend'))
545#}
546
547is.backend <- function(x) is(x, 'foreach_backend')
548
549#' @export
550print.foreach_backend <- function(x, ...){
551	cat("<foreach backend:", x$name, ">\n", sep='')
552	if( length(x$data) ){
553		cat("Specifications:\n")
554		str(x$data)
555	}
556}
557
558.foreach_regfun <- function(name){
559
560	# early exit for doSEQ
561	if( name == 'doSEQ' ) return( registerDoSEQ )
562
563	# build name of registration function
564	s <- str_c(toupper(substring(name, 1,1)), substring(name, 2))
565	funname <- str_c('register', s)
566	s3class <- str_c(name, "_backend")
567
568	# require definition package
569	if( !require.quiet(name, character.only=TRUE) )
570		stop("could not find package for foreach backend '", name, "'")
571	# check for registering function or generic
572	if( is.null(regfun <- getFunction(funname, mustFind=FALSE, where=asNamespace(name))) ){
573		if( is.null(regfun <- getS3method('register', s3class, optional=TRUE)) )
574			stop("could not find registration routine for foreach backend '", name, "'")
575		#			stop("backend '", name,"' is not supported: function "
576		#							,"`", regfun, "` and S3 method `register.", s3class, "` not found.")
577	}
578	regfun
579}
580
581
582#' \code{getDoParHosts} is a generic function that returns the hostname of the worker nodes used by a backend.
583#'
584#' @export
585#' @rdname foreach
586#' @inline
587setGeneric('getDoParHosts', function(object, ...) standardGeneric('getDoParHosts'))
588setOldClass('foreach_backend')
589#' Default method that tries to heuristaically infer the number of hosts and in last
590#' resort temporarly register the backend and performs a foreach loop, to retrieve the
591#' nodename from each worker.
592setMethod('getDoParHosts', 'ANY',
593	function(object, ...){
594
595		be <- if( missing(object) ) ForeachBackend(...) else ForeachBackend(object, ...)
596		if( existsMethod('getDoParHosts', class(be)[1L]) ) return( callGeneric(object) )
597
598		# default behaviour
599		nodename <- setNames(Sys.info()['nodename'], NULL)
600
601		if( is.null(be) || is.null(be$data) ) return( NULL )
602		# doSEQ
603		if( be$name == 'doSEQ' )
604			return( nodename )
605		if( isNumber(be$data) )
606			return( rep(nodename, be$data) )
607		if( length(be$data) && isNumber(be$data[[1]]) )
608			return( rep(nodename, be$data[[1]]) )
609		if( length(be$data) && be$name == 'doParallel' )
610			return( sapply(be$data[[1L]], '[[', 'host') )
611
612		if( !missing(object) ){ # backend passed: register temporarly
613			ob <- getDoBackend()
614			on.exit( setDoBackend(ob) )
615			registerDoBackend(be)
616		}
617		setNames(unlist(times(getDoParWorkers()) %dopar% { Sys.info()['nodename'] }), NULL)
618	}
619)
620
621#' \code{getDoParNHosts} returns the number of hosts used by a backend.
622#'
623#' @export
624#' @rdname foreach
625getDoParNHosts <- function(object){
626	if( missing(object) ) foreach::getDoParWorkers()
627	else{
628		length(getDoParHosts(object))
629	}
630}
631
632# add new option: limit.cores indicates if the number of cores used in parallel
633# computation can exceed the detected number of CPUs on the host.
634#.OPTIONS$newOptions(limit.cores=TRUE)
635
636#' Computational Setup Functions
637#'
638#' @description
639#' Functions used internally to setup the computational environment.
640#'
641#' \code{setupBackend} sets up a foreach backend given some specifications.
642#'
643#' @param spec target parallel specification: either \code{TRUE} or \code{FALSE},
644#' or a single numeric value that specifies the number of cores to setup.
645#' @param backend value from argument \code{.pbackend} of \code{nmf}.
646#' @param optional a logical that indicates if the specification must be fully
647#' satisfied, throwing an error if it is not, or if one can switch back to
648#' sequential, only outputting a verbose message.
649#' @param verbose logical or integer level of verbosity for message outputs.
650#'
651#' @return Returns \code{FALSE} if no foreach backend is to be used, \code{NA} if the currently
652#' registered backend is to be used, or, if this function call registered a new backend,
653#' the previously registered backend as a \code{foreach} object, so that it can be restored
654#' after the computation is over.
655#' @keywords internals
656#' @rdname setup
657setupBackend <- function(spec, backend, optional=FALSE, verbose=FALSE){
658
659	pbackend <- backend
660	str_backend <- quick_str(pbackend)
661	# early exit: FALSE specification or NA backend means not using foreach at all
662	if( isFALSE(spec) || is_NA(pbackend) ) return(FALSE)
663	# use doParallel with number of cores if specified in backend
664	if( is.numeric(pbackend) ){
665		spec <- pbackend
666		pbackend <- 'PAR'
667	}
668	# identify doSEQ calls
669	doSEQ <- formatDoName(pbackend) == 'doSEQ'
670
671	# custom error function
672	pcomp <- is.numeric(spec) && !identical(spec[1], 1)
673	errorFun <- function(value=FALSE, stop=FALSE, level=1){
674		function(e, ...){
675			if( !is(e, 'error') ) e <- list(message=str_c(e, ...))
676
677			pref <- if( pcomp ) "Parallel" else "Foreach"
678			if( !optional || stop ){
679				if( verbose >= level ) message('ERROR')
680				stop(pref, " computation aborted: ", e$message, call.=FALSE)
681			}else if( verbose >= level ){
682				message('NOTE')
683				message("# NOTE: ", pref, " computation disabled: ", e$message)
684			}
685			value
686		}
687	}
688
689	# check current backend if backend is NULL
690	if( is.null(pbackend) ){
691		if( verbose > 1 ){
692			message("# Using current backend ... ", appendLF=FALSE)
693		}
694		ok <- tryCatch({
695			if( is.null(parname <- getDoParName()) )
696				stop("argument '.pbackend' is NULL but there is no registered backend")
697			if( verbose > 1 ) message('OK [', parname, ']')
698			TRUE
699		}, error = errorFun())
700		if( !ok ) return(FALSE)
701		# exit now since there is nothing to setup, nothing should change
702		# return NULL so that the backend is not restored on.exit of the parent call.
703		return(NA)
704	}
705	##
706
707	# test if requested number of cores is actually available
708	NCORES <- getMaxCores(limit=FALSE)
709	if( verbose > 2 ) message("# Check available cores ... [", NCORES, ']')
710	if( verbose > 2 ) message("# Check requested cores ... ", appendLF=FALSE)
711	ncores <- if( doSEQ ) 1L
712		else{
713			ncores <- tryCatch({
714					if( is.numeric(spec) ){
715						if( length(spec) == 0L )
716							stop("no number of cores specified for backend '", str_backend, "'")
717						spec <- spec[1]
718						if( spec <= 0L )
719							stop("invalid negative number of cores [", spec, "] specified for backend '", str_backend, "'")
720						spec
721					}else # by default use the 'cores' option or half the number of cores
722						getMaxCores() #getOption('cores', ceiling(NCORES/2))
723				}, error = errorFun(stop=TRUE))
724			if( isFALSE(ncores) ) return(FALSE)
725			ncores
726		}
727	if( verbose > 2 ) message('[', ncores, ']')
728
729	# create backend object
730	if( verbose > 2 ) message("# Loading backend for specification `", str_backend, "` ... ", appendLF=FALSE)
731	newBackend <- tryCatch({
732			# NB: limit to the number of cores available on the host
733			if( !doSEQ ) ForeachBackend(pbackend, min(ncores, NCORES))
734			else ForeachBackend(pbackend)
735		}, error = errorFun(level=3))
736	if( isFALSE(newBackend) ) return(FALSE)
737	if( verbose > 2 ) message('OK')
738
739	if( verbose > 1 ) message("# Check host compatibility ... ", appendLF=FALSE)
740	ok <- tryCatch({
741		# check if we're not running on MAC from GUI
742		if( is.Mac(check.gui=TRUE) && (newBackend$name == 'doMC' || (newBackend$name == 'doParallel' && is.numeric(newBackend$data[[1]]))) ){
743			# error only if the parallel computation was explicitly asked by the user
744			stop("multicore parallel computations are not safe from R.app on Mac OS X."
745					, "\n  -> Use a terminal session, starting R from the command line.")
746		}
747		TRUE
748		}, error = errorFun())
749	if( !ok ) return(FALSE)
750	if( verbose > 1 ) message('OK')
751
752	if( verbose > 1 ) message("# Registering backend `", newBackend$name, "` ... ", appendLF=FALSE)
753	# try registering the backend
754	oldBackend <- getDoBackend()
755	# setup retoration of backend in case of an error
756	# NB: the new backend cleanup will happens only
757	# if regsitration succeeds, since the cleanup routine is
758	# setup after the registration by the suitable register S3 method.
759	on.exit( setDoBackend(oldBackend, cleanup=TRUE) )
760
761	ov <- lverbose(verbose)
762	ok <- tryCatch({
763			registerDoBackend(newBackend)
764			TRUE
765		}
766		, error ={
767			lverbose(ov)
768			errorFun()
769		})
770	lverbose(ov)
771	if( !ok ) return(FALSE)
772	if( verbose > 1 ) message('OK')
773
774	# check allocated cores if not doSEQ backend
775	if( newBackend$name != 'doSEQ' ){
776		# test allocated number of cores
777		if( verbose > 2 ) message("# Check allocated cores ... ", appendLF=FALSE)
778		wcores <- getDoParWorkers()
779		if( ncores > 0L && wcores < ncores ){
780			if( !optional ){
781				errorFun(level=3)("only ", wcores, " core(s) available [requested ", ncores ," core(s)]")
782			}else if( verbose > 2 ){
783				message('NOTE [', wcores, '/', ncores, ']')
784				message("# NOTE: using only ", wcores,
785						" core(s) [requested ", ncores ," core(s)]")
786			}
787		}
788		else if( verbose > 2 ){
789			message('OK [', wcores, '/', ncores
790					, if(ncores != NCORES ) str_c(' out of ', NCORES)
791					, ']')
792		}
793	}
794
795	# cancel backend restoration
796	on.exit()
797	# return old backend
798	oldBackend
799}
800
801
802# add extra package bigmemory and synchronicity on Unix platforms
803if( .Platform$OS.type != 'windows' ){
804	setPackageExtra('install.packages', 'bigmemory', pkgs='bigmemory')
805	setPackageExtra('install.packages', 'synchronicity', pkgs='synchronicity')
806}
807# add new option: shared.memory that indicates if one should try using shared memory
808# to speed-up parallel computations.
809.OPTIONS$newOptions(shared.memory = (.Platform$OS.type != 'windows' && !is.Mac()))
810
811
812#' \code{setupSharedMemory} checks if one can use the packages \emph{bigmemory} and \emph{sychronicity}
813#' to speed-up parallel computations when not keeping all the fits.
814#' When both these packages are available, only one result per host is written on disk,
815#' with its achieved deviance stored in shared memory, that is accessible to all cores on
816#' a same host.
817#' It returns \code{TRUE} if both packages are available and NMF option \code{'shared'} is
818#' toggled on.
819#'
820#' @rdname setup
821setupSharedMemory <- function(verbose){
822
823	if( verbose > 1 ) message("# Check shared memory capability ... ", appendLF=FALSE)
824	# early exit if option shared is off
825	if( !nmf.getOption('shared.memory') ){
826		if( verbose > 1 ) message('SKIP [disabled]')
827		return(FALSE)
828	}
829	# early exit if foreach backend is doMPI: it is not working, not sure why
830	if( isMPIBackend() ){
831		if( verbose > 1 ) message('SKIP [MPI cluster]')
832		return(FALSE)
833	}
834	# not on Windows
835	if( .Platform$OS.type == 'windows' ){
836		if( verbose > 1 ) message('SKIP [Windows OS]')
837		return(FALSE)
838	}
839
840	if( !require.quiet('bigmemory', character.only=TRUE) ){
841		if( verbose > 1 ){
842			message('NO', if( verbose > 2 ) ' [Package `bigmemory` required]')
843		}
844		return(FALSE)
845	}
846	if( !require.quiet('synchronicity', character.only=TRUE) ){
847		if( verbose > 1 ){
848			message('NO', if( verbose > 2 ) ' [Package `synchronicity` required]')
849		}
850		return(FALSE)
851	}
852	if( verbose > 1 ) message('YES', if( verbose > 2 ) ' [synchronicity]')
853	TRUE
854}
855
856is.doSEQ <- function(){
857	dn <- getDoParName()
858	is.null(dn) || dn == 'doSEQ'
859}
860
861#' \code{setupTempDirectory} creates a temporary directory to store the best fits computed on each host.
862#' It ensures each worker process has access to it.
863#'
864#' @rdname setup
865setupTempDirectory <- function(verbose){
866
867	# - Create a temporary directory to store the best fits computed on each host
868	NMF_TMPDIR <- tempfile('NMF_', getwd())
869	if( verbose > 2 ) message("# Setup temporary directory: '", NMF_TMPDIR, "' ... ", appendLF=FALSE)
870	dir.create(NMF_TMPDIR)
871	if( !is.dir(NMF_TMPDIR) ){
872		if( verbose > 2 ) message('ERROR')
873		nmf_stop('nmf', "could not create temporary result directory '", NMF_TMPDIR, "'")
874	}
875
876	on.exit( unlink(NMF_TMPDIR, recursive=TRUE) )
877	# ensure that all workers can see the temporary directory
878	wd <- times(getDoParWorkers()) %dopar% {
879		if( !file_test('-d', NMF_TMPDIR) )
880			dir.create(NMF_TMPDIR, recursive=TRUE)
881		file_test('-d', NMF_TMPDIR)
882	}
883	# check it worked
884	if( any(!wd) ){
885		if( verbose > 2 ) message('ERROR')
886		nmf_stop('nmf', "could not create/see temporary result directory '", NMF_TMPDIR, "' on worker nodes ", str_out(which(!wd), Inf))
887	}
888	if( verbose > 2 ) message('OK')
889	on.exit()
890	NMF_TMPDIR
891}
892
893#' Utilities for Parallel Computations
894#'
895#'
896#' @rdname parallel
897#' @name parallel-NMF
898NULL
899
900#' \code{ts_eval} generates a thread safe version of \code{\link{eval}}.
901#' It uses boost mutexes provided by the \code{\link[synchronicity]{synchronicity}}
902#' package.
903#' The generated function has arguments \code{expr} and \code{envir}, which are passed
904#' to \code{\link{eval}}.
905#'
906#' @param mutex a mutex or a mutex descriptor.
907#' If missing, a new mutex is created via the function \code{\link[synchronicity]{boost.mutex}}.
908#' @param verbose a logical that indicates if messages should be printed when
909#' locking and unlocking the mutex.
910#'
911#' @rdname parallel
912#' @export
913ts_eval <- function(mutex = synchronicity::boost.mutex(), verbose=FALSE){
914
915
916	requireNamespace('bigmemory')
917	#library(bigmemory)
918	requireNamespace('synchronicity')
919	#library(synchronicity)
920	# describe mutex if necessary
921	.MUTEX_DESC <-
922			if( is(mutex, 'boost.mutex') ) synchronicity::describe(mutex)
923			else mutex
924
925	loadpkg <- TRUE
926	function(expr, envir=parent.frame()){
927
928		# load packages once
929		if( loadpkg ){
930			requireNamespace('bigmemory')
931			#library(bigmemory)
932			requireNamespace('synchronicity')
933			#library(synchronicity)
934			loadpkg <<- FALSE
935		}
936		MUTEX <- synchronicity::attach.mutex(.MUTEX_DESC)
937		synchronicity::lock(MUTEX)
938		if( verbose )
939			message('#', Sys.getpid(), " - START mutex: ", .MUTEX_DESC@description$shared.name)
940		ERROR <- "### <Error in mutex expression> ###\n"
941		on.exit({
942			if( verbose ){
943				message(ERROR, '#', Sys.getpid(), " - END mutex: ", .MUTEX_DESC@description$shared.name)
944			}
945			synchronicity::unlock(MUTEX)
946		})
947
948		eval(expr, envir=envir)
949
950		ERROR <- NULL
951	}
952}
953
954#' \code{ts_tempfile} generates a \emph{unique} temporary filename
955#' that includes the name of the host machine and/or the caller's process id,
956#' so that it is thread safe.
957#'
958#' @inheritParams base::tempfile
959#' @param ... extra arguments passed to \code{\link[base]{tempfile}}.
960#' @param host logical that indicates if the host machine name should
961#' be appear in the filename.
962#' @param pid logical that indicates if the current process id
963#' be appear in the filename.
964#'
965#' @rdname parallel
966#' @export
967ts_tempfile <- function(pattern = "file", ..., host=TRUE, pid=TRUE){
968	if( host ) pattern <- c(pattern, Sys.info()['nodename'])
969	if( pid ) pattern <- c(pattern, Sys.getpid())
970	tempfile(paste(pattern, collapse='_'), ...)
971}
972
973#' \code{hostfile} generates a temporary filename composed with
974#' the name of the host machine and/or the current process id.
975#'
976#' @inheritParams base::tempfile
977#' @inheritParams ts_tempfile
978#'
979#' @rdname parallel
980#' @export
981hostfile <- function(pattern = "file", tmpdir=tempdir(), fileext='', host=TRUE, pid=TRUE){
982	if( host ) pattern <- c(pattern, Sys.info()['nodename'])
983	if( pid ) pattern <- c(pattern, Sys.getpid())
984	file.path(tmpdir, str_c(paste(pattern, collapse='.'), fileext))
985}
986
987#' \code{gVariable} generates a function that access a global static variable,
988#' possibly in shared memory (only for numeric matrix-coercible data in this case).
989#' It is used primarily in parallel computations, to preserve data accross
990#' computations that are performed by the same process.
991#'
992#' @param init initial value
993#' @param shared a logical that indicates if the variable should be stored in shared
994#' memory or in a local environment.
995#'
996#' @rdname parallel
997#' @export
998gVariable <- function(init, shared=FALSE){
999
1000	if( shared ){ # use bigmemory shared matrices
1001		if( !is.matrix(init) )
1002			init <- as.matrix(init)
1003		requireNamespace('bigmemory')
1004		#library(bigmemory)
1005		DATA <- bigmemory::as.big.matrix(init, type='double', shared=TRUE)
1006		DATA_DESC <- bigmemory::describe(DATA)
1007	}else{ # use variables assigned to .GlobalEnv
1008		DATA_DESC <- basename(tempfile('.gVariable_'))
1009	}
1010
1011	.VALUE <- NULL
1012	.loadpkg <- TRUE
1013	function(value){
1014
1015		# load packages once
1016		if( shared && .loadpkg ){
1017			requireNamespace('bigmemory')
1018			#library(bigmemory)
1019			.loadpkg <<- FALSE
1020		}
1021
1022		# if shared: attach bigmemory matrix from its descriptor object
1023		if( shared ){
1024			DATA <- bigmemory::attach.big.matrix(DATA_DESC)
1025		}
1026
1027		if( missing(value) ){# READ ACCESS
1028			if( !shared ){
1029				# initialise on first call if necessary
1030				if( is.null(.VALUE) ) .VALUE <<- init
1031				# return variable
1032				.VALUE
1033			}else
1034				DATA[]
1035
1036		}else{# WRITE ACCESS
1037			if( !shared ) .VALUE <<- value
1038			else DATA[] <- value
1039
1040		}
1041	}
1042}
1043
1044#' \code{setupLibPaths} add the path to the NMF package to each workers' libPaths.
1045#'
1046#' @param pkg package name whose path should be exported the workers.
1047#'
1048#' @rdname setup
1049setupLibPaths <- function(pkg='NMF', verbose=FALSE){
1050
1051	# do nothing in sequential mode
1052	if( is.doSEQ() ) return( character() )
1053
1054	if( verbose ){
1055		message("# Setting up libpath on workers for package(s) "
1056			, str_out(pkg, Inf), ' ... ', appendLF=FALSE)
1057	}
1058	p <- path.package(pkg)
1059	if( is.null(p) ) return()
1060
1061	if( !isDevNamespace(pkg) ){ # not a dev package
1062		plibs <- dirname(p)
1063		libs <- times(getDoParWorkers()) %dopar% {
1064			.libPaths(c(.libPaths(), plibs))
1065		}
1066		libs <- unique(unlist(libs))
1067		if( verbose ){
1068			message("OK\n# libPaths:\n", paste('  ', libs, collapse="\n"))
1069		}
1070		libs
1071		pkg
1072	}else if( getDoParName() != 'doParallel' || !isNumber(getDoBackend()$data) ){
1073		# devmode: load the package + depends
1074		if( verbose ){ message("[devtools::load_all] ", appendLF=FALSE) }
1075		times(getDoParWorkers()) %dopar% {
1076			capture.output({
1077				suppressMessages({
1078					requireNamespace('devtools')
1079					#library(devtools)
1080					requireNamespace('bigmemory')
1081					#library(bigmemory)
1082					devtools::load_all(p)
1083				})
1084			})
1085		}
1086		if( verbose ){ message("OK") }
1087		c('bigmemory', 'rngtools')
1088	}
1089	else if( verbose ){
1090		message("OK")
1091	}
1092}
1093
1094#StaticWorkspace <- function(..., .SHARED=FALSE){
1095#
1096#	# create environment
1097#	e <- new.env(parent=.GlobalEnv)
1098#	# fill with initial data
1099#	vars <- list(...)
1100#	if( .SHARED ){
1101#		lapply(names(vars), function(x){
1102#			bm <- bigmemory::as.big.matrix(vars[[x]], type='double', shared=TRUE)
1103#			e[[x]] <- bigmemory::describe(bm)
1104#		})
1105#	}else
1106#		list2env(vars, envir=e)
1107#
1108#	structure(e, shared=.SHARED, class=c("static_wsp", 'environment'))
1109#}
1110#
1111#`[[.static_wsp` <- function(x, ..., exact = TRUE){
1112#	if( attr(x, 'shared') ){
1113#		var <- bigmemory::attach.big.matrix(NextMethod())
1114#		var[]
1115#	}else
1116#		NextMethod()
1117#}
1118#
1119#`[[.static_wsp<-` <- function(x, i, value){
1120#
1121#	if( attr(x, 'shared') ){
1122#		var <- bigmemory::attach.big.matrix(x[[i]])
1123#		var[] <- value
1124#	}else
1125#		x[[i]] <- value
1126#	x
1127#}
1128
1129
1130isRNGseed <- function(x){
1131	is.numeric(x) ||
1132			( is.list(x)
1133				&& is.null(names(x))
1134				&& all(sapply(x, is.numeric)) )
1135}
1136
1137#' \code{setupRNG} sets the RNG for use by the function nmf.
1138#' It returns the old RNG as an rstream object or the result of set.seed
1139#'  if the RNG is not changed due to one of the following reason:
1140#'  - the settings are not compatible with rstream
1141#'
1142#' @param seed initial RNG seed specification
1143#' @param n number of RNG seeds to generate
1144#'
1145#' @rdname setup
1146setupRNG <- function(seed, n, verbose=FALSE){
1147
1148	if( verbose == 2 ){
1149		message("# Setting up RNG ... ", appendLF=FALSE)
1150		on.exit( if( verbose == 2 ) message("OK") )
1151	}else if( verbose > 2 ) message("# Setting up RNG ... ")
1152
1153	if( verbose > 3 ){
1154		message("# ** Original RNG settings:")
1155		showRNG()
1156	}
1157
1158	# for multiple runs one always uses RNGstreams
1159	if( n > 1 ){
1160
1161		# seeding with numeric values only
1162		if( is.list(seed) && isRNGseed(seed) ){
1163			if( length(seed) != n )
1164				stop("Invalid list of RNG seeds: must be of length ", n)
1165
1166			if( verbose > 2 ) message("# Using supplied list of RNG seeds")
1167			return(seed)
1168
1169		}else if( is.numeric(seed) ){
1170
1171			if( verbose > 2 ){
1172				message("# Generate RNGStream sequence using seed ("
1173						, RNGstr(seed), ") ... "
1174						, appendLF=FALSE)
1175			}
1176			res <- RNGseq(n, seed)
1177			if( verbose > 2 ) message("OK")
1178			return(res)
1179
1180		}else{ # create a sequence of RNGstream using a random seed
1181			if( verbose > 2 ){
1182				message("# Generate RNGStream sequence using a random seed ... "
1183						, appendLF=FALSE)
1184			}
1185			res <- RNGseq(n, NULL)
1186			if( verbose > 2 ) message("OK")
1187			return(res)
1188		}
1189	}else if( is.numeric(seed) ){
1190		# for single runs: 1-length seeds are used to set the current RNG
1191		# 6-length seeds are used to set RNGstream
1192
1193		if( !is.vector(seed) ){
1194			message('ERROR')
1195			stop("NMF::nmf - Invalid numeric seed: expects a numeric vector.")
1196		}
1197
1198		# convert to an integer vector
1199		seed <- as.integer(seed)
1200		# immediately setup the RNG in the standard way
1201		if( length(seed) == 1L ){
1202			if( verbose > 2 ){
1203				message("# RNG setup: standard [seeding current RNG]")
1204				message("# Seeding current RNG with seed (", seed, ") ... "
1205						, appendLF=FALSE)
1206			}
1207			set.seed(seed)
1208			if( verbose > 2 ) message("OK")
1209			return( getRNG() )
1210		}else if( length(seed) == 6L ){
1211			if( verbose > 2 ){
1212				message("# RNG setup: reproducible [using RNGstream]")
1213				message("# Generate RNGStream sequence using seed ("
1214						, RNGstr(seed), ") ... "
1215						, appendLF=FALSE)
1216			}
1217			res <- RNGseq(1, seed)
1218			setRNG(res)
1219			if( verbose > 2 ) message("OK")
1220			return( res )
1221		}else{
1222			if( verbose > 2 ){
1223				message("# RNG setup: directly setting RNG")
1224				message("# Setting RNG with .Random.seed= ("
1225						, RNGstr(seed), ") ... "
1226						, appendLF=FALSE)
1227			}
1228			setRNG(seed, verbose > 2)
1229			if( verbose > 2 ) message("OK")
1230			return( getRNG() )
1231		}
1232		stop("NMF::nmf - Invalid numeric seed: unexpected error.")
1233	}else{
1234		if( verbose > 2 ) message("# RNG setup: standard [using current RNG]")
1235		NULL
1236	}
1237}
1238
1239##################################################################
1240## END
1241##################################################################
1242