From 89830e97b1582c1f4603b6b8f0260e0945876a27 Mon Sep 17 00:00:00 2001 From: Jan Gorecki Date: Thu, 18 Jun 2020 09:56:31 +0100 Subject: [PATCH] throttle threads for iterated small data tasks (#4484) --- NEWS.md | 2 ++ R/openmp-utils.R | 6 +++--- inst/tests/tests.Rraw | 6 +++++- man/openmp-utils.Rd | 3 ++- src/between.c | 16 +++++++-------- src/cj.c | 12 +++++------ src/coalesce.c | 8 ++++---- src/data.table.h | 2 +- src/fifelse.c | 8 ++++---- src/forder.c | 35 ++++++++++++++++--------------- src/froll.c | 8 ++++---- src/frollR.c | 2 +- src/frolladaptive.c | 16 +++++++-------- src/fsort.c | 6 +++--- src/gsumm.c | 40 ++++++++++++++++++------------------ src/nafill.c | 2 +- src/openmp-utils.c | 48 ++++++++++++++++++++++++++----------------- src/reorder.c | 6 +++--- src/subset.c | 6 +++--- src/types.c | 2 +- 20 files changed, 126 insertions(+), 108 deletions(-) diff --git a/NEWS.md b/NEWS.md index 0ba902c87..98484687c 100644 --- a/NEWS.md +++ b/NEWS.md @@ -81,6 +81,8 @@ unit = "s") 14. Added support for `round()` and `trunc()` to extend functionality of `ITime`. `round()` and `trunc()` can be used with argument units: "hours" or "minutes". Thanks to @JensPederM for the suggestion and PR. +15. A new throttle feature has been introduced to speed up small data tasks that are repeated in a loop, [#3175](https://github.com/Rdatatable/data.table/issues/3175) [#3438](https://github.com/Rdatatable/data.table/issues/3438) [#3205](https://github.com/Rdatatable/data.table/issues/3205) [#3735](https://github.com/Rdatatable/data.table/issues/3735) [#3739](https://github.com/Rdatatable/data.table/issues/3739) [#4284](https://github.com/Rdatatable/data.table/issues/4284) [#4527](https://github.com/Rdatatable/data.table/issues/4527) [#4294](https://github.com/Rdatatable/data.table/issues/4294) [#1120](https://github.com/Rdatatable/data.table/issues/1120). The default throttle of 1024 means that a single thread will be used when nrow<=1024, two threads when nrow<=2048, etc. To change the default, use `setDTthreads(throttle=)`. Or use the new environment variable `R_DATATABLE_THROTTLE`. If you use `Sys.setenv()` in a running R session to change this environment variable, be sure to run an empty `setDTthreads()` call afterwards for the change to take effect; see `?setDTthreads`. The word *throttle* is used to convey that the number of threads is restricted (throttled) for small data tasks. Reducing throttle to 1 will turn off throttling and should revert behaviour to past versions (i.e. using many threads even for small data). Increasing throttle to, say, 65536 will utilize multi-threading only for larger datasets. The value 1024 is a guess. We welcome feedback and test results indicating what the best default should be. + ## BUG FIXES 1. A NULL timezone on POSIXct was interpreted by `as.IDate` and `as.ITime` as UTC rather than the session's default timezone (`tz=""`) , [#4085](https://github.com/Rdatatable/data.table/issues/4085). diff --git a/R/openmp-utils.R b/R/openmp-utils.R index 5e11222c5..9df55f114 100644 --- a/R/openmp-utils.R +++ b/R/openmp-utils.R @@ -1,12 +1,12 @@ -setDTthreads = function(threads=NULL, restore_after_fork=NULL, percent=NULL) { +setDTthreads = function(threads=NULL, restore_after_fork=NULL, percent=NULL, throttle=NULL) { if (!missing(percent)) { if (!missing(threads)) stop("Provide either threads= or percent= but not both") if (length(percent)!=1) stop("percent= is provided but is length ", length(percent)) percent=as.integer(percent) if (is.na(percent) || percent<2L || percent>100L) stop("percent==",percent," but should be a number between 2 and 100") - invisible(.Call(CsetDTthreads, percent, restore_after_fork, TRUE)) + invisible(.Call(CsetDTthreads, percent, restore_after_fork, TRUE, as.integer(throttle))) } else { - invisible(.Call(CsetDTthreads, threads, restore_after_fork, FALSE)) + invisible(.Call(CsetDTthreads, as.integer(threads), restore_after_fork, FALSE, as.integer(throttle))) } } diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index 1def357eb..12790ed92 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -14188,7 +14188,7 @@ test(1996.2, d[, eval(qcall)], data.table(a=1L, b=3)) # setDTthreads; #3435 test(1997.01, setDTthreads(NULL, percent=75), error="Provide either threads= or percent= but not both") test(1997.02, setDTthreads(1L, percent=75), error="Provide either threads= or percent= but not both") -test(1997.03, setDTthreads(-1L), error="must be either NULL or a single integer >= 0") +test(1997.03, setDTthreads(-1L), error="threads= must be either NULL or a single number >= 0") test(1997.04, setDTthreads(percent=101), error="should be a number between 2 and 100") test(1997.05, setDTthreads(percent=1), error="should be a number between 2 and 100") test(1997.06, setDTthreads(percent=NULL), error="but is length 0") @@ -14211,6 +14211,10 @@ test(1997.14, getDTthreads(), new) Sys.setenv(R_DATATABLE_NUM_PROCS_PERCENT=oldenv) test(1997.15, setDTthreads(old), new) test(1997.16, getDTthreads(), old) +test(1997.17, setDTthreads(throttle=NA), error="throttle.*must be a single number, non-NA, and >=1") +setDTthreads(throttle=65536) +test(1997.18, getDTthreads(TRUE), output="throttle==65536") +setDTthreads(throttle=1024) # test that a copy is being made and output is printed, #3385 after partial revert of #3281 x = 5L diff --git a/man/openmp-utils.Rd b/man/openmp-utils.Rd index 8bb6dccc2..b8d014976 100644 --- a/man/openmp-utils.Rd +++ b/man/openmp-utils.Rd @@ -8,13 +8,14 @@ Set and get number of threads to be used in \code{data.table} functions that are parallelized with OpenMP. The number of threads is initialized when \code{data.table} is first loaded in the R session using optional envioronment variables. Thereafter, the number of threads may be changed by calling \code{setDTthreads}. If you change an environment variable using \code{Sys.setenv} you will need to call \code{setDTthreads} again to reread the environment variables. } \usage{ - setDTthreads(threads = NULL, restore_after_fork = NULL, percent = NULL) + setDTthreads(threads = NULL, restore_after_fork = NULL, percent = NULL, throttle = NULL) getDTthreads(verbose = getOption("datatable.verbose")) } \arguments{ \item{threads}{ NULL (default) rereads environment variables. 0 means to use all logical CPUs available. Otherwise a number >= 1 } \item{restore_after_fork}{ Should data.table be multi-threaded after a fork has completed? NULL leaves the current setting unchanged which by default is TRUE. See details below. } \item{percent}{ If provided it should be a number between 2 and 100; the percentage of logical CPUs to use. By default on startup, 50\%. } + \item{throttle}{ 1024 (default) means that, roughly speaking, a single thread will be used when nrow(DT)<=1024, 2 threads when nrow(DT)<=2048, etc. The throttle is to speed up small data tasks (especially when repeated many times) by not incurring the overhead of managing multiple threads. Hence the number of threads is throttled (restricted) for small tasks. } \item{verbose}{ Display the value of relevant OpenMP settings plus the \code{restore_after_fork} internal option. } } \value{ diff --git a/src/between.c b/src/between.c index b4444d968..c5d91b30c 100644 --- a/src/between.c +++ b/src/between.c @@ -64,14 +64,14 @@ SEXP between(SEXP x, SEXP lower, SEXP upper, SEXP incbounds, SEXP NAboundsArg, S error(_("Item %d of lower (%d) is greater than item %d of upper (%d)"), (i&lowMask)+1, l, (i&uppMask)+1, u); } if (NAbounds) { // default NAbounds==TRUE => NA bound means TRUE; i.e. asif lower=-Inf or upper==Inf) - #pragma omp parallel for num_threads(getDTthreads()) + #pragma omp parallel for num_threads(getDTthreads(longest, true)) for (int i=0; i= and <=. NA_INTEGER+1 == -INT_MAX == INT_MIN+1 (so NA limit handled by this too) } } else { - #pragma omp parallel for num_threads(getDTthreads()) + #pragma omp parallel for num_threads(getDTthreads(longest, true)) for (int i=0; i0; // flag to re-run with NA support if NAs detected if (!truehasna || !narm) { - #pragma omp parallel for num_threads(getDTthreads()) + #pragma omp parallel for num_threads(getDTthreads(nx, true)) for (uint64_t i=k-1; i0; if (!truehasna || !narm) { - #pragma omp parallel for num_threads(getDTthreads()) + #pragma omp parallel for num_threads(getDTthreads(nx, true)) for (uint64_t i=k-1; i1) schedule(auto) collapse(2) num_threads(getDTthreads()) + #pragma omp parallel for if (ialgo==0) schedule(dynamic) collapse(2) num_threads(getDTthreads(nx*nk, false)) for (R_len_t i=0; idbl_v[i] = cs[i]/k[i]; // current obs window width exactly same as obs position in a vector @@ -82,7 +82,7 @@ void fadaptiverollmeanFast(double *x, uint64_t nx, ans_t *ans, int *k, double fi cs[i] = (double) w; // cumsum, na.rm=TRUE always, NAs handled using cum NA counter cn[i] = nc; // cum NA counter } - #pragma omp parallel for num_threads(getDTthreads()) + #pragma omp parallel for num_threads(getDTthreads(nx, true)) for (uint64_t i=0; idbl_v[i] = fill; @@ -114,7 +114,7 @@ void fadaptiverollmeanExact(double *x, uint64_t nx, ans_t *ans, int *k, double f snprintf(end(ans->message[0]), 500, _("%s: running in parallel for input length %"PRIu64", hasna %d, narm %d\n"), "fadaptiverollmeanExact", (uint64_t)nx, hasna, (int) narm); bool truehasna = hasna>0; // flag to re-run if NAs detected if (!truehasna || !narm) { // narm=FALSE handled here as NAs properly propagated in exact algo - #pragma omp parallel for num_threads(getDTthreads()) + #pragma omp parallel for num_threads(getDTthreads(nx, true)) for (uint64_t i=0; idbl_v[i] = fill; // partial window @@ -231,7 +231,7 @@ void fadaptiverollsumFast(double *x, uint64_t nx, ans_t *ans, int *k, double fil cs[i] = (double) w; } if (R_FINITE((double) w)) { - #pragma omp parallel for num_threads(getDTthreads()) + #pragma omp parallel for num_threads(getDTthreads(nx, true)) for (uint64_t i=0; idbl_v[i] = cs[i]; @@ -271,7 +271,7 @@ void fadaptiverollsumFast(double *x, uint64_t nx, ans_t *ans, int *k, double fil cs[i] = (double) w; cn[i] = nc; } - #pragma omp parallel for num_threads(getDTthreads()) + #pragma omp parallel for num_threads(getDTthreads(nx, true)) for (uint64_t i=0; idbl_v[i] = fill; @@ -298,7 +298,7 @@ void fadaptiverollsumExact(double *x, uint64_t nx, ans_t *ans, int *k, double fi snprintf(end(ans->message[0]), 500, _("%s: running in parallel for input length %"PRIu64", hasna %d, narm %d\n"), "fadaptiverollsumExact", (uint64_t)nx, hasna, (int) narm); bool truehasna = hasna>0; if (!truehasna || !narm) { - #pragma omp parallel for num_threads(getDTthreads()) + #pragma omp parallel for num_threads(getDTthreads(nx, true)) for (uint64_t i=0; idbl_v[i] = fill; diff --git a/src/fsort.c b/src/fsort.c index d3c695eac..00c7e5c10 100644 --- a/src/fsort.c +++ b/src/fsort.c @@ -117,7 +117,7 @@ SEXP fsort(SEXP x, SEXP verboseArg) { // allocate early in case fails if not enough RAM // TODO: document this is much cheaper than a copy followed by in-place. - int nth = getDTthreads(); + int nth = getDTthreads(xlength(x), true); int nBatch=nth*2; // at least nth; more to reduce last-man-home; but not too large to keep counts small in cache if (verbose) Rprintf(_("nth=%d, nBatch=%d\n"),nth,nBatch); @@ -131,7 +131,7 @@ SEXP fsort(SEXP x, SEXP verboseArg) { t[1] = wallclock(); double mins[nBatch], maxs[nBatch]; const double *restrict xp = REAL(x); - #pragma omp parallel for schedule(dynamic) num_threads(nth) + #pragma omp parallel for schedule(dynamic) num_threads(getDTthreads(nBatch, false)) for (int batch=0; batch1) num_threads(getDTthreads()) + #pragma omp parallel for schedule(dynamic) num_threads(getDTthreads(nx, false)) for (R_len_t i=0; i // errno #include // isspace -static int DTthreads = -1; // Never read directly hence static; use getDTthreads(). -1 so we know for sure initDTthreads() ran and set it >= 1. +static int DTthreads = -1; // Never read directly hence static; use getDTthreads(n, /*throttle=*/0|1). -1 so we know for sure initDTthreads() ran and set it >= 1. +static int DTthrottle = -1; // Thread 1 is assigned DTthrottle iterations before a 2nd thread is utilized; #4484. static bool RestoreAfterFork = true; // see #2885 in v1.12.0 static int getIntEnv(const char *name, int def) @@ -50,12 +51,19 @@ void initDTthreads() { ans = imin(ans, getIntEnv("OMP_THREAD_LIMIT", INT_MAX)); // user might expect `Sys.setenv(OMP_THREAD_LIMIT=2);setDTthreads()` to work. Satisfy this ans = imin(ans, getIntEnv("OMP_NUM_THREADS", INT_MAX)); // expectation by reading them again now. OpenMP just reads them on startup (quite reasonably) DTthreads = ans; + DTthrottle = imax(1, getIntEnv("R_DATATABLE_THROTTLE", 1024)); // 2nd thread is used only when n>1024, 3rd thread when n>2048, etc } -int getDTthreads() { - // this is the main getter used by all parallel regions; they specify num_threads(getDTthreads()) - // Therefore keep it light, simple and robust. Local static variable. initDTthreads() ensures 1 <= DTthreads <= omp_get_num_proc() - return DTthreads; +int getDTthreads(const int64_t n, const bool throttle) { + // this is the main getter used by all parallel regions; they specify num_threads(n, true|false). + // Keep this light, simple and robust. initDTthreads() ensures 1 <= DTthreads <= omp_get_num_proc() + // throttle introduced in 1.12.10 (see NEWS item); #4484 + // throttle==true : a number of iterations per thread (DTthrottle) is applied before a second thread is utilized + // throttle==false : parallel region is already pre-chunked such as in fread; e.g. two batches intended for two threads + if (n<1) return 1; // 0 or negative could be deliberate in calling code for edge cases where loop is not intended to run at all + int64_t ans = throttle ? 1+(n-1)/DTthrottle : // 1 thread for n<=1024, 2 thread for n<=2048, etc + n; // don't use 20 threads for just one or two batches + return ans>=DTthreads ? DTthreads : (int)ans; // apply limit in static local DTthreads saved there by initDTthreads() and setDTthreads() } static const char *mygetenv(const char *name, const char *unset) { @@ -75,40 +83,42 @@ SEXP getDTthreads_R(SEXP verbose) { Rprintf(_(" omp_get_num_procs() %d\n"), omp_get_num_procs()); Rprintf(_(" R_DATATABLE_NUM_PROCS_PERCENT %s\n"), mygetenv("R_DATATABLE_NUM_PROCS_PERCENT", "unset (default 50)")); Rprintf(_(" R_DATATABLE_NUM_THREADS %s\n"), mygetenv("R_DATATABLE_NUM_THREADS", "unset")); + Rprintf(_(" R_DATATABLE_THROTTLE %s\n"), mygetenv("R_DATATABLE_THROTTLE", "unset (default 1024)")); Rprintf(_(" omp_get_thread_limit() %d\n"), omp_get_thread_limit()); Rprintf(_(" omp_get_max_threads() %d\n"), omp_get_max_threads()); Rprintf(_(" OMP_THREAD_LIMIT %s\n"), mygetenv("OMP_THREAD_LIMIT", "unset")); // CRAN sets to 2 Rprintf(_(" OMP_NUM_THREADS %s\n"), mygetenv("OMP_NUM_THREADS", "unset")); Rprintf(_(" RestoreAfterFork %s\n"), RestoreAfterFork ? "true" : "false"); - Rprintf(_(" data.table is using %d threads. See ?setDTthreads.\n"), getDTthreads()); + Rprintf(_(" data.table is using %d threads with throttle==%d. See ?setDTthreads.\n"), getDTthreads(INT_MAX, false), DTthrottle); } - return ScalarInteger(getDTthreads()); + return ScalarInteger(getDTthreads(INT_MAX, false)); } -SEXP setDTthreads(SEXP threads, SEXP restore_after_fork, SEXP percent) { +SEXP setDTthreads(SEXP threads, SEXP restore_after_fork, SEXP percent, SEXP throttle) { if (!isNull(restore_after_fork)) { if (!isLogical(restore_after_fork) || LOGICAL(restore_after_fork)[0]==NA_LOGICAL) { error(_("restore_after_fork= must be TRUE, FALSE, or NULL (default). getDTthreads(verbose=TRUE) reports the current setting.\n")); } RestoreAfterFork = LOGICAL(restore_after_fork)[0]; // # nocov } + if (length(throttle)) { + if (!isInteger(throttle) || LENGTH(throttle)!=1 || INTEGER(throttle)[0]<1) + error(_("'throttle' must be a single number, non-NA, and >=1")); + DTthrottle = INTEGER(throttle)[0]; + } int old = DTthreads; - if (isNull(threads)) { + if (!length(threads) && !length(throttle)) { initDTthreads(); // Rerun exactly the same function used on startup (re-reads env variables); this is now default setDTthreads() behavior from 1.12.2 // Allows robust testing of environment variables using Sys.setenv() to experiment. // Default is now (as from 1.12.2) threads=NULL which re-reads environment variables. // If a CPU has been unplugged (high end servers allow live hardware replacement) then omp_get_num_procs() will // reflect that and a call to setDTthreads(threads=NULL) will update DTthreads. - } else { - int n=0, protecti=0; - if (length(threads)!=1) error(_("threads= must be either NULL (default) or a single number. It has length %d"), length(threads)); - if (isReal(threads)) { threads = PROTECT(coerceVector(threads, INTSXP)); protecti++; } - if (!isInteger(threads)) error(_("threads= must be either NULL (default) or type integer/numeric")); - if ((n=INTEGER(threads)[0]) < 0) { // <0 catches NA too since NA is negative (INT_MIN) - error(_("threads= must be either NULL or a single integer >= 0. See ?setDTthreads.")); + } else if (length(threads)) { + int n=0; + if (length(threads)!=1 || !isInteger(threads) || (n=INTEGER(threads)[0]) < 0) { // <0 catches NA too since NA is negative (INT_MIN) + error(_("threads= must be either NULL or a single number >= 0. See ?setDTthreads.")); } - UNPROTECT(protecti); int num_procs = imax(omp_get_num_procs(), 1); // max just in case omp_get_num_procs() returns <= 0 (perhaps error, or unsupported) if (!isLogical(percent) || length(percent)!=1 || LOGICAL(percent)[0]==NA_LOGICAL) { error(_("Internal error: percent= must be TRUE or FALSE at C level")); // # nocov @@ -124,8 +134,8 @@ SEXP setDTthreads(SEXP threads, SEXP restore_after_fork, SEXP percent) { DTthreads = imax(n, 1); // imax just in case // Do not call omp_set_num_threads() here. Any calls to omp_set_num_threads() affect other // packages and R itself too which has some OpenMP usage. Instead we set our own DTthreads - // static variable and read that from getDTthreads(). - // All parallel regions should include num_threads(getDTthreads()) and this is ensured via + // static variable and read that from getDTthreads(n, throttle). + // All parallel regions should include num_threads(getDTthreads(n, true|false)) and this is ensured via // a grep in CRAN_Release.cmd. } return ScalarInteger(old); diff --git a/src/reorder.c b/src/reorder.c index da3784e94..c2deea8ae 100644 --- a/src/reorder.c +++ b/src/reorder.c @@ -64,7 +64,7 @@ SEXP reorder(SEXP x, SEXP order) if (size==4) { const int *restrict vd = DATAPTR_RO(v); int *restrict tmp = (int *)TMP; - #pragma omp parallel for num_threads(getDTthreads()) + #pragma omp parallel for num_threads(getDTthreads(end, true)) for (int i=start; i<=end; ++i) { tmp[i-start] = vd[idx[i]-1]; // copies 4 bytes; e.g. INTSXP and also SEXP pointers on 32bit (STRSXP and VECSXP) } @@ -75,14 +75,14 @@ SEXP reorder(SEXP x, SEXP order) } else if (size==8) { const double *restrict vd = DATAPTR_RO(v); double *restrict tmp = (double *)TMP; - #pragma omp parallel for num_threads(getDTthreads()) + #pragma omp parallel for num_threads(getDTthreads(end, true)) for (int i=start; i<=end; ++i) { tmp[i-start] = vd[idx[i]-1]; // copies 8 bytes; e.g. REALSXP and also SEXP pointers on 64bit (STRSXP and VECSXP) } } else { // size 16; checked up front const Rcomplex *restrict vd = DATAPTR_RO(v); Rcomplex *restrict tmp = (Rcomplex *)TMP; - #pragma omp parallel for num_threads(getDTthreads()) + #pragma omp parallel for num_threads(getDTthreads(end, true)) for (int i=start; i<=end; ++i) { tmp[i-start] = vd[idx[i]-1]; } diff --git a/src/subset.c b/src/subset.c index d9fea2800..91a4018e2 100644 --- a/src/subset.c +++ b/src/subset.c @@ -13,13 +13,13 @@ void subsetVectorRaw(SEXP ans, SEXP source, SEXP idx, const bool anyNA) #define PARLOOP(_NAVAL_) \ if (anyNA) { \ - _Pragma("omp parallel for num_threads(getDTthreads())") \ + _Pragma("omp parallel for num_threads(getDTthreads(n, true))") \ for (int i=0; i1) schedule(auto) collapse(2) num_threads(getDTthreads()) + #pragma omp parallel for schedule(dynamic) collapse(2) num_threads(getDTthreads(nx*nk, false)) for (R_len_t i=0; i