Skip to content

Commit

Permalink
add try counts and increase time interval
Browse files Browse the repository at this point in the history
  • Loading branch information
lythesia committed Feb 20, 2015
1 parent 4cd7d3f commit 179ab38
Showing 1 changed file with 19 additions and 8 deletions.
27 changes: 19 additions & 8 deletions pkg/R/sparkR.R
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ sparkR.init <- function(
sparkExecutorEnv = list(),
sparkJars = "",
sparkRLibDir = "",
sparkRBackendPort = 12345) {
sparkRBackendPort = 12345,
sparkRRetryCount = 6) {

if (exists(".sparkRjsc", envir = .sparkREnv)) {
cat("Re-using existing Spark Context. Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context\n")
Expand All @@ -111,18 +112,28 @@ sparkR.init <- function(
args = as.character(sparkRBackendPort),
javaOpts = paste("-Xmx", sparkMem, sep = ""))

cat("Waiting JVM bring up ...\n")
while(TRUE) {
.sparkREnv$sparkRBackendPort <- sparkRBackendPort
cat("Waiting for JVM to come up...\n")
tries <- 0
while(tries < sparkRRetryCount) {
if(!connExists(.sparkREnv)) {
Sys.sleep(1)
cat(".")
connectBackend("localhost", sparkRBackendPort) # Connect to it
Sys.sleep(2 ^ tries)
tryCatch({
connectBackend("localhost", .sparkREnv$sparkRBackendPort)
}, error = function(err) {
cat("Error in Connection, retrying...\n")
}, warning = function(war) {
cat("No Connection Found, retrying...\n")
})
tries <- tries + 1
} else {
cat(" ok.\n")
cat("Connect ok.\n")
break
}
}
.sparkREnv$sparkRBackendPort <- sparkRBackendPort
if (tries == sparkRRetryCount) {
stop(sprintf("Failed to connect JVM after %d tries.\n", sparkRRetryCount))
}

if (nchar(sparkHome) != 0) {
sparkHome <- normalizePath(sparkHome)
Expand Down

0 comments on commit 179ab38

Please sign in to comment.