Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forward errors from gcd.sh #250

Merged
merged 11 commits into from
Oct 16, 2015
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.gcloud.datastore.testing;

import static com.google.common.base.MoreObjects.firstNonNull;
import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.common.base.Strings;
Expand Down Expand Up @@ -54,23 +55,23 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Utility to start and stop local Google Cloud Datastore process.
*/
public class LocalGcdHelper {

private static final Logger log = Logger.getLogger(LocalGcdHelper.class.getName());

private final String projectId;
private Path gcdPath;
private Process startProcess;
private ProcessStreamReader processReader;
private ProcessErrorStreamReader processErrorReader;
private final int port;

public static final String DEFAULT_PROJECT_ID = "projectid1";
Expand Down Expand Up @@ -179,49 +180,139 @@ private static Path executablePath(String cmd) {
}

private static class ProcessStreamReader extends Thread {

private final Process process;
private final BufferedReader reader;
private volatile boolean terminated;

ProcessStreamReader(Process process, String blockUntil) throws IOException {
ProcessStreamReader(InputStream inputStream) {
super("Local GCD InputStream reader");
setDaemon(true);
this.process = process;
reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
reader = new BufferedReader(new InputStreamReader(inputStream));
}

void terminate() throws IOException {
terminated = true;
reader.close();

This comment was marked as spam.

}

@Override
public void run() {
while (!terminated) {
try {
String line = reader.readLine();
if (line == null) {
terminated = true;
}
} catch (IOException e) {
// ignore
}
}
}

public static ProcessStreamReader start(InputStream inputStream) {
ProcessStreamReader thread = new ProcessStreamReader(inputStream);
thread.start();
return thread;
}
}

private static class ProcessErrorStreamReader extends Thread {
private static final int LOG_LENGTH_LIMIT = 50000;
private static final String GCD_LOGGING_CLASS =
"com.google.apphosting.client.serviceapp.BaseApiServlet";

private final BufferedReader errorReader;
private StringBuilder currentLog;
private Level currentLogLevel;
private boolean collectionMode;
private volatile boolean terminated;

ProcessErrorStreamReader(InputStream errorStream, String blockUntil) throws IOException {
super("Local GCD ErrorStream reader");
setDaemon(true);
errorReader = new BufferedReader(new InputStreamReader(errorStream));
if (!Strings.isNullOrEmpty(blockUntil)) {
String line;
do {
line = reader.readLine();
line = errorReader.readLine();
} while (line != null && !line.contains(blockUntil));
}
}

void terminate() throws InterruptedException, IOException {
process.destroy();
process.waitFor();
reader.close();
void terminate() throws IOException {
terminated = true;
errorReader.close();
}

@Override
public void run() {
try {
while (reader.readLine() != null) {
// consume line
String previousLine = "";
String nextLine = "";
while (!terminated) {
try {
previousLine = nextLine;
nextLine = errorReader.readLine();
if (nextLine == null) {
terminated = true;
} else {
processLogLine(previousLine, nextLine);
}
} catch (IOException e) {
// ignore
}
}
processLogLine(previousLine, firstNonNull(nextLine, ""));
writeLog(currentLogLevel, currentLog);
}

private void processLogLine(String previousLine, String nextLine) {
// Each gcd log is two lines with the following format:
// [Date] [Time] [GCD_LOGGING_CLASS] [method]
// [LEVEL]: error message
// Exceptions and stack traces are included in gcd error stream, separated by a newline
Level nextLogLevel = getLevel(nextLine);
if (nextLogLevel != null) {
writeLog(currentLogLevel, currentLog);
currentLog = new StringBuilder();
currentLogLevel = nextLogLevel;
collectionMode = previousLine.contains(GCD_LOGGING_CLASS);
} else if (collectionMode) {
if (currentLog.length() > LOG_LENGTH_LIMIT) {
collectionMode = false;
} else if (currentLog.length() == 0) {
// strip level out of the line
currentLog.append("GCD");
currentLog.append(previousLine.split(":", 2)[1]);
currentLog.append(System.getProperty("line.separator"));
} else {
currentLog.append(previousLine);
currentLog.append(System.getProperty("line.separator"));
}
} catch (IOException e) {
// ignore
}
}

public static ProcessStreamReader start(Process process, String blockUntil) throws IOException {
ProcessStreamReader thread = new ProcessStreamReader(process, blockUntil);
private static void writeLog(Level level, StringBuilder msg) {
if (level != null && msg != null && msg.length() != 0) {
log.log(level, msg.toString().trim());
}
}

private static Level getLevel(String line) {
try {
return Level.parse(line.split(":")[0]);
} catch (IllegalArgumentException e) {
return null; // level wasn't supplied in this log line
}
}

public static ProcessErrorStreamReader start(InputStream errorStream, String blockUntil)
throws IOException {
ProcessErrorStreamReader thread = new ProcessErrorStreamReader(errorStream, blockUntil);
thread.start();
return thread;
}
}

private static class CommandWrapper {

private final List<String> prefix;
private List<String> command;
private String nullFilename;
Expand Down Expand Up @@ -392,13 +483,15 @@ private void startGcd(Path executablePath) throws IOException, InterruptedExcept
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "Starting datastore emulator for the project: {0}", projectId);
}
Process startProcess = CommandWrapper.create()
.command(gcdAbsolutePath.toString(), "start", "--testing", "--allow_remote_shutdown",
"--port=" + Integer.toString(port), projectId)
.directory(gcdPath)
.redirectErrorStream()
.start();
processReader = ProcessStreamReader.start(startProcess, "Dev App Server is now running");
startProcess =
CommandWrapper.create()
.command(gcdAbsolutePath.toString(), "start", "--testing", "--allow_remote_shutdown",
"--port=" + Integer.toString(port), projectId)
.directory(gcdPath)
.start();
processReader = ProcessStreamReader.start(startProcess.getInputStream());
processErrorReader = ProcessErrorStreamReader.start(
startProcess.getErrorStream(), "Dev App Server is now running");
}

private static String md5(File gcdZipFile) throws IOException {
Expand Down Expand Up @@ -454,6 +547,9 @@ public void stop() throws IOException, InterruptedException {
sendQuitRequest(port);
if (processReader != null) {
processReader.terminate();
processErrorReader.terminate();
startProcess.destroy();
startProcess.waitFor();
}
if (gcdPath != null) {
deleteRecurse(gcdPath);
Expand All @@ -465,7 +561,6 @@ private static void deleteRecurse(Path path) throws IOException {
return;
}
Files.walkFileTree(path, new SimpleFileVisitor<Path>() {

@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.delete(dir);
Expand All @@ -480,7 +575,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO
});
}

public static LocalGcdHelper start(String projectId, int port)
public static LocalGcdHelper start(String projectId, int port)
throws IOException, InterruptedException {
LocalGcdHelper helper = new LocalGcdHelper(projectId, port);
helper.start();
Expand All @@ -490,15 +585,14 @@ public static LocalGcdHelper start(String projectId, int port)
public static void main(String... args) throws IOException, InterruptedException {
Map<String, String> parsedArgs = parseArgs(args);
String action = parsedArgs.get("action");
int port = (parsedArgs.get("port") == null) ? DEFAULT_PORT
: Integer.parseInt(parsedArgs.get("port"));
int port =
(parsedArgs.get("port") == null) ? DEFAULT_PORT : Integer.parseInt(parsedArgs.get("port"));
switch (action) {
case "START":
if (!isActive(DEFAULT_PROJECT_ID, port)) {
LocalGcdHelper helper = start(DEFAULT_PROJECT_ID, port);
try (FileWriter writer = new FileWriter(".local_gcd_helper")) {
writer.write(
helper.gcdPath.toAbsolutePath().toString() + System.lineSeparator());
writer.write(helper.gcdPath.toAbsolutePath().toString() + System.lineSeparator());
writer.write(Integer.toString(port));
}
}
Expand Down