Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

throttle threads for iterated small data tasks #4484

Merged
merged 10 commits into from
Jun 18, 2020
16 changes: 8 additions & 8 deletions src/between.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ SEXP between(SEXP x, SEXP lower, SEXP upper, SEXP incbounds, SEXP NAboundsArg, S
error(_("Item %d of lower (%d) is greater than item %d of upper (%d)"), (i&lowMask)+1, l, (i&uppMask)+1, u);
}
if (NAbounds) { // default NAbounds==TRUE => NA bound means TRUE; i.e. as if lower==-Inf or upper==Inf
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for if (longest>=getDTthreads()) num_threads(getDTthreads())
for (int i=0; i<longest; ++i) {
const int elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
ansp[i] = elem==NA_INTEGER ? NA_LOGICAL : (l==NA_INTEGER || l+open<=elem) && (u==NA_INTEGER || elem<=u-open);
// +open so we can always use >= and <=. NA_INTEGER+1 == -INT_MAX == INT_MIN+1 (so NA limit handled by this too)
}
} else {
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for if (longest>=getDTthreads()) num_threads(getDTthreads())
for (int i=0; i<longest; ++i) {
const int elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
if (elem==NA_INTEGER) { ansp[i]=NA_LOGICAL; continue; }
Expand All @@ -95,13 +95,13 @@ SEXP between(SEXP x, SEXP lower, SEXP upper, SEXP incbounds, SEXP NAboundsArg, S
error(_("Item %d of lower (%"PRId64") is greater than item %d of upper (%"PRId64")"), (i&lowMask)+1, l, (i&uppMask)+1, u);
}
if (NAbounds) {
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for if (longest>=getDTthreads()) num_threads(getDTthreads())
for (int i=0; i<longest; ++i) {
const int64_t elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
ansp[i] = elem==NA_INTEGER64 ? NA_LOGICAL : (l==NA_INTEGER64 || l+open<=elem) && (u==NA_INTEGER64 || elem<=u-open);
}
} else {
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for if (longest>=getDTthreads()) num_threads(getDTthreads())
for (int i=0; i<longest; ++i) {
const int64_t elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
if (elem==NA_INTEGER64) { ansp[i]=NA_LOGICAL; continue; }
Expand All @@ -123,13 +123,13 @@ SEXP between(SEXP x, SEXP lower, SEXP upper, SEXP incbounds, SEXP NAboundsArg, S
}
if (open) {
if (NAbounds) {
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for if (longest>=getDTthreads()) num_threads(getDTthreads())
for (int i=0; i<longest; ++i) {
const double elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
ansp[i] = isnan(elem) ? NA_LOGICAL : (isnan(l) || l<elem) && (isnan(u) || elem<u);
}
} else {
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for if (longest>=getDTthreads()) num_threads(getDTthreads())
for (int i=0; i<longest; ++i) {
const double elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
if (isnan(elem)) { ansp[i]=NA_LOGICAL; continue; }
Expand All @@ -140,13 +140,13 @@ SEXP between(SEXP x, SEXP lower, SEXP upper, SEXP incbounds, SEXP NAboundsArg, S
if (verbose) Rprintf(_("between parallel processing of double with open bounds took %8.3fs\n"), omp_get_wtime()-tic);
} else {
if (NAbounds) {
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for if (longest>=getDTthreads()) num_threads(getDTthreads())
for (int i=0; i<longest; ++i) {
const double elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
ansp[i] = isnan(elem) ? NA_LOGICAL : (isnan(l) || l<=elem) && (isnan(u) || elem<=u);
}
} else {
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for if (longest>=getDTthreads()) num_threads(getDTthreads())
for (int i=0; i<longest; ++i) {
const double elem=xp[i & xMask], l=lp[i & lowMask], u=up[i & uppMask];
if (isnan(elem)) { ansp[i]=NA_LOGICAL; continue; }
Expand Down
6 changes: 3 additions & 3 deletions src/subset.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ void subsetVectorRaw(SEXP ans, SEXP source, SEXP idx, const bool anyNA)

#define PARLOOP(_NAVAL_) \
if (anyNA) { \
_Pragma("omp parallel for num_threads(getDTthreads())") \
_Pragma("omp parallel for if (n>=getDTthreads()) num_threads(getDTthreads())") \
jangorecki marked this conversation as resolved.
Show resolved Hide resolved
for (int i=0; i<n; i++) { \
int elem = idxp[i]; \
ap[i] = elem==NA_INTEGER ? _NAVAL_ : sp[elem-1]; \
} \
} else { \
_Pragma("omp parallel for num_threads(getDTthreads())") \
_Pragma("omp parallel for if (n>=getDTthreads()) num_threads(getDTthreads())") \
for (int i=0; i<n; i++) { \
ap[i] = sp[idxp[i]-1]; \
} \
Expand Down Expand Up @@ -121,7 +121,7 @@ SEXP convertNegAndZeroIdx(SEXP idx, SEXP maxArg, SEXP allowOverMax)
int *idxp = INTEGER(idx);

bool stop = false;
#pragma omp parallel for num_threads(getDTthreads())
#pragma omp parallel for if (n>=getDTthreads()) num_threads(getDTthreads())
for (int i=0; i<n; i++) {
if (stop) continue;
int elem = idxp[i];
Expand Down