Skip to content

Commit

Permalink
Merge pull request nathanmarz#424 from lyogavin/master
Browse files Browse the repository at this point in the history
Drain error stream periodically to avoid dead lock between ShellBolt and shell process
  • Loading branch information
nathanmarz committed Mar 26, 2013
2 parents d6b189f + 55d5b8d commit 1d58110
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
2 changes: 2 additions & 0 deletions src/jvm/backtype/storm/task/ShellBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ public void run() {
if (write != null) {
_process.writeMessage(write);
}
// drain the error stream to avoid dead lock because of full error stream buffer
_process.drainErrorStream();
} catch (InterruptedException e) {
} catch (Throwable t) {
die(t);
Expand Down
20 changes: 18 additions & 2 deletions src/jvm/backtype/storm/utils/ShellProcess.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
import org.apache.commons.io.IOUtils;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import org.apache.log4j.Logger;

public class ShellProcess {
public static Logger LOG = Logger.getLogger(ShellProcess.class);
private DataOutputStream processIn;
private BufferedReader processOut;
private InputStream processErrorStream;
Expand Down Expand Up @@ -80,6 +82,22 @@ public String getErrorsString() {
}
}

public void drainErrorStream()
{
try {
while (processErrorStream.available() > 0)
{
int bufferSize = processErrorStream.available();
byte[] errorReadingBuffer = new byte[bufferSize];

processErrorStream.read(errorReadingBuffer, 0, bufferSize);

LOG.info("Got error from shell process: " + new String(errorReadingBuffer));
}
} catch(Exception e) {
}
}

private String readString() throws IOException {
StringBuilder line = new StringBuilder();

Expand All @@ -95,8 +113,6 @@ private String readString() throws IOException {
else {
errorMessage.append(" Currently read output: " + line.toString() + "\n");
}
errorMessage.append("Shell Process Exception:\n");
errorMessage.append(getErrorsString() + "\n");
throw new RuntimeException(errorMessage.toString());
}
if(subline.equals("end")) {
Expand Down

0 comments on commit 1d58110

Please sign in to comment.