Skip to content
This repository has been archived by the owner on Jul 7, 2020. It is now read-only.

Commit

Permalink
Merge pull request #279 from addthis/minion-servlet-context
Browse files Browse the repository at this point in the history
use servlets in minion http handling
  • Loading branch information
stevedonnelly authored Jun 4, 2018
2 parents a92d483 + 6a3165e commit a878dd5
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 71 deletions.
7 changes: 5 additions & 2 deletions hydra-main/src/main/java/com/addthis/hydra/minion/Minion.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
Expand Down Expand Up @@ -196,7 +197,8 @@ public static void main(String[] args) throws Exception {
final AtomicLong diskTotal = new AtomicLong(0);
final AtomicLong diskFree = new AtomicLong(0);
final Server jetty;
final MinionHandler minionHandler = new MinionHandler(this);
private ServletContextHandler servletContext = new MinionServletContext(this).build();

boolean diskReadOnly;
MinionWriteableDiskCheck diskHealthCheck;
int minionPid = -1;
Expand Down Expand Up @@ -268,7 +270,8 @@ private Minion(@JsonProperty("dataDir") File rootDir,
activeTaskKeys = new HashSet<>();
jetty = new Server(webPort);

jetty.setHandler(minionHandler);

jetty.setHandler(servletContext);
jetty.start();

waitForJetty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.addthis.hydra.minion;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

Expand All @@ -29,77 +30,61 @@
import com.addthis.hydra.util.PrometheusServletCreator;
import com.addthis.maljson.JSONObject;

import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class MinionHandler extends AbstractHandler {
private static final Logger log = LoggerFactory.getLogger(MinionHandler.class);
public class MinionServletContext {
private static final Logger log = LoggerFactory.getLogger(MinionServletContext.class);
Minion minion;

private final Minion minion;
final ServletContextHandler metricsHandler = new ServletContextHandler();

public MinionHandler(Minion minion) {
public MinionServletContext(Minion minion) {
this.minion = minion;
PrometheusServletCreator.create(metricsHandler);
}

@Override
public void doStop() {
try {
// stop prometheus jetty handler
metricsHandler.stop();
} catch (Exception ex) {
log.error("Unable to stop prometheus handler", ex);
}
}
public ServletContextHandler build() {
ServletContextHandler servletContex = new ServletContextHandler();
PrometheusServletCreator.create(servletContex);

@Override
public void doStart() {
try {
// start prometheus jetty handler
metricsHandler.start();
} catch (Exception ex) {
log.error("Unable to start prometheus handler", ex);
}
}
servletContex.addServlet(new ServletHolder(new PingServlet()), "/ping");
servletContex.addServlet(new ServletHolder(new JobPortServlet()), "/job.port");
servletContex.addServlet(new ServletHolder(new JobProfileServlet()), "/job.profile");
servletContex.addServlet(new ServletHolder(new JobHeadServlet()), "/job.head");
servletContex.addServlet(new ServletHolder(new JobTailServlet()), "/job.tail");
servletContex.addServlet(new ServletHolder(new JobLogServlet()), "/job.log");
servletContex.addServlet(new ServletHolder(new JobImportServlet()), "/jobs.import");
servletContex.addServlet(new ServletHolder(new FindExtPortServlet()), "/xdebug/findnextport");
servletContex.addServlet(new ServletHolder(new ActiveTasksServlet()), "/active.tasks");
servletContex.addServlet(new ServletHolder(new TaskSizeServlet()), "/task.size");

@Override
public void handle(String target,
Request request,
HttpServletRequest httpServletRequest,
HttpServletResponse httpServletResponse) throws IOException, ServletException {
try {
doHandle(target, request, httpServletRequest, httpServletResponse);
} catch (IOException | ServletException io) {
throw io;
} catch (Exception ex) {
throw new IOException(ex);
}
return servletContex;
}

public void doHandle(String target,
Request baseRequest,
HttpServletRequest request,
HttpServletResponse response) throws Exception {
response.setBufferSize(65535);
response.setHeader("Access-Control-Allow-Origin", "*");
response.setHeader("Access-Control-Allow-Headers", "accept, username");
response.setHeader("Access-Control-Allow-Methods", "GET, POST, PUT");
private static KVPairs parseRequest(HttpServletRequest request) {
KVPairs kv = new KVPairs();
boolean handled = true;
for (Enumeration<String> e = request.getParameterNames(); e.hasMoreElements(); ) {
String k = e.nextElement();
String v = request.getParameter(k);
kv.add(k, v);
}
if (target.equals("/ping")) {
return kv;
}

private static class PingServlet extends HttpServlet {
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
response.getWriter().write("ACK");
} else if (target.equals("/metrics")) {
metricsHandler.handle(target, baseRequest, request, response);
} else if (target.equals("/job.port")) {
}
}

private class JobPortServlet extends HttpServlet {
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
KVPairs kv = parseRequest(request);

String job = kv.getValue("id");
int taskID = kv.getIntValue("node", -1);
JobKey key = new JobKey(job, taskID);
Expand All @@ -109,15 +94,29 @@ public void doHandle(String target,
jobPort = task.getPort();
}
response.getWriter().write("{port:" + (jobPort != null ? jobPort : 0) + "}");
} else if (target.equals("/job.profile")) {
}
}

private class JobProfileServlet extends HttpServlet {
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
KVPairs kv = parseRequest(request);
String jobName = kv.getValue("id", "") + "/" + kv.getIntValue("node", 0);
JobTask job = minion.tasks.get(jobName);
if (job != null) {
response.getWriter().write(job.profile());
} else {
response.sendError(400, "No Job");
}
} else if (target.equals("/job.head")) {
}
}

private class JobHeadServlet extends HttpServlet {
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
KVPairs kv = parseRequest(request);
String jobName = kv.getValue("id", "") + "/" + kv.getIntValue("node", 0);
int lines = kv.getIntValue("lines", 10);
boolean out = !kv.getValue("out", "0").equals("0");
Expand All @@ -133,12 +132,23 @@ public void doHandle(String target,
response.setContentType("text/html");
response.getWriter().write(html);
} else {
response.getWriter().write(new JSONObject().put("out", outB).put("err", errB).toString());
try {
response.getWriter().write(new JSONObject().put("out", outB).put("err", errB).toString());
} catch (Exception e) {
log.error("Failed to generate response: ", e);
}
}
} else {
response.sendError(400, "No Job");
}
} else if (target.equals("/job.tail")) {
}
}

private class JobTailServlet extends HttpServlet {
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
KVPairs kv = parseRequest(request);
String jobName = kv.getValue("id", "") + "/" + kv.getIntValue("node", 0);
int lines = kv.getIntValue("lines", 10);
boolean out = !kv.getValue("out", "0").equals("0");
Expand All @@ -154,12 +164,23 @@ public void doHandle(String target,
response.setContentType("text/html");
response.getWriter().write(html);
} else {
response.getWriter().write(new JSONObject().put("out", outB).put("err", errB).toString());
try {
response.getWriter().write(new JSONObject().put("out", outB).put("err", errB).toString());
} catch (Exception e) {
log.error("Failed to generate response: ", e);
}
}
} else {
response.sendError(400, "No Job");
}
} else if (target.equals("/job.log")) {
}
}

private class JobLogServlet extends HttpServlet {
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
KVPairs kv = parseRequest(request);
String jobName = kv.getValue("id", "") + "/" + kv.getIntValue("node", 0);
int offset = kv.getIntValue("offset", -1);
int lines = kv.getIntValue("lines", 10);
Expand All @@ -174,33 +195,67 @@ public void doHandle(String target,
runsAgo -= 1;
logSuffix += ".bad";
}
response.getWriter().write(LogUtils.readLogLines(job, offset, lines, runsAgo, logSuffix).toString());
try {
response.getWriter()
.write(LogUtils.readLogLines(job, offset, lines, runsAgo, logSuffix).toString());
} catch (Exception e) {
log.error("Failed to generate response: ", e);
}
} else {
response.sendError(400, "No Job");
}
} else if (target.equals("/jobs.import")) {
}
}

