1<%@meta language="R-vignette" content="--------------------------------
2%\VignetteIndexEntry{A Future for R: Future Topologies}
3%\VignetteAuthor{Henrik Bengtsson}
4%\VignetteKeyword{R}
5%\VignetteKeyword{package}
6%\VignetteKeyword{vignette}
7%\VignetteKeyword{future}
8%\VignetteKeyword{promise}
9%\VignetteEngine{R.rsp::rsp}
10%\VignetteTangle{FALSE}
11--------------------------------------------------------------------"%>
12<%
13library("R.utils")
14`%<-%` <- future::`%<-%`
15options("withCapture/newline"=FALSE)
16%>
17
18# <%@meta name="title"%>
19
20Futures can be nested in R such that one future creates another set of futures and so on.  This may, for instance, occur within nested for loops, e.g.
21```r
22library("future")
23library("listenv")
24x <- listenv()
25for (ii in 1:3) {
26  x[[ii]] %<-% {
27    y <- listenv()
28    for (jj in 1:3) {
29      y[[jj]] %<-% { ii + jj / 10 }
30    }
31    y
32  }
33}
34unlist(x)
35## [1] 1.1 1.2 1.3 2.1 2.2 2.3 3.1 3.2 3.3
36```
37
38The default is to use synchronous futures unless otherwise specified, which is also true for nested futures.  If we for instance specify, `plan(multisession)`, the first layer of futures (`x[[ii]] %<-% { expr }`) will be processed asynchronously in background R processes, and the futures in the second layer of futures (`y[[jj]] %<-% { expr }`) will be processed synchronously in the separate background R processes.  If we wish to be explicit about this, we can specify `plan(list(multisession, sequential))`.
39
40
41## Example: High-Throughput Sequencing
42
43Consider a high-throughput sequencing (HT-Seq) project with 50 human DNA samples where we have one FASTQ file per sample containing the raw sequence reads as they come out of the sequencing machine.  With this data, we wish to align each FASTQ to a reference genome such that we generate 24 individual BAM files per sample - one per chromosome.
44
45Here is the layout of what such an analysis could look like in R using futures.
46```r
47library("future")
48library("listenv")
49htseq_align <- function(fq, chr) { chr }
50
51fqs <- dir(pattern = "[.]fastq$")
52
53bams <- listenv()
54for (ss in seq_along(fqs)) {
55  fq <- fqs[ss]
56  bams[[ss]] %<-% {
57    bams_ss <- listenv()
58    for (cc in 1:24) {
59      bams_ss[[cc]] %<-% htseq_align(fq, chr = cc)
60    }
61    as.list(bams_ss)
62  }
63}
64bams <- as.list(bams)
65```
66
67The default is to use synchronous futures, so without further specifications, the above will process each sample and each chromosome sequentially.  Next, we will consider what can be done with the following two computer setups:
68
69* A single machine with 8 cores
70* A compute cluster with 3 machines each with 16 cores
71
72### One multi-core machine
73
74With a single machine of 8 cores, we could choose to process multiple samples at the same time while processing chromosomes sequentially.  In other words, we would like to evaluate the outer layer of futures using multisession futures and the inner ones as sequential futures.  This can be specified as:
75```r
76plan(list(multisession, sequential))
77```
78The internals for processing multisession future queries `availableCores()` to infer how many cores can be used simultaneously, so there is no need to explicitly specify that there are 8 cores available.
79
80_Comment_: Since synchronous is the default future, we could skip trailing sequential futures in the setup, e.g. `plan(list(multisession))` or just `plan(multisession)`.  However, it does not hurt to be explicit.
81
82If we instead would like to process the sample sequentially and the chromosomes in parallel, we can use:
83```r
84plan(list(sequential, multisession))
85```
86
87#### Built-in protection against recursive parallelism
88
89Above we have processed either the outer or the inner set of future in parallel.  What if we want to process both layers in parallel?  It's tempting to use:
90```r
91plan(list(multisession, multisession))
92```
93Although this does not give an error, we will find that the inner layer of futures will be processed sequentially just as if we would use `plan(list(multisession, sequential))`.  This behavior is due to the built-in protection against nested parallelism.  If both layers would run in parallel, each using the 8 cores available on the machine, we would be running 8 * 8 = 64 parallel processes - that would for sure overload our computer.  What happens internally is that for the outer layer, `availableCores()` equals eight (8), whereas for the inner layer it equals one (1).
94
95Now, we could imagine that we process the outer layer with, say, two parallel futures, and then the inner layer with four parallel futures.  In that case, we would end up running on at most eight cores (= 2 * 4).  This can be achieved by forcing a fixed number of workers at each layer:
96
97```r
98plan(list(tweak(multisession, workers = 2), tweak(multisession, workers = 4)))
99```
100
101When using this approach, there is a risk of setting up too many concurrent workers.  To make sure the setup respects `availableCores()`, use something like:
102
103```r
104plan(list(
105  tweak(multisession, workers = availableCores() %/% 4),
106  tweak(multisession, workers = 4)
107))
108```
109
110However, before using nested parallelization on a single machine, make sure it is actually more efficient than using parallelization in only one of the layers.
111
112
113### An ad-hoc compute cluster
114
115With a compute cluster of 3 machines each with 16 cores, we can run up to 48 alignment processes in parallel.  A natural setup is to have one machine process one sample in parallel.  We could specify this as:
116```r
117nodes <- c("n1", "n2", "n3")
118plan(list(tweak(cluster, workers = nodes), multisession))
119```
120_Comment:_ Multisession futures are agile to its environment, that is, they will query the machine they are running on to find out how many parallel processes it can run at the same time.
121
122One possible downside to the above setup is that we might not utilize all available cores all the time.  This is because the alignment of the shorter chromosomes will finish sooner than the longer ones, which means that we might at the end of each sample have only a few alignment processes running on each machine leaving the remaining cores idle/unused.  An alternative set up is then to use the following setup:
123```r
124nodes <- rep(c("n1", "n2", "n3"), each = 8)
125plan(list(tweak(cluster, workers = nodes), multisession))
126```
127This will cause up to 24 (= 3*8) samples to be processed in parallel each processing two chromosomes at the same time.
128
129
130## Example: A Remote Compute Cluster
131
132Imagine we have access to a remote compute cluster, with login node `remote.server.org`, and that the cluster has three nodes `n1`, `n2`, and `n3`.  Also, let us assume we have already set up the cluster such that we can log in via public key authentication via SSH, i.e. when we do `ssh remote.server.org` authentication is done automatically.
133
134With the above setup, we can use nested futures in our local R session to evaluate R expression on the remote compute cluster and its three nodes.  Here is a proof of concept illustrating how the different nested futures are evaluated on different machines.
135
136```r
137library("future")
138library("listenv")
139
140## Set up access to remote login node
141login <- tweak(remote, workers = "remote.server.org")
142plan(login)
143
144## Set up cluster nodes on login node
145nodes %<-% { .keepme <- parallel::makeCluster(c("n1", "n2", "n3")) }
146
147## Specify future topology
148## login node -> { cluster nodes } -> { multiple cores }
149plan(list(
150  login,
151  tweak(cluster, workers = nodes),
152  multisession
153))
154
155
156## (a) This will be evaluated on the cluster login computer
157x %<-% {
158  thost <- Sys.info()[["nodename"]]
159  tpid <- Sys.getpid()
160  y <- listenv()
161  for (task in 1:4) {
162    ## (b) This will be evaluated on a compute node on the cluster
163    y[[task]] %<-% {
164      mhost <- Sys.info()[["nodename"]]
165      mpid <- Sys.getpid()
166      z <- listenv()
167      for (jj in 1:2) {
168        ## (c) These will be evaluated in separate processes on the same compute node
169        z[[jj]] %<-% data.frame(task = task,
170		                        top.host = thost, top.pid = tpid,
171                                mid.host = mhost, mid.pid = mpid,
172                                host = Sys.info()[["nodename"]],
173								pid = Sys.getpid())
174      }
175      Reduce(rbind, z)
176    }
177  }
178  Reduce(rbind, y)
179}
180
181print(x)
182##   task top.host top.pid mid.host mid.pid host    pid
183## 1    1    login  391547       n1  391878   n1 393943
184## 2    1    login  391547       n1  391878   n1 393951
185## 3    2    login  391547       n2  392204   n2 393971
186## 4    2    login  391547       n2  392204   n2 393978
187## 5    3    login  391547       n3  392527   n3 394040
188## 6    3    login  391547       n3  392527   n3 394048
189## 7    4    login  391547       n1  391878   n1 393959
190## 8    4    login  391547       n1  391878   n1 393966
191```
192
193Try the above `x %<-% { ... }` future with, say, `plan(list(sequential, multisession))` and see what the output will be.
194
195
196
197## Example: Adjust the Number of Workers for Each Cluster Node
198
199When using
200```r
201nodes <- c("n1", "n2", "n3")
202plan(list(tweak(cluster, workers = nodes), multisession))
203```
204the number of workers used on each of the nodes (`n1`, `n2`, and `n3`) is
205given by the value of `availableCores()` on each of those nodes.  In turn,
206`availableCores()` typically defaults to the number of cores on those nodes.
207Now, imagine you want to use only 50% of these cores.  This can be done by
208tweaking the `multisession` plan by passing a function to `workers`;
209```r
210halfCores <- function() { max(1, round(0.5 * availableCores()))
211plan(list(
212  tweak(cluster, workers = nodes),
213  tweak(multisession, workers = halfCores)
214))
215```
216With this, each node will use at most 50% of the cores available.
217For instance, if `n1` and `n2` have eight cores, and `n3` has 32 cores,
218then they nodes will use four, four, and 16 cores, respectively.
219
220Another example is:
221```r
222customWorkers <- function() {
223  switch(Sys.info()[["nodename"]],
224    "n1" = 2L,
225    "n2" = 3L,
226    ## default:
227    availableCores()
228  )
229}
230plan(list(
231  tweak(cluster, workers = nodes),
232  tweak(multisession, workers = customWorkers)
233))
234```
235In this case, node `n1` will always use two cores, `n2` three cores,
236and `n3` will respect what `availableCores()` returns.
237
238
239[listenv]: https://cran.r-project.org/package=listenv
240[globals]: https://cran.r-project.org/package=globals
241[Futures in R: Common issues with solutions]: future-issues.html
242