Skip to content

Commit

Permalink
Merge branch 'release/1.32.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
HenrikBengtsson committed Mar 7, 2023
2 parents 8883656 + a08b6f9 commit c81d2a8
Show file tree
Hide file tree
Showing 46 changed files with 3,084 additions and 6,160 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/R-CMD-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ jobs:
_R_CHECK_MATRIX_DATA_: true
_R_CHECK_SUGGESTS_ONLY_: true
_R_CHECK_THINGS_IN_TEMP_DIR_: true
RCMDCHECK_ERROR_ON: note
## Specific to futures
R_FUTURE_RNG_ONMISUSE: error
R_FUTURE_GLOBALS_KEEPWHERE: ${{ matrix.config.globals_keepWhere }}
Expand Down Expand Up @@ -100,9 +101,10 @@ jobs:
R_FUTURE_FORK_MULTITHREADING_ENABLE: ${{ matrix.config.fork_multithreading_enable }}
R_FUTURE_PSOCK_RELAY_IMMEDIATE: ${{ matrix.config.psock_relay_immediate }}
run: |
if (nzchar(Sys.getenv("R_FUTURE_PLAN")) || getRversion() < "3.5.0") Sys.setenv(RCMDCHECK_ERROR_ON = "error")
rcmdcheck::rcmdcheck(
args = c("--no-manual", "--as-cran"),
error_on = if (nzchar(Sys.getenv("R_FUTURE_PLAN"))) "error" else "note",
build_args = if (getRversion() < "3.5.0") "--no-build-vignettes",
args = c("--no-manual", "--as-cran", if (getRversion() < "3.5.0") c("--no-vignettes", "--no-build-vignettes", "--ignore-vignettes")),
check_dir = "check"
)
shell: Rscript {0}
Expand All @@ -112,7 +114,6 @@ jobs:
run: |
rcmdcheck::rcmdcheck(
args = c("--no-manual", "--as-cran", if (.Platform$OS.type == "windows" && getRversion() >= "4.2.0") "--no-multiarch"),
error_on = "note",
check_dir = "check"
)
shell: Rscript {0}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/future_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ jobs:
- { plan: 'multisession' }
- { plan: 'sequential' }
- { plan: 'future.batchtools::batchtools_local' }
- { plan: 'future.batchtools::batchtools_bash' }
- { plan: 'future.callr::callr' }

env:
GITHUB_PAT: ${{ secrets.GITHUB_TOKEN }}
RSPM: https://packagemanager.rstudio.com/cran/__linux__/focal/latest
R_REMOTES_NO_ERRORS_FROM_WARNINGS: true
## R CMD check
_R_CHECK_LENGTH_1_CONDITION_: true
_R_CHECK_LENGTH_1_LOGIC2_: true
_R_CHECK_MATRIX_DATA_: true
_R_CHECK_CRAN_INCOMING_: false
Expand Down
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Package: future
Version: 1.31.0
Version: 1.32.0
Title: Unified Parallel and Distributed Processing in R for Everyone
Imports:
digest,
Expand Down
12 changes: 12 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Generated by roxygen2: do not edit by hand

S3method("$<-",Future)
S3method("[",FutureGlobals)
S3method("[",sessionDetails)
S3method(as.FutureGlobals,FutureGlobals)
Expand All @@ -16,6 +17,10 @@ S3method(getExpression,Future)
S3method(getExpression,MulticoreFuture)
S3method(getExpression,MultisessionFuture)
S3method(getExpression,UniprocessFuture)
S3method(journal,Future)
S3method(journal,FutureJournal)
S3method(journal,FutureJournalCondition)
S3method(journal,list)
S3method(mandelbrot,matrix)
S3method(mandelbrot,numeric)
S3method(nbrOfFreeWorkers,"NULL")
Expand All @@ -33,6 +38,8 @@ S3method(nbrOfWorkers,uniprocess)
S3method(plot,Mandelbrot)
S3method(print,Future)
S3method(print,FutureCondition)
S3method(print,FutureJournal)
S3method(print,FutureJournalSummary)
S3method(print,FutureResult)
S3method(print,FutureStrategy)
S3method(print,FutureStrategyList)
Expand Down Expand Up @@ -64,6 +71,7 @@ S3method(run,ConstantFuture)
S3method(run,Future)
S3method(run,MulticoreFuture)
S3method(run,UniprocessFuture)
S3method(summary,FutureJournal)
S3method(tweak,"function")
S3method(tweak,character)
S3method(tweak,future)
Expand All @@ -89,9 +97,13 @@ export(Future)
export(FutureCondition)
export(FutureError)
export(FutureGlobals)
export(FutureJournalCondition)
export(FutureMessage)
export(FutureResult)
export(FutureWarning)
export(GlobalEnvFutureCondition)
export(GlobalEnvFutureError)
export(GlobalEnvFutureWarning)
export(MulticoreFuture)
export(MultiprocessFuture)
export(MultisessionFuture)
Expand Down
33 changes: 32 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,37 @@
# Version 1.32.0 [2023-03-06]

