1 #include "data.table.h"
2 #ifdef _OPENMP
3 #include <pthread.h>
4 #endif
5 #include <errno.h>     // errno
6 #include <ctype.h>     // isspace
7 
// File-scope thread settings. They are written only by initDTthreads(), setDTthreads() and the
// fork handlers below, and read via getDTthreads().
static int  DTthreads = -1;   // Never read directly hence static; use getDTthreads(n, /*throttle=*/0|1). -1 so we know for sure initDTthreads() ran and set it >= 1.
static int  DTthrottle = -1;  // Thread 1 is assigned DTthrottle iterations before a 2nd thread is utilized; #4484.
static bool RestoreAfterFork = true;  // If true, after_fork() restores the pre-fork DTthreads once a fork completes; see #2885 in v1.12.0
11 
static int getIntEnv(const char *name, int def)
{
  // Read environment variable `name` as an integer in [1, INT_MAX].
  // Returns `def` if the variable is unset or empty.
  // If it is set but is not such an integer (surrounding whitespace is tolerated), an R warning
  // is issued and `def` is returned.
  const char *val = getenv(name);
  if (val==NULL) return def;
  size_t nchar = strlen(val);
  if (nchar==0) return def;
  char *end;
  errno = 0;
  long int ans = strtol(val, &end, 10);  // ignores leading whitespace. If it fully consumed the string, *end=='\0' and isspace('\0')==false
  // Cast to unsigned char: passing a negative char (a byte >= 0x80 on platforms where char is
  // signed, e.g. part of a UTF-8 sequence) to isspace() is undefined behaviour.
  while (isspace((unsigned char)*end)) end++;  // ignore trailing whitespace
  // ans>INT_MAX matters where long is wider than int (e.g. LP64)
  if (errno || (size_t)(end-val)!=nchar || ans<1 || ans>INT_MAX) {
    warning(_("Ignoring invalid %s==\"%s\". Not an integer >= 1. Please remove any characters that are not a digit [0-9]. See ?data.table::setDTthreads."), name, val);
    return def;
  }
  return (int)ans;
}
28 
// Smaller of two ints.
static inline int imin(int a, int b)
{
  if (b < a) return b;
  return a;
}
// Larger of two ints.
static inline int imax(int a, int b)
{
  if (b > a) return b;
  return a;
}
31 
initDTthreads()32 void initDTthreads() {
33   // called at package startup from init.c
34   // also called by setDTthreads(threads=NULL) (default) to reread environment variables; see setDTthreads below
35   // No verbosity here in this setter. Verbosity is in getDTthreads(verbose=TRUE)
36   int ans = getIntEnv("R_DATATABLE_NUM_THREADS", INT_MIN);
37   if (ans>=1) {
38     ans = imin(ans, omp_get_num_procs());  // num_procs is a hard limit; user cannot achieve more. ifndef _OPENMP then myomp.h defines this to be 1
39   } else {
40     // Only when R_DATATABLE_NUM_THREADS is unset (or <=0) do we use PROCS_PERCENT; #4514
41     int perc = getIntEnv("R_DATATABLE_NUM_PROCS_PERCENT", 50); // use "NUM_PROCS" to use the same name as the OpenMP function this uses
42     // 50% of logical CPUs by default; half of 8 is 4 on laptop with 4 cores. Leaves plenty of room for other processes: #3395 & #3298
43     if (perc<=1 || perc>100) {
44       warning(_("Ignoring invalid R_DATATABLE_NUM_PROCS_PERCENT==%d. If used it must be an integer between 2 and 100. Default is 50. See ?setDTtheads."), perc);
45       // not allowing 1 is to catch attempts to use 1 or 1.0 to represent 100%.
46       perc = 50;
47     }
48     ans = imax(omp_get_num_procs()*perc/100, 1); // imax for when formula would result in 0.
49   }
50   ans = imin(ans, omp_get_thread_limit());  // honors OMP_THREAD_LIMIT when OpenMP started; e.g. CRAN sets this to 2. Often INT_MAX meaning unlimited/unset
51   ans = imin(ans, omp_get_max_threads());   // honors OMP_NUM_THREADS when OpenMP started, plus reflects any omp_set_* calls made since
52   // max_threads() -vs- num_procs(): https://software.intel.com/en-us/forums/intel-visual-fortran-compiler-for-windows/topic/302866
53   ans = imin(ans, getIntEnv("OMP_THREAD_LIMIT", INT_MAX));  // user might expect `Sys.setenv(OMP_THREAD_LIMIT=2);setDTthreads()` to work. Satisfy this
54   ans = imin(ans, getIntEnv("OMP_NUM_THREADS", INT_MAX));   //   expectation by reading them again now. OpenMP just reads them on startup (quite reasonably)
55   ans = imax(ans, 1);  // just in case omp_get_* returned <=0 for any reason, or the env variables above are set <=0
56   DTthreads = ans;
57   DTthrottle = imax(1, getIntEnv("R_DATATABLE_THROTTLE", 1024)); // 2nd thread is used only when n>1024, 3rd thread when n>2048, etc
58 }
59 
getDTthreads(const int64_t n,const bool throttle)60 int getDTthreads(const int64_t n, const bool throttle) {
61   // this is the main getter used by all parallel regions; they specify num_threads(n, true|false).
62   // Keep this light, simple and robust. initDTthreads() ensures 1 <= DTthreads <= omp_get_num_proc()
63   // throttle introduced in 1.12.10 (see NEWS item); #4484
64   // throttle==true  : a number of iterations per thread (DTthrottle) is applied before a second thread is utilized
65   // throttle==false : parallel region is already pre-chunked such as in fread; e.g. two batches intended for two threads
66   if (n<1) return 1; // 0 or negative could be deliberate in calling code for edge cases where loop is not intended to run at all
67   int64_t ans = throttle ? 1+(n-1)/DTthrottle :  // 1 thread for n<=1024, 2 thread for n<=2048, etc
68                            n;                    // don't use 20 threads for just one or two batches
69   return ans>=DTthreads ? DTthreads : (int)ans;  // apply limit in static local DTthreads saved there by initDTthreads() and setDTthreads()
70 }
71 
static const char *mygetenv(const char *name, const char *unset) {
  // Value of environment variable `name`, or `unset` when it is absent or set to "".
  const char *value = getenv(name);
  if (value != NULL && *value != '\0') return value;
  return unset;
}
76 
SEXP getDTthreads_R(SEXP verbose) {
  // R entry point: returns the current thread limit as an R integer scalar.
  // When verbose is TRUE, first prints the inputs that determine that limit.
  // verbose is a logical vector, so its element is read with LOGICAL() (as in setDTthreads), not INTEGER().
  if (!isLogical(verbose) || LENGTH(verbose)!=1 || LOGICAL(verbose)[0]==NA_LOGICAL) error(_("'verbose' must be TRUE or FALSE"));
  const int nth = getDTthreads(INT_MAX, false);  // n=INT_MAX, no throttle: yields the current DTthreads limit
  if (LOGICAL(verbose)[0]) {
    #ifndef _OPENMP
      Rprintf(_("This installation of data.table has not been compiled with OpenMP support.\n"));
    #else
      Rprintf(_("  OpenMP version (_OPENMP)       %d\n"), _OPENMP); // user can use Google to map 201511 to 4.5; it's odd that OpenMP API does not provide 4.5
    #endif
    // this output is captured, paste0(collapse="; ")'d, and placed at the end of test.data.table() for display in the last 13 lines of CRAN check logs
    // it is also printed at the start of test.data.table() so that we can trace any Killed events on CRAN before the end is reached
    // this is printed verbatim (e.g. without using data.table to format the output) in case there is a problem even with simple data.table creation/printing
    Rprintf(_("  omp_get_num_procs()            %d\n"), omp_get_num_procs());
    Rprintf(_("  R_DATATABLE_NUM_PROCS_PERCENT  %s\n"), mygetenv("R_DATATABLE_NUM_PROCS_PERCENT", "unset (default 50)"));
    Rprintf(_("  R_DATATABLE_NUM_THREADS        %s\n"), mygetenv("R_DATATABLE_NUM_THREADS", "unset"));
    Rprintf(_("  R_DATATABLE_THROTTLE           %s\n"), mygetenv("R_DATATABLE_THROTTLE", "unset (default 1024)"));
    Rprintf(_("  omp_get_thread_limit()         %d\n"), omp_get_thread_limit());
    Rprintf(_("  omp_get_max_threads()          %d\n"), omp_get_max_threads());
    Rprintf(_("  OMP_THREAD_LIMIT               %s\n"), mygetenv("OMP_THREAD_LIMIT", "unset"));  // CRAN sets to 2
    Rprintf(_("  OMP_NUM_THREADS                %s\n"), mygetenv("OMP_NUM_THREADS", "unset"));
    Rprintf(_("  RestoreAfterFork               %s\n"), RestoreAfterFork ? "true" : "false");
    Rprintf(_("  data.table is using %d threads with throttle==%d. See ?setDTthreads.\n"), nth, DTthrottle);
  }
  return ScalarInteger(nth);
}
101 
SEXP setDTthreads(SEXP threads, SEXP restore_after_fork, SEXP percent, SEXP throttle) {
  // R entry point for setDTthreads(). Returns the previous DTthreads as an R integer scalar.
  //   threads            : NULL/empty, or a single integer >= 0 (a percentage 2..100 when percent is TRUE)
  //   restore_after_fork : NULL (leave unchanged), TRUE or FALSE
  //   percent            : TRUE/FALSE; only consulted when threads is supplied
  //   throttle           : NULL/empty, or a single integer >= 1
  // When neither threads nor throttle is supplied, everything is re-read from the environment via initDTthreads().
  //
  // Phase 1: validate every argument before touching any static state. Errors are raised in the same
  // order as before. An invalid later argument no longer leaves RestoreAfterFork/DTthrottle half-updated.
  if (!isNull(restore_after_fork)) {
    // LENGTH check also prevents reading LOGICAL(x)[0] out of bounds for logical(0)
    if (!isLogical(restore_after_fork) || LENGTH(restore_after_fork)!=1 || LOGICAL(restore_after_fork)[0]==NA_LOGICAL) {
      error(_("restore_after_fork= must be TRUE, FALSE, or NULL (default). getDTthreads(verbose=TRUE) reports the current setting.\n"));
    }
  }
  if (length(throttle)) {
    if (!isInteger(throttle) || LENGTH(throttle)!=1 || INTEGER(throttle)[0]<1)
      error(_("'throttle' must be a single number, non-NA, and >=1"));
  }
  int n=0;
  if (length(threads)) {
    if (length(threads)!=1 || !isInteger(threads) || (n=INTEGER(threads)[0]) < 0) {  // <0 catches NA too since NA is negative (INT_MIN)
      error(_("threads= must be either NULL or a single number >= 0. See ?setDTthreads."));
    }
    if (!isLogical(percent) || length(percent)!=1 || LOGICAL(percent)[0]==NA_LOGICAL) {
      error(_("Internal error: percent= must be TRUE or FALSE at C level"));  // # nocov
    }
    if (LOGICAL(percent)[0] && (n<2 || n>100)) {
      error(_("Internal error: threads==%d should be between 2 and 100 (percent=TRUE at C level)."), n);  // # nocov
    }
  }

  // Phase 2: all arguments are valid; apply them.
  if (!isNull(restore_after_fork)) {
    RestoreAfterFork = LOGICAL(restore_after_fork)[0];  // # nocov
  }
  if (length(throttle)) {
    DTthrottle = INTEGER(throttle)[0];
  }
  int old = DTthreads;
  if (!length(threads) && !length(throttle)) {
    initDTthreads();
    // Rerun exactly the same function used on startup (re-reads env variables); this is now default setDTthreads() behavior from 1.12.2
    // Allows robust testing of environment variables using Sys.setenv() to experiment.
    // Default  is now (as from 1.12.2) threads=NULL which re-reads environment variables.
    // If a CPU has been unplugged (high end servers allow live hardware replacement) then omp_get_num_procs() will
    // reflect that and a call to setDTthreads(threads=NULL) will update DTthreads.
  } else if (length(threads)) {
    int num_procs = imax(omp_get_num_procs(), 1); // max just in case omp_get_num_procs() returns <= 0 (perhaps error, or unsupported)
    if (LOGICAL(percent)[0]) {
      n = num_procs*n/100;  // if 0 it will be reset to 1 in the imax() below
    } else {
      if (n==0 || n>num_procs) n = num_procs; // setDTthreads(0) == setDTthread(percent=100); i.e. use all logical CPUs (the default in 1.12.0 and before, from 1.12.2 it's 50%)
    }
    n = imin(n, omp_get_thread_limit());  // can't think why this might be different from its value on startup, but call it just in case
    n = imin(n, getIntEnv("OMP_THREAD_LIMIT", INT_MAX));  // user might have called Sys.setenv(OMP_THREAD_LIMIT=) since startup and expect setDTthreads to respect it
    DTthreads = imax(n, 1);  // imax just in case
    // Do not call omp_set_num_threads() here. Any calls to omp_set_num_threads() affect other
    // packages and R itself too which has some OpenMP usage. Instead we set our own DTthreads
    // static variable and read that from getDTthreads(n, throttle).
    // All parallel regions should include num_threads(getDTthreads(n, true|false)) and this is ensured via
    // a grep in CRAN_Release.cmd.
  }
  return ScalarInteger(old);
}
148 
149 /*
150   Automatically drop down to 1 thread when called from parallel package (e.g. mclapply) to avoid
151   deadlock when data.table is used from within parallel::mclapply; #1745 and #1727.
  GNU OpenMP seems ok with just setting DTthreads to 1 which limits the next parallel region
  if data.table is used within the fork'd process. This is tested by test 1705.
154 
  From v1.12.0 we're trying again to RestoreAfterFork (#2285) with optional-off due to success
  reported by Ken Run and Mark Klik in fst#110 and fst#112. We had tried that before but had
  experienced problems likely on Intel's OpenMP only (Mac).
158 
  DO NOT call omp_set_num_threads(1) inside when_fork()!! That causes a different crash/hang on MacOS
  upon mclapply's fork even if data.table is merely loaded and neither used yet in the session nor by
  what mclapply is calling. This happened even though all checks passed on CRAN's MacOS machines, as
  discovered by several MacOS and RStudio users here when 1.10.4-2 went to CRAN:
163      https://github.com/Rdatatable/data.table/issues/2418
164   v1.10.4-3 removed the call to omp_set_num_threads().
165   We do not know why calling (mere) omp_set_num_threads(1) in the when_fork() would cause a problem
166   on MacOS.
167 */
168 
169 static int pre_fork_DTthreads = 0;
170 
when_fork()171 void when_fork() {
172   pre_fork_DTthreads = DTthreads;
173   DTthreads = 1;
174 }
175 
after_fork()176 void after_fork() {
177   if (RestoreAfterFork) DTthreads = pre_fork_DTthreads;
178 }
179 
void avoid_openmp_hang_within_fork() {
  // Called once on loading data.table from init.c.
  // Registers when_fork() as the pthread_atfork() prepare handler and after_fork() as the parent
  // handler. No child handler is given, so a forked child keeps the DTthreads==1 set by when_fork().
#ifdef _OPENMP
  // pthread_atfork() returns 0 on success, or an error number (e.g. ENOMEM) on failure.
  // Previously the failure was ignored silently; report it so users know forks are unprotected.
  int err = pthread_atfork(&when_fork, &after_fork, NULL);
  if (err != 0) {
    warning(_("pthread_atfork() failed with error code %d; data.table will not reduce its thread count in forked processes (e.g. parallel::mclapply). Consider setDTthreads(1) before forking."), err);
  }
#endif
}
186 
187