Skip to content

Commit

Permalink
throttle threads for iterated small data tasks (#4484)
Browse files Browse the repository at this point in the history
  • Loading branch information
jangorecki authored Jun 18, 2020
1 parent 12586af commit 89830e9
Show file tree
Hide file tree
Showing 20 changed files with 126 additions and 108 deletions.
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ unit = "s")

14. Added support for `round()` and `trunc()` to extend functionality of `ITime`. `round()` and `trunc()` can be used with argument units: "hours" or "minutes". Thanks to @JensPederM for the suggestion and PR.

15. A new throttle feature has been introduced to speed up small data tasks that are repeated in a loop, [#3175](https://github.com/Rdatatable/data.table/issues/3175) [#3438](https://github.com/Rdatatable/data.table/issues/3438) [#3205](https://github.com/Rdatatable/data.table/issues/3205) [#3735](https://github.com/Rdatatable/data.table/issues/3735) [#3739](https://github.com/Rdatatable/data.table/issues/3739) [#4284](https://github.com/Rdatatable/data.table/issues/4284) [#4527](https://github.com/Rdatatable/data.table/issues/4527) [#4294](https://github.com/Rdatatable/data.table/issues/4294) [#1120](https://github.com/Rdatatable/data.table/issues/1120). The default throttle of 1024 means that a single thread will be used when nrow<=1024, two threads when nrow<=2048, etc. To change the default, use `setDTthreads(throttle=)`. Or use the new environment variable `R_DATATABLE_THROTTLE`. If you use `Sys.setenv()` in a running R session to change this environment variable, be sure to run an empty `setDTthreads()` call afterwards for the change to take effect; see `?setDTthreads`. The word *throttle* is used to convey that the number of threads is restricted (throttled) for small data tasks. Reducing throttle to 1 will turn off throttling and should revert behaviour to past versions (i.e. using many threads even for small data). Increasing throttle to, say, 65536 will utilize multi-threading only for larger datasets. The value 1024 is a guess. We welcome feedback and test results indicating what the best default should be.

## BUG FIXES

1. A NULL timezone on POSIXct was interpreted by `as.IDate` and `as.ITime` as UTC rather than the session's default timezone (`tz=""`) , [#4085](https://github.com/Rdatatable/data.table/issues/4085).
Expand Down
6 changes: 3 additions & 3 deletions R/openmp-utils.R
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
setDTthreads = function(threads=NULL, restore_after_fork=NULL, percent=NULL) {
setDTthreads = function(threads=NULL, restore_after_fork=NULL, percent=NULL, throttle=NULL) {
if (!missing(percent)) {
if (!missing(threads)) stop("Provide either threads= or percent= but not both")
if (length(percent)!=1) stop("percent= is provided but is length ", length(percent))
percent=as.integer(percent)
if (is.na(percent) || percent<2L || percent>100L) stop("percent==",percent," but should be a number between 2 and 100")
invisible(.Call(CsetDTthreads, percent, restore_after_fork, TRUE))
invisible(.Call(CsetDTthreads, percent, restore_after_fork, TRUE, as.integer(throttle)))
} else {
invisible(.Call(CsetDTthreads, threads, restore_after_fork, FALSE))
invisible(.Call(CsetDTthreads, as.integer(threads), restore_after_fork, FALSE, as.integer(throttle)))
}
}

Expand Down
6 changes: 5 additions & 1 deletion inst/tests/tests.Rraw
Original file line number Diff line number Diff line change
Expand Up @@ -14188,7 +14188,7 @@ test(1996.2, d[, eval(qcall)], data.table(a=1L, b=3))
# setDTthreads; #3435
test(1997.01, setDTthreads(NULL, percent=75), error="Provide either threads= or percent= but not both")
test(1997.02, setDTthreads(1L, percent=75), error="Provide either threads= or percent= but not both")
test(1997.03, setDTthreads(-1L), error="must be either NULL or a single integer >= 0")
test(1997.03, setDTthreads(-1L), error="threads= must be either NULL or a single number >= 0")
test(1997.04, setDTthreads(percent=101), error="should be a number between 2 and 100")
test(1997.05, setDTthreads(percent=1), error="should be a number between 2 and 100")
test(1997.06, setDTthreads(percent=NULL), error="but is length 0")
Expand All @@ -14211,6 +14211,10 @@ test(1997.14, getDTthreads(), new)
Sys.setenv(R_DATATABLE_NUM_PROCS_PERCENT=oldenv)
test(1997.15, setDTthreads(old), new)
test(1997.16, getDTthreads(), old)
test(1997.17, setDTthreads(throttle=NA), error="throttle.*must be a single number, non-NA, and >=1")
setDTthreads(throttle=65536)
test(1997.18, getDTthreads(TRUE), output="throttle==65536")
setDTthreads(throttle=1024)

# test that a copy is being made and output is printed, #3385 after partial revert of #3281
x = 5L
Expand Down
3 changes: 2 additions & 1 deletion man/openmp-utils.Rd
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
Set and get number of threads to be used in \code{data.table} functions that are parallelized with OpenMP. The number of threads is initialized when \code{data.table} is first loaded in the R session using optional envioronment variables. Thereafter, the number of threads may be changed by calling \code{setDTthreads}. If you change an environment variable using \code{Sys.setenv} you will need to call \code{setDTthreads} again to reread the environment variables.
}
\usage{
setDTthreads(threads = NULL, restore_after_fork = NULL, percent = NULL)
setDTthreads(threads = NULL, restore_after_fork = NULL, percent = NULL, throttle = NULL)
getDTthreads(verbose = getOption("datatable.verbose"))
}
\arguments{
\item{threads}{ NULL (default) rereads environment variables. 0 means to use all logical CPUs available. Otherwise a number >= 1 }
\item{restore_after_fork}{ Should data.table be multi-threaded after a fork has completed? NULL leaves the current setting unchanged which by default is TRUE. See details below. }
\item{percent}{ If provided it should be a number between 2 and 100; the percentage of logical CPUs to use. By default on startup, 50\%. }
\item{throttle}{ 1024 (default) means that, roughly speaking, a single thread will be used when nrow(DT)<=1024, 2 threads when nrow(DT)<=2048, etc. The throttle is to speed up small data tasks (especially when repeated many times) by not incurring the overhead of managing multiple threads. Hence the number of threads is throttled (restricted) for small tasks. }
\item{verbose}{ Display the value of relevant OpenMP settings plus the \code{restore_after_fork} internal option. }
}
\value{
Expand Down
16 changes: 8 additions & 8 deletions src/between.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ SEXP between(SEXP x, SEXP lower, SEXP upper, SEXP incbounds, SEXP NAboundsArg, S
error(_("Item %d of lower (%d) is greater than item %d of upper (%d)"), (i&lowMask)+1, l, (i&uppMask)+1, u);
}
if (NAbounds) { // default NAbounds==TRUE => NA bound means TRUE; i.e. asif lower=-Inf or upper==Inf)
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(longest, true))
for (int i=0; i<longest; ++i) {
const int elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
ansp[i] = elem==NA_INTEGER ? NA_LOGICAL : (l==NA_INTEGER || l+open<=elem) && (u==NA_INTEGER || elem<=u-open);
// +open so we can always use >= and <=. NA_INTEGER+1 == -INT_MAX == INT_MIN+1 (so NA limit handled by this too)
}
} else {
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(longest, true))
for (int i=0; i<longest; ++i) {
const int elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
if (elem==NA_INTEGER) { ansp[i]=NA_LOGICAL; continue; }
Expand All @@ -95,13 +95,13 @@ SEXP between(SEXP x, SEXP lower, SEXP upper, SEXP incbounds, SEXP NAboundsArg, S
error(_("Item %d of lower (%"PRId64") is greater than item %d of upper (%"PRId64")"), (i&lowMask)+1, l, (i&uppMask)+1, u);
}
if (NAbounds) {
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(longest, true))
for (int i=0; i<longest; ++i) {
const int64_t elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
ansp[i] = elem==NA_INTEGER64 ? NA_LOGICAL : (l==NA_INTEGER64 || l+open<=elem) && (u==NA_INTEGER64 || elem<=u-open);
}
} else {
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(longest, true))
for (int i=0; i<longest; ++i) {
const int64_t elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
if (elem==NA_INTEGER64) { ansp[i]=NA_LOGICAL; continue; }
Expand All @@ -123,13 +123,13 @@ SEXP between(SEXP x, SEXP lower, SEXP upper, SEXP incbounds, SEXP NAboundsArg, S
}
if (open) {
if (NAbounds) {
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(longest, true))
for (int i=0; i<longest; ++i) {
const double elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
ansp[i] = isnan(elem) ? NA_LOGICAL : (isnan(l) || l<elem) && (isnan(u) || elem<u);
}
} else {
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(longest, true))
for (int i=0; i<longest; ++i) {
const double elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
if (isnan(elem)) { ansp[i]=NA_LOGICAL; continue; }
Expand All @@ -140,13 +140,13 @@ SEXP between(SEXP x, SEXP lower, SEXP upper, SEXP incbounds, SEXP NAboundsArg, S
if (verbose) Rprintf(_("between parallel processing of double with open bounds took %8.3fs\n"), omp_get_wtime()-tic);
} else {
if (NAbounds) {
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(longest, true))
for (int i=0; i<longest; ++i) {
const double elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
ansp[i] = isnan(elem) ? NA_LOGICAL : (isnan(l) || l<=elem) && (isnan(u) || elem<=u);
}
} else {
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(longest, true))
for (int i=0; i<longest; ++i) {
const double elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
if (isnan(elem)) { ansp[i]=NA_LOGICAL; continue; }
Expand Down
12 changes: 6 additions & 6 deletions src/cj.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,43 +20,43 @@ SEXP cj(SEXP base_list) {
case INTSXP: {
const int *restrict sourceP = INTEGER(source);
int *restrict targetP = INTEGER(target);
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(thislen*eachrep, true))
// default static schedule so two threads won't write to same cache line in last column
// if they did write to same cache line (and will when last column's thislen is small) there's no correctness issue
for (int i=0; i<thislen; ++i) {
const int item = sourceP[i];
const int end = (i+1)*eachrep;
for (int j=i*eachrep; j<end; ++j) targetP[j] = item; // no div, mod or read ops inside loop; just rep a const contiguous write
}
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(ncopy*blocklen, true))
for (int i=1; i<ncopy; ++i) {
memcpy(targetP + i*blocklen, targetP, blocklen*sizeof(int));
}
} break;
case REALSXP: {
const double *restrict sourceP = REAL(source);
double *restrict targetP = REAL(target);
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(thislen*eachrep, true))
for (int i=0; i<thislen; ++i) {
const double item = sourceP[i];
const int end=(i+1)*eachrep;
for (int j=i*eachrep; j<end; ++j) targetP[j] = item;
}
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(ncopy*blocklen, true))
for (int i=1; i<ncopy; ++i) {
memcpy(targetP + i*blocklen, targetP, blocklen*sizeof(double));
}
} break;
case CPLXSXP: {
const Rcomplex *restrict sourceP = COMPLEX(source);
Rcomplex *restrict targetP = COMPLEX(target);
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(thislen*eachrep, true))
for (int i=0; i<thislen; ++i) {
const Rcomplex item = sourceP[i];
const int end=(i+1)*eachrep;
for (int j=i*eachrep; j<end; ++j) targetP[j] = item;
}
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(ncopy*blocklen, true))
for (int i=1; i<ncopy; ++i) {
memcpy(targetP + i*blocklen, targetP, blocklen*sizeof(Rcomplex));
}
Expand Down
8 changes: 4 additions & 4 deletions src/coalesce.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ SEXP coalesce(SEXP x, SEXP inplaceArg) {
valP[k++] = INTEGER(item);
}
const bool final=(finalVal!=NA_INTEGER);
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(nrow, true))
for (int i=0; i<nrow; ++i) {
int val = xP[i];
if (val!=NA_INTEGER) continue;
Expand All @@ -88,7 +88,7 @@ SEXP coalesce(SEXP x, SEXP inplaceArg) {
valP[k++] = REAL(item);
}
const bool final = (finalVal!=NA_INTEGER64);
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(nrow, true))
for (int i=0; i<nrow; ++i) {
int64_t val=xP[i];
if (val!=NA_INTEGER64) continue;
Expand All @@ -109,7 +109,7 @@ SEXP coalesce(SEXP x, SEXP inplaceArg) {
valP[k++] = REAL(item);
}
const bool final = !ISNAN(finalVal);
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(nrow, true))
for (int i=0; i<nrow; ++i) {
double val=xP[i];
if (!ISNAN(val)) continue;
Expand All @@ -132,7 +132,7 @@ SEXP coalesce(SEXP x, SEXP inplaceArg) {
valP[k++] = COMPLEX(item);
}
const bool final = !ISNAN(finalVal.r) && !ISNAN(finalVal.i);
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(nrow, true))
for (int i=0; i<nrow; ++i) {
Rcomplex val=xP[i];
if (!ISNAN(val.r) && !ISNAN(val.i)) continue;
Expand Down
2 changes: 1 addition & 1 deletion src/data.table.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ double wallclock();

// openmp-utils.c
void initDTthreads();
int getDTthreads();
int getDTthreads(const int64_t n, const bool throttle);
void avoid_openmp_hang_within_fork();

// froll.c
Expand Down
8 changes: 4 additions & 4 deletions src/fifelse.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ SEXP fifelseR(SEXP l, SEXP a, SEXP b, SEXP na) {
const int *restrict pa = LOGICAL(a);
const int *restrict pb = LOGICAL(b);
const int pna = nonna ? LOGICAL(na)[0] : NA_LOGICAL;
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(len0, true))
for (int64_t i=0; i<len0; ++i) {
pans[i] = pl[i]==0 ? pb[i & bmask] : (pl[i]==1 ? pa[i & amask] : pna);
}
Expand All @@ -87,7 +87,7 @@ SEXP fifelseR(SEXP l, SEXP a, SEXP b, SEXP na) {
const int *restrict pa = INTEGER(a);
const int *restrict pb = INTEGER(b);
const int pna = nonna ? INTEGER(na)[0] : NA_INTEGER;
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(len0, true))
for (int64_t i=0; i<len0; ++i) {
pans[i] = pl[i]==0 ? pb[i & bmask] : (pl[i]==1 ? pa[i & amask] : pna);
}
Expand All @@ -98,7 +98,7 @@ SEXP fifelseR(SEXP l, SEXP a, SEXP b, SEXP na) {
const double *restrict pb = REAL(b);
const double na_double = Rinherits(a, char_integer64) ? NA_INT64_D : NA_REAL; // Rinherits() is true for nanotime
const double pna = nonna ? REAL(na)[0] : na_double;
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(len0, true))
for (int64_t i=0; i<len0; ++i) {
pans[i] = pl[i]==0 ? pb[i & bmask] : (pl[i]==1 ? pa[i & amask] : pna);
}
Expand All @@ -116,7 +116,7 @@ SEXP fifelseR(SEXP l, SEXP a, SEXP b, SEXP na) {
const Rcomplex *restrict pa = COMPLEX(a);
const Rcomplex *restrict pb = COMPLEX(b);
const Rcomplex pna = nonna ? COMPLEX(na)[0] : NA_CPLX;
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(len0, true))
for (int64_t i=0; i<len0; ++i) {
pans[i] = pl[i]==0 ? pb[i & bmask] : (pl[i]==1 ? pa[i & amask] : pna);
}
Expand Down
Loading

0 comments on commit 89830e9

Please sign in to comment.