## New Features

* Add prototype of an internal event-logging framework for the
purpose of profiling futures and their backends.

* Add option `future.globalenv.onMisuse` for optionally assert that a
future expression does not result in variables being added to the
global environment.

* Add option `future.onFutureCondition.keepFuture` for controlling
whether `FutureCondition` objects should keep a copy of the
`Future` object or not. The default is to keep a copy, but if the
future carries large global objects, then the `FutureCondition`
will also be large, which can result in memory issues and slow
downs.

## Miscellaneous

* Fix a **future.tests** check that occurred only on MS Windows.

## Deprecated and Defunct

* The 'multiprocess' strategy, which has been deprecated since future
1.20.0 [2020-10-30] is now defunct. Please use 'multisession'
(recommended) or 'multicore' instead.

* Add optional assertion of the internal Future `state` field.


# Version 1.31.0 [2023-01-31]

## Signficant Changes
## Significant Changes

* Remove function `remote()`. Note that `plan(remote, ...)` has been
deprecated since **future** 1.24.0 [2022-02-19] and defunct since
Expand Down
57 changes: 55 additions & 2 deletions R/ClusterFuture-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ ClusterFuture <- function(expr = NULL, substitute = TRUE, envir = parent.frame()
future <- do.call(MultiprocessFuture, args = c(list(expr = quote(expr), substitute = FALSE, envir = envir, persistent = persistent, node = NA_integer_), args[future_args]), quote = FALSE)

future <- do.call(as_ClusterFuture, args = c(list(future, workers = workers), args[!future_args]), quote = TRUE)

future
}

