Segfaults with itoken_parallel() #264

Closed · pommedeterresautee opened this issue May 29, 2018 · 11 comments

@pommedeterresautee

I have no idea why, but for the past few days, without any change to my source code, parallel processing no longer works.
Even stranger, the very same code works without parallel processing.
The error message changes from run to run, but it always ends with a "memory not mapped".
I have already tried deleting all packages plus R and reinstalling everything, but that didn't fix it.
I also deleted my Makevars file in ~/.R before reinstalling (so no custom optimization flags).
The source code is launched from Rscript or RStudio; both crash.

Same behaviour with the latest text2vec from CRAN and the latest commit from GitHub.
On Ubuntu 18.04 and R 3.4.4.

> Sys.info()
                                      sysname                                       release 
                                      "Linux"                           "4.15.0-22-generic" 
                                      version                                      nodename 
"#24-Ubuntu SMP Wed May 16 12:15:17 UTC 2018"                      "geantvert-Aspire-M5910" 
                                      machine                                         login 
                                     "x86_64"                                   "geantvert" 
                                         user                                effective_user 
                                  "geantvert"                                   "geantvert" 

Source code

config <- config::get()
set.seed(config$seed)

suppressMessages(library(text2vec))
suppressMessages(library(data.table))
library(doParallel)
library(Matrix)

registerDoParallel(config$cpu)

# declare functions ----

print_with_time <- function(text) {
  print(paste(Sys.time(), "-", text))
}

read_dt_file <- function(path, ...) {
  dt <- fread(file = path, sep = "|", showProgress = FALSE, colClasses = "character", header = TRUE, nThread = 1, select = 1:2)
  # dt <- readr::read_delim(file = path, delim = "|", progress = FALSE, col_types = "cc")
  # dt <- fread(file = path, sep = "|", showProgress = FALSE, select = 1:2)
  dt <- dt[nchar(text) > config$size_char_min]
  text <- dt$text
  names(text) = dt$file
  text
}

# list files to vectorize ----

files <- list.files(path = config$all_prefixed,
                    pattern = ".txt",
                    full.names = TRUE)

# consistency <- pbapply::pblapply(X = files, FUN = function(path) {
#   # dt <- readr::read_delim(file = path, delim = "|", progress = FALSE, col_types = "cc")
#   dt <- fread(file = path, sep = "|", showProgress = FALSE, colClasses = "character", header = TRUE, nThread = 1, select = 1:2)
#   (ncol(dt) == 2) & (nrow(dt) > 0)
# })
#
# assertthat::assert_that(all(unlist(consistency)))

file_path_groups <- split(x = files,
                          f = sort(seq(files) %% 5))

# Generate vocabulary

get_voc <- function(file_path_groups, ngram, doc_min, doc_prop_max) {

  voc <- list()

  for (index in seq(file_path_groups)) {
    print_with_time(paste("voc - round", index))
    parallel_files_iterator = ifiles_parallel(file_paths = file_path_groups[[index]],
                                              reader = read_dt_file,
                                              n_chunks = config$cpu)

    it <- itoken_parallel(iterable = parallel_files_iterator,
                          progressbar = FALSE)

    voc[[index]] <- local({
      v <- create_vocabulary(it = it, ngram = seq.int(ngram))
      prune_vocabulary(vocabulary = v, doc_count_min = 10)
    })
  }

  voc <- do.call(text2vec::combine_vocabularies, args = voc)

  voc <- prune_vocabulary(vocabulary = voc,
                          doc_proportion_max = doc_prop_max,
                          doc_count_min = doc_min)

  voc
}

full_voc <- get_voc(file_path_groups = file_path_groups,
                    ngram = config$ngram,
                    doc_min = config$doc_min,
                    doc_prop_max = config$doc_prop_max)

Error:

*** caught segfault ***
address (nil), cause 'unknown'

 *** caught segfault ***
address (nil), cause 'unknown'

Traceback:
 1: .Call(C_stri_split_fixed, str, pattern, n, omit_empty, tokens_only,     simplify, opts_fixed)
 2: stringi::stri_split_fixed(strings, pattern = sep, ...)
 3: self$tokenizer[[1]](tokens)
 4: super$nextElem()
 5: obj$nextElem()
 6: nextElem.abstractiter(X[[i]], ...)
 7: FUN(X[[i]], ...)
 8: lapply(obj$iargs, nextElem)
 9: doTryCatch(return(expr), name, parentenv, handler)
10: tryCatchOne(expr, names, parentenv, handlers[[1L]])
11: tryCatchList(expr, classes, parentenv, handlers)
12: tryCatch({    ix <- which(!nzchar(obj$argnames))    elem <- if (length(ix) > 0) {        lapply(obj$iargs[ix], nextElem)        ix <- which(nzchar(obj$argnames))        if (length(ix) > 0)             lapply(obj$iargs[ix], nextElem)        else list()    }    else {        lapply(obj$iargs, nextElem)    }}, error = function(e) {    if (identical(conditionMessage(e), "StopIteration")) {        obj$state$stopped <- TRUE        if (complete(obj))             callCombine(obj, TRUE)    }    stop(e)})
13: nextElem.iforeach(it)
14: nextElem(it)
15: doTryCatch(return(expr), name, parentenv, handler)
16: tryCatchOne(expr, names, parentenv, handlers[[1L]])
17: tryCatchList(expr, classes, parentenv, handlers)
18: tryCatch({    repeat {        args <- nextElem(it)        if (obj$verbose) {            cat(sprintf("evaluation # %d:\n", i))            print(args)        }        for (a in names(args)) assign(a, args[[a]], pos = envir,             inherits = FALSE)        r <- tryCatch(eval(xpr, envir = envir), error = function(e) e)        if (obj$verbose) {            cat("result of evaluating expression:\n")            print(r)        }        tryCatch(accumulator(list(r), i), error = function(e) {            cat("error calling combine function:\n")            print(e)            NULL        })        i <- i + 1    }}, error = function(e) {    if (!identical(conditionMessage(e), "StopIteration"))         stop(simpleError(conditionMessage(e), expr))})
19: e$fun(obj, substitute(ex), parent.frame(), e$data)
20: foreach(tokens = it) %do% {    vocabulary_insert_document_batch_generic(vocab_ptr, tokens$tokens)}
21: create_vocabulary.itoken(it, ngram, stopwords)
22: create_vocabulary(it, ngram, stopwords)
23: eval(c.expr, envir = args, enclos = envir)
24: eval(c.expr, envir = args, enclos = envir)
25: doTryCatch(return(expr), name, parentenv, handler)
26: tryCatchOne(expr, names, parentenv, handlers[[1L]])
27: tryCatchList(expr, classes, parentenv, handlers)
28: tryCatch(eval(c.expr, envir = args, enclos = envir), error = function(e) e)
29: FUN(X[[i]], ...)
30: lapply(X = S, FUN = FUN, ...)
31: doTryCatch(return(expr), name, parentenv, handler)
32: tryCatchOne(expr, names, parentenv, handlers[[1L]])
33: tryCatchList(expr, classes, parentenv, handlers)
34: tryCatch(expr, error = function(e) {    call <- conditionCall(e)    if (!is.null(call)) {        if (identical(call[[1L]], quote(doTryCatch)))             call <- sys.call(-4L)        dcall <- deparse(call)[1L]        prefix <- paste("Error in", dcall, ": ")        LONG <- 75L        msg <- conditionMessage(e)        sm <- strsplit(msg, "\n")[[1L]]        w <- 14L + nchar(dcall, type = "w") + nchar(sm[1L], type = "w")        if (is.na(w))             w <- 14L + nchar(dcall, type = "b") + nchar(sm[1L],                 type = "b")        if (w > LONG)             prefix <- paste0(prefix, "\n  ")    }    else prefix <- "Error : "    msg <- paste0(prefix, conditionMessage(e), "\n")    .Internal(seterrmessage(msg[1L]))    if (!silent && identical(getOption("show.error.messages"),         TRUE)) {        cat(msg, file = outFile)        .Internal(printDeferredWarnings())    }    invisible(structure(msg, class = "try-error", condition = e))})
35: try(lapply(X = S, FUN = FUN, ...), silent = TRUE)
36: sendMaster(try(lapply(X = S, FUN = FUN, ...), silent = TRUE))
37: FUN(X[[i]], ...)
38: lapply(seq_len(cores), inner.do)
39: mclapply(argsList, FUN, mc.preschedule = preschedule, mc.set.seed = set.seed,     mc.silent = silent, mc.cores = cores)
40: e$fun(obj, substitute(ex), parent.frame(), e$data)
41: foreach(it = it, .combine = combine_vocabularies, .inorder = FALSE,     .multicombine = TRUE, ...) %dopar% {    create_vocabulary(it, ngram, stopwords)}
42: create_vocabulary.itoken_parallel(it = it, ngram = seq.int(ngram))
43: create_vocabulary(it = it, ngram = seq.int(ngram))
44: (function() {    v <- create_vocabulary(it = it, ngram = seq.int(ngram))    prune_vocabulary(vocabulary = v, doc_count_min = 10)})()
45: get_voc(file_path_groups = file_path_groups, ngram = config$ngram,     doc_min = config$doc_min, doc_prop_max = config$doc_prop_max)
An irrecoverable exception occurred. R is aborting now ...

 *** caught segfault ***
address 0x83, cause 'memory not mapped'

 *** caught segfault ***
address (nil), cause 'unknown'

 *** caught segfault ***
address 0x65656d7279, cause 'memory not mapped'

Traceback: (identical to the traceback above)
An irrecoverable exception occurred. R is aborting now ...

 *** caught segfault ***
address 0x73657279, cause 'memory not mapped'

 *** caught segfault ***
address 0x65746569637f, cause 'memory not mapped'

 *** caught segfault ***
address 0x10, cause 'memory not mapped'

 *** caught segfault ***
address 0x557b9d78ebbb, cause 'memory not mapped'
pommedeterresautee changed the title from "Lots of segfault since a few days with ifiles_parallel during building voc" to "Lots of segfault since a few days with ifiles_parallel during building DTM voc" on May 29, 2018
@dselivanov (Owner) commented May 29, 2018 via email

dselivanov changed the title from "Lots of segfault since a few days with ifiles_parallel during building DTM voc" to "Segfaults with itoken_parallel()" on May 29, 2018
@pommedeterresautee (Author) commented May 29, 2018

I don't think that is the reason.
I removed stringi from the tokenizer and then got:

geantvert@geantvert-Aspire-M5910:~/workspace/similar_legal_case/src$ Rscript build_dtm.R 
Loading required package: foreach
Loading required package: iterators
Loading required package: parallel

 *** caught segfault ***
address 0x10, cause 'memory not mapped'

Traceback:
 1: fread(file = path, sep = "|", showProgress = FALSE, colClasses = "character",     header = TRUE, nThread = 1, select = 1:2)
 2: FUN(X[[i]], ...)
 3: lapply(X, FUN, ...)
 4: pbapply::pblapply(X = files, FUN = function(path) {    dt <- fread(file = path, sep = "|", showProgress = FALSE,         colClasses = "character", header = TRUE, nThread = 1,         select = 1:2)    (ncol(dt) == 2) & (nrow(dt) > 0)})
An irrecoverable exception occurred. R is aborting now ...
Segmentation fault
geantvert@geantvert-Aspire-M5910:~/workspace/similar_legal_case/src$ killall R
R: no process found
geantvert@geantvert-Aspire-M5910:~/workspace/similar_legal_case/src$ killall R
R: no process found
geantvert@geantvert-Aspire-M5910:~/workspace/similar_legal_case/src$ 

Each time I try, I get a new error. The only thing that comes up again and again is 'memory not mapped'.

For what it's worth, I replaced the foreach and parallel packages installed from source with the precompiled ones from Synaptic (in case my build was not OK), but it didn't help.

@pommedeterresautee (Author) commented May 31, 2018

OK, many tests later, I think this is related to this issue:
https://stackoverflow.com/questions/43050763/weird-segfault-in-r-when-using-mclapply-in-linux

I didn't mention it at first because I didn't think it was a pattern, but the crash never happens during the first iteration and is very likely to happen during the second or third one. So I think this is related to my issue, and a simple case (not based on text2vec, just a foreach inside a loop, sketched below) seems to validate the theory.
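
A minimal sketch of that kind of test case (hypothetical, not the author's actual repro): repeated %dopar% rounds over a fork-based doParallel backend, mimicking the loop in get_voc() above.

library(doParallel)
registerDoParallel(4)  # fork-based backend on Linux when given a core count

for (round in 1:5) {
  # each round drives a fresh set of forked workers, as get_voc() does
  res <- foreach(i = 1:100, .combine = c) %dopar% {
    sum(nchar(strsplit(paste(letters, collapse = " "), " ")[[1]]))  # dummy work
  }
  print(paste("round", round, "ok"))
}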

If this is true, the function to merge large vocabularies described in #263 should be designed carefully.

A possible clean solution: https://cran.r-project.org/web/packages/foreach/vignettes/nested.pdf (I am not super comfortable with foreach, but it seems like a good candidate to replace my ugly nested loops).
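
For reference, the nested idiom from that vignette looks roughly like this (a sketch with placeholder work, not code from the thread):

library(doParallel)
cl <- makeCluster(4)
registerDoParallel(cl)

# %:% merges the two loops into a single stream of tasks for one %dopar% call,
# so work is scheduled across both levels instead of nesting parallel backends
res <- foreach(grp = 1:5, .combine = c) %:%
  foreach(x = 1:10, .combine = c) %dopar% {
    grp * x  # placeholder for the per-file work
  }

stopCluster(cl)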

@dselivanov (Owner)
Nice! I use preschedule = FALSE in create_dtm/create_tcm, but not in create_vocabulary:

text2vec/R/dtm.R, lines 129 to 131 at 8742b36:

# user already made split for jobs
# preschedule = FALSE is much more memory efficient
.options.multicore = list(preschedule = FALSE)) %dopar% {

I think I need to put it there as well.
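
A sketch of what that change might look like, mirroring the %dopar% call visible in frame 41 of the traceback above (this is not the actual committed diff):

foreach(it = it,
        .combine = combine_vocabularies,
        .inorder = FALSE,
        .multicombine = TRUE,
        # as in create_dtm: preschedule = FALSE is much more memory efficient
        .options.multicore = list(preschedule = FALSE)) %dopar% {
  create_vocabulary(it, ngram, stopwords)
}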

dselivanov added a commit that referenced this issue May 31, 2018
@dselivanov (Owner)

@pommedeterresautee I've pushed an update - please try the latest version from master.

@pommedeterresautee (Author)

Tested the latest version, but it doesn't help.
I think the only solution is to work at the foreach level.

@dselivanov (Owner)

Obviously I can't test it myself since there is no reproducible example. If you can test the solution from SO and it works - please send a PR.

@pommedeterresautee (Author) commented Jun 1, 2018

So far:

  • the solution presented on SO doesn't work in my case (the C code)
  • it is documented everywhere that mclapply and foreach are not friends; they tend to corrupt each other's inter-process communication
  • even when I use the single-threaded vocabulary build inside mclapply, at some point it crashes; definitely foreach and mclapply are not friends

Right now, what seems to work (I have run the code 3 times without a crash; I hope it will continue) is:

  • replace mclapply with a foreach / %dopar%
  • explicitly create a cluster (without that it doesn't work)
  • use the single-threaded version of the vocabulary build

My feeling right now is that there is a lot of magic in multithreaded applications in R. We may need something cleaner. I am looking at the future package, which makes the separation between backend and API explicit, and the API itself seems OK, but I haven't tried it yet (a rough sketch follows).
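
A rough sketch of how that separation looks with future / future.apply (hypothetical: plan() and future_lapply() are that package's API, and this is not code from the thread):

library(future)
library(future.apply)
plan(multisession, workers = 4)  # the backend choice is separate from the API

vocs <- future_lapply(files, function(path) {
  text <- read_dt_file(path)
  it <- text2vec::itoken(iterable = text, ids = names(text), progressbar = FALSE)
  text2vec::create_vocabulary(it)
})
full_voc <- do.call(text2vec::combine_vocabularies, vocs)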

I'll come back with new findings, if any...

@dselivanov (Owner)

I think future is a nice package, but it has even more "magic", which always comes when you add more abstraction layers.
In the meantime it may be worth just using mclapply. foreach with a socket cluster is not very efficient in the current setup.
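
A minimal sketch of that plain-mclapply route, assuming the same read_dt_file() helper and a single-threaded create_vocabulary() per file as in the code elsewhere in this thread:

library(parallel)

vocs <- mclapply(files, function(path) {
  text <- read_dt_file(path)
  it <- text2vec::itoken(iterable = text, ids = names(text), progressbar = FALSE)
  text2vec::create_vocabulary(it)
}, mc.cores = 4, mc.preschedule = FALSE)  # preschedule = FALSE, as discussed above

full_voc <- do.call(text2vec::combine_vocabularies, vocs)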

@pommedeterresautee (Author)

Thinking about it a little more, I am wondering whether it wouldn't be better to add a vignette instead of adding new options for light cleaning of the vocabulary?

FYI, the code is now:

config <- config::get()
set.seed(config$seed)

suppressMessages(library(text2vec))
suppressMessages(library(data.table))
library(doParallel)
library(foreach)
library(Matrix)
library(parallel)

# declare functions ----

print_with_time <- function(text) {
  print(paste(Sys.time(), "-", text))
}

read_dt_file <- function(path, ...) {
  dt <- data.table::fread(file = path,
                          sep = "|",
                          showProgress = FALSE,
                          colClasses = "character",
                          header = TRUE,
                          nThread = 1)
  dt <- dt[nchar(text) > config$size_char_min]
  text <- dt$text
  names(text) = dt$file
  text
}

# list files to vectorize ----

files <- list.files(path = config$all_prefixed,
                    pattern = ".txt",
                    full.names = TRUE)

# Generate vocabulary

print_with_time("Build vocab")

cl <- makeCluster(config$cpu)
registerDoParallel(cl)

full_voc <- foreach(file = files,
                    .combine = text2vec::combine_vocabularies,
                    .multicombine = TRUE,
                    .options.multicore = list(preschedule = FALSE)) %dopar% {

                      text <- read_dt_file(file)

                      it <- text2vec::itoken(iterable = text,
                                             ids = names(text),
                                             progressbar = FALSE)


                      voc <- text2vec::create_vocabulary(it = it, ngram = seq.int(config$ngram))
                      text2vec::prune_vocabulary(vocabulary = voc, doc_count_min = 2)
                    }

full_voc <- text2vec::prune_vocabulary(vocabulary = full_voc,
                                       doc_proportion_max = config$doc_prop_max,
                                       doc_count_min = config$doc_min)

stopCluster(cl)

vectorizer <- vocab_vectorizer(full_voc)

I have no opinion about mclapply vs foreach; the only thing is to not use both at the same time.

@dselivanov (Owner)

The parallel processing part was substantially reworked - I've dropped foreach and dropped parallel processing on Windows. If there is a reproducible example, feel free to reopen.
