Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

throttle threads for iterated small data tasks #4484

Merged
merged 10 commits into from
Jun 18, 2020
6 changes: 3 additions & 3 deletions R/openmp-utils.R
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
setDTthreads = function(threads=NULL, restore_after_fork=NULL, percent=NULL) {
setDTthreads = function(threads=NULL, restore_after_fork=NULL, percent=NULL, th_iters=NULL) {
jangorecki marked this conversation as resolved.
Show resolved Hide resolved
if (!missing(percent)) {
if (!missing(threads)) stop("Provide either threads= or percent= but not both")
if (length(percent)!=1) stop("percent= is provided but is length ", length(percent))
percent=as.integer(percent)
if (is.na(percent) || percent<2L || percent>100L) stop("percent==",percent," but should be a number between 2 and 100")
invisible(.Call(CsetDTthreads, percent, restore_after_fork, TRUE))
invisible(.Call(CsetDTthreads, percent, restore_after_fork, TRUE, th_iters))
} else {
invisible(.Call(CsetDTthreads, threads, restore_after_fork, FALSE))
invisible(.Call(CsetDTthreads, threads, restore_after_fork, FALSE, th_iters))
}
}

Expand Down
3 changes: 2 additions & 1 deletion man/openmp-utils.Rd
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
Set and get number of threads to be used in \code{data.table} functions that are parallelized with OpenMP. The number of threads is initialized when \code{data.table} is first loaded in the R session using optional environment variables. Thereafter, the number of threads may be changed by calling \code{setDTthreads}. If you change an environment variable using \code{Sys.setenv} you will need to call \code{setDTthreads} again to reread the environment variables.
}
\usage{
setDTthreads(threads = NULL, restore_after_fork = NULL, percent = NULL)
setDTthreads(threads = NULL, restore_after_fork = NULL, percent = NULL, th_iters = NULL)
getDTthreads(verbose = getOption("datatable.verbose"))
}
\arguments{
\item{threads}{ NULL (default) rereads environment variables. 0 means to use all logical CPUs available. Otherwise a number >= 1 }
\item{restore_after_fork}{ Should data.table be multi-threaded after a fork has completed? NULL leaves the current setting unchanged which by default is TRUE. See details below. }
\item{percent}{ If provided it should be a number between 2 and 100; the percentage of logical CPUs to use. By default on startup, 50\%. }
\item{th_iters}{ The number of iterations (e.g. rows) per thread required before \code{data.table} uses additional threads; tasks with few iterations are throttled to fewer threads to avoid parallelization overhead on small data. }
\item{verbose}{ Display the value of relevant OpenMP settings plus the \code{restore_after_fork} internal option. }
}
\value{
Expand Down
16 changes: 8 additions & 8 deletions src/between.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ SEXP between(SEXP x, SEXP lower, SEXP upper, SEXP incbounds, SEXP NAboundsArg, S
error(_("Item %d of lower (%d) is greater than item %d of upper (%d)"), (i&lowMask)+1, l, (i&uppMask)+1, u);
}
if (NAbounds) { // default NAbounds==TRUE => NA bound means TRUE; i.e. asif lower=-Inf or upper==Inf)
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(OMP_ROWS, longest))
for (int i=0; i<longest; ++i) {
const int elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
ansp[i] = elem==NA_INTEGER ? NA_LOGICAL : (l==NA_INTEGER || l+open<=elem) && (u==NA_INTEGER || elem<=u-open);
// +open so we can always use >= and <=. NA_INTEGER+1 == -INT_MAX == INT_MIN+1 (so NA limit handled by this too)
}
} else {
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(OMP_ROWS, longest))
for (int i=0; i<longest; ++i) {
const int elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
if (elem==NA_INTEGER) { ansp[i]=NA_LOGICAL; continue; }
Expand All @@ -95,13 +95,13 @@ SEXP between(SEXP x, SEXP lower, SEXP upper, SEXP incbounds, SEXP NAboundsArg, S
error(_("Item %d of lower (%"PRId64") is greater than item %d of upper (%"PRId64")"), (i&lowMask)+1, l, (i&uppMask)+1, u);
}
if (NAbounds) {
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(OMP_ROWS, longest))
for (int i=0; i<longest; ++i) {
const int64_t elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
ansp[i] = elem==NA_INTEGER64 ? NA_LOGICAL : (l==NA_INTEGER64 || l+open<=elem) && (u==NA_INTEGER64 || elem<=u-open);
}
} else {
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(OMP_ROWS, longest))
for (int i=0; i<longest; ++i) {
const int64_t elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
if (elem==NA_INTEGER64) { ansp[i]=NA_LOGICAL; continue; }
Expand All @@ -123,13 +123,13 @@ SEXP between(SEXP x, SEXP lower, SEXP upper, SEXP incbounds, SEXP NAboundsArg, S
}
if (open) {
if (NAbounds) {
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(OMP_ROWS, longest))
for (int i=0; i<longest; ++i) {
const double elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
ansp[i] = isnan(elem) ? NA_LOGICAL : (isnan(l) || l<elem) && (isnan(u) || elem<u);
}
} else {
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(OMP_ROWS, longest))
for (int i=0; i<longest; ++i) {
const double elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
if (isnan(elem)) { ansp[i]=NA_LOGICAL; continue; }
Expand All @@ -140,13 +140,13 @@ SEXP between(SEXP x, SEXP lower, SEXP upper, SEXP incbounds, SEXP NAboundsArg, S
if (verbose) Rprintf(_("between parallel processing of double with open bounds took %8.3fs\n"), omp_get_wtime()-tic);
} else {
if (NAbounds) {
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(OMP_ROWS, longest))
for (int i=0; i<longest; ++i) {
const double elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
ansp[i] = isnan(elem) ? NA_LOGICAL : (isnan(l) || l<=elem) && (isnan(u) || elem<=u);
}
} else {
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(OMP_ROWS, longest))
for (int i=0; i<longest; ++i) {
const double elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
if (isnan(elem)) { ansp[i]=NA_LOGICAL; continue; }
Expand Down
12 changes: 6 additions & 6 deletions src/cj.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,43 +20,43 @@ SEXP cj(SEXP base_list) {
case INTSXP: {
const int *restrict sourceP = INTEGER(source);
int *restrict targetP = INTEGER(target);
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(OMP_ROWS, thislen))
// default static schedule so two threads won't write to same cache line in last column
// if they did write to same cache line (and will when last column's thislen is small) there's no correctness issue
for (int i=0; i<thislen; ++i) {
const int item = sourceP[i];
const int end = (i+1)*eachrep;
for (int j=i*eachrep; j<end; ++j) targetP[j] = item; // no div, mod or read ops inside loop; just rep a const contiguous write
}
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(OMP_ROWS, ncopy))
for (int i=1; i<ncopy; ++i) {
memcpy(targetP + i*blocklen, targetP, blocklen*sizeof(int));
}
} break;
case REALSXP: {
const double *restrict sourceP = REAL(source);
double *restrict targetP = REAL(target);
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(OMP_ROWS, thislen))
for (int i=0; i<thislen; ++i) {
const double item = sourceP[i];
const int end=(i+1)*eachrep;
for (int j=i*eachrep; j<end; ++j) targetP[j] = item;
}
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(OMP_ROWS, ncopy))
for (int i=1; i<ncopy; ++i) {
memcpy(targetP + i*blocklen, targetP, blocklen*sizeof(double));
}
} break;
case CPLXSXP: {
const Rcomplex *restrict sourceP = COMPLEX(source);
Rcomplex *restrict targetP = COMPLEX(target);
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(OMP_ROWS, thislen))
for (int i=0; i<thislen; ++i) {
const Rcomplex item = sourceP[i];
const int end=(i+1)*eachrep;
for (int j=i*eachrep; j<end; ++j) targetP[j] = item;
}
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(OMP_ROWS, ncopy))
for (int i=1; i<ncopy; ++i) {
memcpy(targetP + i*blocklen, targetP, blocklen*sizeof(Rcomplex));
}
Expand Down
8 changes: 4 additions & 4 deletions src/coalesce.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ SEXP coalesce(SEXP x, SEXP inplaceArg) {
valP[k++] = INTEGER(item);
}
const bool final=(finalVal!=NA_INTEGER);
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(OMP_ROWS, nrow))
for (int i=0; i<nrow; ++i) {
int val = xP[i];
if (val!=NA_INTEGER) continue;
Expand All @@ -88,7 +88,7 @@ SEXP coalesce(SEXP x, SEXP inplaceArg) {
valP[k++] = REAL(item);
}
const bool final = (finalVal!=NA_INTEGER64);
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(OMP_ROWS, nrow))
for (int i=0; i<nrow; ++i) {
int64_t val=xP[i];
if (val!=NA_INTEGER64) continue;
Expand All @@ -109,7 +109,7 @@ SEXP coalesce(SEXP x, SEXP inplaceArg) {
valP[k++] = REAL(item);
}
const bool final = !ISNAN(finalVal);
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(OMP_ROWS, nrow))
for (int i=0; i<nrow; ++i) {
double val=xP[i];
if (!ISNAN(val)) continue;
Expand All @@ -132,7 +132,7 @@ SEXP coalesce(SEXP x, SEXP inplaceArg) {
valP[k++] = COMPLEX(item);
}
const bool final = !ISNAN(finalVal.r) && !ISNAN(finalVal.i);
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(OMP_ROWS, nrow))
for (int i=0; i<nrow; ++i) {
Rcomplex val=xP[i];
if (!ISNAN(val.r) && !ISNAN(val.i)) continue;
Expand Down
7 changes: 6 additions & 1 deletion src/data.table.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@
#define NEED2UTF8(s) !(IS_ASCII(s) || (s)==NA_STRING || IS_UTF8(s))
#define ENC2UTF8(s) (!NEED2UTF8(s) ? (s) : mkCharCE(translateCharUTF8(s), CE_UTF8))

// Types used to dynamically limit DT threads when passing to num_threads
#define OMP_BATCH 2 // limit to number of iterations (batches)
jangorecki marked this conversation as resolved.
Show resolved Hide resolved
#define OMP_ROWS 1 // limit to number of iterations (rows) divided by R_DATATABLE_ITER_PER_THREAD
#define OMP_ALL 0 // do not limit

// init.c
extern SEXP char_integer64;
extern SEXP char_ITime;
Expand Down Expand Up @@ -177,7 +182,7 @@ double wallclock();

// openmp-utils.c
void initDTthreads();
int getDTthreads();
int getDTthreads(int type, int iters);
void avoid_openmp_hang_within_fork();

// froll.c
Expand Down
8 changes: 4 additions & 4 deletions src/fifelse.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ SEXP fifelseR(SEXP l, SEXP a, SEXP b, SEXP na) {
const int *restrict pa = LOGICAL(a);
const int *restrict pb = LOGICAL(b);
const int pna = nonna ? LOGICAL(na)[0] : NA_LOGICAL;
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(OMP_ROWS, len0))
for (int64_t i=0; i<len0; ++i) {
pans[i] = pl[i]==0 ? pb[i & bmask] : (pl[i]==1 ? pa[i & amask] : pna);
}
Expand All @@ -87,7 +87,7 @@ SEXP fifelseR(SEXP l, SEXP a, SEXP b, SEXP na) {
const int *restrict pa = INTEGER(a);
const int *restrict pb = INTEGER(b);
const int pna = nonna ? INTEGER(na)[0] : NA_INTEGER;
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(OMP_ROWS, len0))
for (int64_t i=0; i<len0; ++i) {
pans[i] = pl[i]==0 ? pb[i & bmask] : (pl[i]==1 ? pa[i & amask] : pna);
}
Expand All @@ -98,7 +98,7 @@ SEXP fifelseR(SEXP l, SEXP a, SEXP b, SEXP na) {
const double *restrict pb = REAL(b);
const double na_double = Rinherits(a, char_integer64) ? NA_INT64_D : NA_REAL; // Rinherits() is true for nanotime
const double pna = nonna ? REAL(na)[0] : na_double;
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(OMP_ROWS, len0))
for (int64_t i=0; i<len0; ++i) {
pans[i] = pl[i]==0 ? pb[i & bmask] : (pl[i]==1 ? pa[i & amask] : pna);
}
Expand All @@ -116,7 +116,7 @@ SEXP fifelseR(SEXP l, SEXP a, SEXP b, SEXP na) {
const Rcomplex *restrict pa = COMPLEX(a);
const Rcomplex *restrict pb = COMPLEX(b);
const Rcomplex pna = nonna ? COMPLEX(na)[0] : NA_CPLX;
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for num_threads(getDTthreads(OMP_ROWS, len0))
for (int64_t i=0; i<len0; ++i) {
pans[i] = pl[i]==0 ? pb[i & bmask] : (pl[i]==1 ? pa[i & amask] : pna);
}
Expand Down
Loading