1#' Apply a Function to a Data Frame Split by Factors via Futures
2#'
3#' @inheritParams future_lapply
4#'
5#' @param data An \R object, normally a data frame, possibly a matrix.
6#'
7#' @param INDICES A factor or a list of factors, each of length `nrow(data)`.
8#'
9#' @param FUN a function to be applied to (usually data-frame) subsets of `data`.
10#'
#' @param \ldots Additional arguments passed to [future_lapply()] and
#'   then to `FUN()`.
13#'
14#' @param simplify logical: see [base::tapply].
15#'
16#' @return
17#' An object of class "by", giving the results for each subset.
18#' This is always a list if simplify is false, otherwise a list
19#' or array (see [base::tapply]).
20#' See also [base::by()] for details.
21#'
22#' @example incl/future_by.R
23#'
24#' @details
25#' Internally, `data` is grouped by `INDICES` into a list of `data`
26#' subset elements which is then processed by [future_lapply()].
27#' When the groups differ significantly in size, the processing time
28#' may differ significantly between the groups.
29#' To correct for processing-time imbalances, adjust the amount of chunking
30#' via arguments `future.scheduling` and `future.chunk.size`.
31#'
32#' @section Note on 'stringsAsFactors':
#' `future_by()` is modeled as closely as possible on the
#' behavior of `base::by()`.  Both functions have "default" S3 methods that
#' call `data <- as.data.frame(data)` internally.  This call may in turn call
36#' an S3 method for `as.data.frame()` that coerces strings to factors or not
37#' depending on whether it has a `stringsAsFactors` argument and what its
38#' default is.
#' For example, the S3 method of `as.data.frame()` for lists changed its
#' (effective) default from `stringsAsFactors = TRUE` to
#' `stringsAsFactors = FALSE` in R 4.0.0.
42#'
43#'
44#' @rdname future_by
45#' @export
future_by <- function(data, INDICES, FUN, ..., simplify = TRUE, future.envir = parent.frame()) {
  ## Evaluate 'future.envir' in the generic before handing over to the
  ## S3 method selected by the class of 'data'
  force(future.envir)

  UseMethod("future_by")
}
50
51
52#' @export
future_by.default <- function(data, INDICES, FUN, ..., simplify = TRUE, future.envir = parent.frame()) {
  ## Whether row subsets may be dropped is decided from the dimensions of
  ## 'data' as supplied, that is, before the coercion to a data frame below
  drop_subsets <- (length(dim(data)) == 0L)

  data <- as.data.frame(data)

  ## The subsetter looks up 'data' in this frame when it is called, so it
  ## operates on the coerced data frame
  take_rows <- function(row) data[row, , drop = drop_subsets]

  future_by_internal(
    data = data, INDICES = INDICES, FUN = FUN, ...,
    simplify = simplify,
    .INDICES.NAME = deparse(substitute(INDICES))[1L],
    .CALL = match.call(),
    .SUBSETTER = take_rows,
    future.envir = future.envir
  )
}
69
70
71#' @export
future_by.data.frame <- function(data, INDICES, FUN, ..., simplify = TRUE, future.envir = parent.frame()) {
  ## Row subsets of a data frame are always kept as data frames
  take_rows <- function(row) {
    data[row, , drop = FALSE]
  }

  future_by_internal(
    data = data, INDICES = INDICES, FUN = FUN, ...,
    simplify = simplify,
    .INDICES.NAME = deparse(substitute(INDICES))[1L],
    .CALL = match.call(),
    .SUBSETTER = take_rows,
    future.envir = future.envir
  )
}
80
81
82
## Workhorse shared by the future_by() methods.
##
## Splits the row indices of 'data' by the cross-classification of the
## factors in 'INDICES', extracts each non-empty subset with .SUBSETTER(),
## applies FUN to the subsets via future_lapply(), and arranges the results
## in an array of class "by" with one dimension per factor.
##
## .SUBSETTER    function mapping a vector of row indices to a subset of data
## .CALL         stored as attribute 'call' of the result
## .INDICES.NAME name given to 'INDICES' when it is not already a list
future_by_internal <- function(data, INDICES, FUN, ..., simplify = TRUE, .SUBSETTER, .CALL, .INDICES.NAME, future.envir = parent.frame(), future.label = "future_by-%d") {
  FUN <- if (!is.null(FUN)) match.fun(FUN)
  stop_if_not(is.function(.SUBSETTER))

  ## A single grouping vector is wrapped in a one-element, named list
  if (!is.list(INDICES)) {
    wrapped <- vector("list", length = 1L)
    wrapped[[1L]] <- INDICES
    names(wrapped) <- .INDICES.NAME
    INDICES <- wrapped
    wrapped <- NULL ## Not needed anymore
  }

  INDICES <- lapply(INDICES, FUN = as.factor)
  n_factors <- length(INDICES)
  if (!n_factors) stop("'INDICES' is of length zero")

  n_rows <- nrow(data)
  if (!all(lengths(INDICES) == n_rows)) {
    stop("All elements of argument 'INDICES' must have same length as 'data'")
  }

  ## Levels of each factor become the dimnames of the result, and their
  ## counts its dimensions
  namelist <- lapply(INDICES, FUN = levels)
  extent <- lengths(namelist, use.names = FALSE)
  strides <- cumprod(extent)
  if (strides[n_factors] > .Machine$integer.max)
    stop("total number of levels >= 2^31")

  storage.mode(strides) <- "integer"
  n_groups <- strides[n_factors]

  ## Combine the per-factor integer codes into one group id per row; the
  ## first factor varies fastest
  group_id <- as.integer(INDICES[[1L]])
  for (k in seq_len(n_factors - 1L)) {
    codes <- as.integer(INDICES[[k + 1L]])
    group_id <- group_id + strides[k] * (codes - 1L)
  }
  strides <- NULL ## Not needed anymore

  ## Turn the ids into a factor with one level per cell so that split()
  ## returns an element for every cell, including empty ones
  levels(group_id) <- as.character(seq_len(n_groups))
  class(group_id) <- "factor"
  rows_by_group <- unname(split(seq_len(n_rows), f = group_id))
  non_empty <- as.logical(lengths(rows_by_group) > 0L)
  group_id <- NULL ## Not needed anymore

  ## Only non-empty cells are subsetted and processed
  subsets <- lapply(X = rows_by_group[non_empty], FUN = .SUBSETTER)
  results <- future_lapply(X = subsets, FUN = FUN, ..., future.envir = future.envir, future.label = future.label)
  subsets <- NULL  ## Not needed anymore

  ## Choose what the result array is initially filled with
  if (simplify && all(lengths(results) == 1L)) {
    results <- unlist(results, recursive = FALSE, use.names = FALSE)
    if (!is.null(results) && is.atomic(results)) {
      filler <- vector(typeof(results))
    } else {
      filler <- NA
    }
  } else {
    filler <- vector("list", length = prod(extent))
  }
  cells <- array(filler, dim = extent, dimnames = namelist)

  if (length(results) > 0L) cells[non_empty] <- results
  results <- NULL ## Not needed anymore

  structure(cells,
    call = .CALL,
    class = "by"
  )
}
148