Expand Down Expand Up @@ -109,30 +109,51 @@ run.ClusterFuture <- function(future, ...) {
## FutureRegistry to use
reg <- sprintf("workers-%s", attr(workers, "name", exact = TRUE))


## Next available cluster node
t_start <- Sys.time()
node_idx <- requestNode(await = function() {
FutureRegistry(reg, action = "collect-first", earlySignal = TRUE)
}, workers = workers)
future$node <- node_idx

## Cluster node to use
cl <- workers[node_idx]

if (inherits(future$.journal, "FutureJournal")) {
appendToFutureJournal(future,
event = "getWorker",
category = "overhead",
parent = "launch",
start = t_start,
stop = Sys.time()
)
}


## (i) Reset global environment of cluster node such that
## previous futures are not affecting this one, which
## may happen even if the future is evaluated inside a
## local, e.g. local({ a <<- 1 }).
if (!persistent) {
t_start <- Sys.time()
cluster_call(cl, fun = grmall, future = future, when = "call grmall() on")
if (inherits(future$.journal, "FutureJournal")) {
appendToFutureJournal(future,
event = "eraseWorker",
category = "overhead",
parent = "launch",
start = t_start,
stop = Sys.time()
)
}
}


## (ii) Attach packages that needs to be attached
## NOTE: Already take care of by getExpression() of the Future class.
## However, if we need to get an early error about missing packages,
## we can get the error here before launching the future.
t_start <- Sys.time()
packages <- packages(future)
if (future$earlySignal && length(packages) > 0) {
if (debug) mdebugf("Attaching %d packages (%s) on cluster node #%d ...",
Expand All @@ -144,10 +165,20 @@ run.ClusterFuture <- function(future, ...) {
length(packages), hpaste(sQuote(packages)), node_idx)
}

if (inherits(future$.journal, "FutureJournal")) {
appendToFutureJournal(future,
event = "attachPackages",
category = "overhead",
parent = "launch",
start = t_start,
stop = Sys.time()
)
}

## (iii) Export globals
globals <- globals(future)
if (length(globals) > 0) {
t_start <- Sys.time()
if (debug) {
total_size <- asIEC(objectSize(globals))
mdebugf("Exporting %d global objects (%s) to cluster node #%d ...", length(globals), total_size, node_idx)
Expand All @@ -170,6 +201,16 @@ run.ClusterFuture <- function(future, ...) {
value <- NULL
}
if (debug) mdebugf("Exporting %d global objects (%s) to cluster node #%d ... DONE", length(globals), total_size, node_idx)

if (inherits(future$.journal, "FutureJournal")) {
appendToFutureJournal(future,
event = "exportGlobals",
category = "overhead",
parent = "launch",
start = t_start,
stop = Sys.time()
)
}
}
## Not needed anymore
globals <- NULL
Expand Down Expand Up @@ -354,6 +395,8 @@ receiveMessageFromWorker <- function(future, ...) {
}
}

t_start <- Sys.time()

## If not, wait for process to finish, and
## then collect and record the value
msg <- NULL
Expand Down Expand Up @@ -400,6 +443,16 @@ receiveMessageFromWorker <- function(future, ...) {
if (inherits(msg, "FutureResult")) {
result <- msg

if (inherits(future$.journal, "FutureJournal")) {
appendToFutureJournal(future,
event = "receiveResult",
category = "overhead",
parent = "gather",
start = t_start,
stop = Sys.time()
)
}

## Add back already signaled and muffled conditions so that also
## they will be resignaled each time value() is called.
signaled <- future$.signaledConditions
Expand Down
95 changes: 71 additions & 24 deletions R/Future-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@
#' @name Future-class
Future <- function(expr = NULL, envir = parent.frame(), substitute = TRUE, stdout = TRUE, conditions = "condition", globals = list(), packages = NULL, seed = FALSE, lazy = FALSE, gc = FALSE, earlySignal = FALSE, label = NULL, ...) {
if (substitute) expr <- substitute(expr)

t_start <- Sys.time()

if (is.null(seed)) {
} else if (isFALSE(seed)) {
} else if (is_lecyer_cmrg_seed(seed)) {
Expand Down Expand Up @@ -163,28 +164,9 @@ Future <- function(expr = NULL, envir = parent.frame(), substitute = TRUE, stdou
.Defunct(msg = "Future field 'value' is defunct and must not be set", package = .packageName)
}

## 'local' is now defunct
## 'local' is defunct
if ("local" %in% args_names) {
dfcn <- .Defunct
msg <- "Argument 'local' is defunct as of future 1.31.0 (2023-01-31)"

## SPECIAL CASE: Temporarily allow the 'civis' package to keep using
## 'local' for a tad longer, although it has zero effect since a
## long time (https://github.com/civisanalytics/civis-r/issues/244)
## Only allow for this is local = TRUE.
## /HB 2023-01-27
if (isTRUE(args$local) &&
Sys.getenv("R_FUTURE_CHECK_IGNORE_CIVIS", "true") == "true") {
for (call in sys.calls()) {
if ("CivisFuture" %in% as.character(call[[1]])) {
msg <- sprintf("%s. In this case it was because civis::CivisFuture() was used. Please contact the maintainers of the 'civis' package about this problem.", msg)
dfcn <- .Deprecated
break
}
}
}

dfcn(msg = msg, package = .packageName)
.Defunct(msg = "Argument 'local' is defunct as of future 1.31.0 (2023-01-31)", package = .packageName)
}

core <- new.env(parent = emptyenv())
Expand Down Expand Up @@ -477,12 +459,55 @@ run.Future <- function(future, ...) {
future
}

run <- function(...) UseMethod("run")
#' @export
#' @keywords internal
run <- function(future, ...) {
## Automatically update journal entries for Future object
if (inherits(future, "Future") &&
inherits(future$.journal, "FutureJournal")) {
start <- Sys.time()
on.exit({
appendToFutureJournal(future,
event = "launch",
category = "overhead",
start = start,
stop = Sys.time()
)
})
}
UseMethod("run")
}


#' @export
#' @keywords internal
result <- function(...) UseMethod("result")
result <- function(future, ...) {
## Automatically update journal entries for Future object
if (inherits(future, "Future") &&
inherits(future$.journal, "FutureJournal")) {
start <- Sys.time()
on.exit({
appendToFutureJournal(future,
event = "gather",
category = "overhead",
start = start,
stop = Sys.time()
)

## Signal FutureJournalCondition?
if (!isTRUE(future$.journal_signalled)) {
journal <- journal(future)
label <- future$label
if (is.null(label)) label <- "<none>"
msg <- sprintf("A future ('%s') of class %s was resolved", label, class(future)[1])
cond <- FutureJournalCondition(message = msg, journal = journal)
signalCondition(cond)
future$.journal_signalled <- TRUE
}
})
}
UseMethod("result")
}

#' Get the results of a resolved future
#'
Expand Down Expand Up @@ -819,6 +844,7 @@ getExpression.Future <- local({
} ## getExpression()
})


globals <- function(future, ...) UseMethod("globals")

globals.Future <- function(future, ...) {
Expand All @@ -830,3 +856,24 @@ packages <- function(future, ...) UseMethod("packages")
packages.Future <- function(future, ...) {
future[["packages"]]
}


#' @export
`$<-.Future` <- function(x, name, value) {
if (name == "state") {
if (!is.element(value, c("created", "running", "finished", "failed", "interrupted"))) {
action <- getOption("future.state.onInvalid", "warning")

if (action != "ignore") {
msg <- sprintf("Trying to assign an invalid value to the internal '%s' field of a %s object: %s", name, class(x)[1], value)
if (action == "error") {
stop(FutureError(msg, call = sys.call(), future = x))
} else {
warning(FutureWarning(msg, call = sys.call(), future = x))
}
}
}
}

NextMethod()
}
Loading

0 comments on commit c81d2a8

Please sign in to comment.