diff --git a/src/jvm/backtype/storm/task/ShellBolt.java b/src/jvm/backtype/storm/task/ShellBolt.java index 3ff8696c7..854aa8f53 100644 --- a/src/jvm/backtype/storm/task/ShellBolt.java +++ b/src/jvm/backtype/storm/task/ShellBolt.java @@ -124,6 +124,8 @@ public void run() { if (write != null) { _process.writeMessage(write); } + // drain the error stream to avoid dead lock because of full error stream buffer + _process.drainErrorStream(); } catch (InterruptedException e) { } catch (Throwable t) { die(t); diff --git a/src/jvm/backtype/storm/utils/ShellProcess.java b/src/jvm/backtype/storm/utils/ShellProcess.java index e4ee7c4c8..011b60691 100644 --- a/src/jvm/backtype/storm/utils/ShellProcess.java +++ b/src/jvm/backtype/storm/utils/ShellProcess.java @@ -13,8 +13,10 @@ import org.apache.commons.io.IOUtils; import org.json.simple.JSONObject; import org.json.simple.JSONValue; +import org.apache.log4j.Logger; public class ShellProcess { + public static Logger LOG = Logger.getLogger(ShellProcess.class); private DataOutputStream processIn; private BufferedReader processOut; private InputStream processErrorStream; @@ -80,6 +82,22 @@ public String getErrorsString() { } } + public void drainErrorStream() + { + try { + while (processErrorStream.available() > 0) + { + int bufferSize = processErrorStream.available(); + byte[] errorReadingBuffer = new byte[bufferSize]; + + processErrorStream.read(errorReadingBuffer, 0, bufferSize); + + LOG.info("Got error from shell process: " + new String(errorReadingBuffer)); + } + } catch(Exception e) { + } + } + private String readString() throws IOException { StringBuilder line = new StringBuilder(); @@ -95,8 +113,6 @@ private String readString() throws IOException { else { errorMessage.append(" Currently read output: " + line.toString() + "\n"); } - errorMessage.append("Shell Process Exception:\n"); - errorMessage.append(getErrorsString() + "\n"); throw new RuntimeException(errorMessage.toString()); } if(subline.equals("end")) {