private class JobImportServlet extends HttpServlet {
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
KVPairs kv = parseRequest(request);
int count = minion.updateJobsMeta(new File(kv.getValue("dir", ".")));
response.getWriter().write("imported " + count + " jobs");
} else if (target.equals("/xdebug/findnextport")) {
}
}

private static class FindExtPortServlet extends HttpServlet {
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
response.getWriter().write("port: " + 0);
} else if (target.equals("/active.tasks")) {
}
}

private class ActiveTasksServlet extends HttpServlet {
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
Minion.capacityLock.lock();
try {
response.getWriter().write("tasks: " + minion.activeTaskKeys.toString());
} finally {
Minion.capacityLock.unlock();
}
} else if (target.equals("/task.size")) {
}
}

private class TaskSizeServlet extends HttpServlet {
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
KVPairs kv = parseRequest(request);
String jobId = kv.getValue("id");
int taskId = kv.getIntValue("node", -1);
if (jobId != null && taskId >= 0) {
String duOutput = new SimpleExec(MacUtils.ducmd + " -s --block-size=1 " + ProcessUtils.getTaskBaseDir(
minion.rootDir.getAbsolutePath(), jobId, taskId)).join().stdoutString();
response.getWriter().write(duOutput.split("\t")[0]);
try {
String duOutput =
new SimpleExec(MacUtils.ducmd + " -s --block-size=1 " + ProcessUtils.getTaskBaseDir(
minion.rootDir.getAbsolutePath(), jobId, taskId)).join().stdoutString();
response.getWriter().write(duOutput.split("\t")[0]);
} catch (Exception e) {
log.error("Failed to generate response: ", e);
}
}
} else {
response.sendError(404);
}
((Request) request).setHandled(handled);
}

}

0 comments on commit a878dd5

Please sign in to comment.