Skip to content

Commit

Permalink
Improved per-option timeout handling #9
Browse files Browse the repository at this point in the history
  • Loading branch information
elmer-garduno committed Dec 12, 2012
1 parent d32a115 commit da29202
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 18 deletions.
51 changes: 35 additions & 16 deletions src/main/java/edu/cmu/lti/oaqa/ecd/phase/BasePhase.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,16 @@
// The superclass also adds this annotation, we just want to be explicit about doing it
@OperationalProperties(outputsNewCases = true)
public final class BasePhase extends JCasMultiplier_ImplBase {

private final ExecutorService executor = Executors.newSingleThreadExecutor();

public static final String QA_INTERNAL_PHASEID = "__.qa.internal.phaseid.__";

private static final String TIMEOUT_KEY = "option-timeout";

private static final String CROSS_PARAMS_KEY = "cross-opts";

private static final int DEFAULT_OPTION_TIMEOUT = 5;

private AnalysisEngine[] options;

Expand All @@ -88,6 +93,8 @@ public final class BasePhase extends JCasMultiplier_ImplBase {

private Integer phaseNo;

private Integer optionTimeout;

private String phaseName;

private PhasePersistenceProvider persistence;
Expand All @@ -103,6 +110,10 @@ public void initialize(UimaContext ctx) throws ResourceInitializationException {
this.persistence = BaseExperimentBuilder.loadProvider(pp, PhasePersistenceProvider.class);
this.phaseName = (String) ctx.getConfigParameterValue("name");
this.phaseNo = (Integer) ctx.getConfigParameterValue(QA_INTERNAL_PHASEID);
this.optionTimeout = (Integer) ctx.getConfigParameterValue(TIMEOUT_KEY);
if (optionTimeout == null) {
this.optionTimeout = DEFAULT_OPTION_TIMEOUT;
}
System.out.println("Phase: " + toString());
String experimentId = (String) ctx
.getConfigParameterValue(BaseExperimentBuilder.EXPERIMENT_UUID_PROPERTY);
Expand Down Expand Up @@ -180,25 +191,32 @@ private void process(final AnalysisEngine ae, JCas nextCas, String prevCasId, Tr
@Override
public void run() {
try {
ae.process(wrapped); // Process the next option
// Process the next option
ae.process(wrapped);
} catch (Exception e) {
Throwables.propagate(e);
Throwables.propagate(e); // Propagate the exception so the thread effectively dies.
}
}
});
future.get(15, TimeUnit.MINUTES);
long b = System.currentTimeMillis();
updateTrace(nextCas, optionId, key);
storeCas(nextCas, b, key);
System.out.printf("[%s] Execution time for option %s: %ss\n", sequenceId, optionId,
(b - a) / 1000);
} catch (TimeoutException e) {
wrapped.invalidate();
long b = System.currentTimeMillis();
storeException(b, e, key, ExecutionStatus.TIMEOUT);
System.out.printf("[%s] Execution timed out for option: %s after %ss\n", sequenceId,
optionId, (b - a) / 1000);
throw e; // Re-throw exception to allow the Flow controller do its job
try {
future.get(optionTimeout, TimeUnit.MINUTES);
long b = System.currentTimeMillis();
updateTrace(nextCas, optionId, key);
storeCas(nextCas, b, key);
System.out.printf("[%s] Execution time for option %s: %ss\n", sequenceId, optionId,
(b - a) / 1000);
} catch (TimeoutException e) {
// If the task is taking to long, most likely it is reading from a resource
// If it is an iterative algorithm, another approach must be taken
boolean canceled = future.cancel(true);
wrapped.invalidate();
long b = System.currentTimeMillis();
storeException(b, e, key, ExecutionStatus.TIMEOUT);
System.out.printf(
"[%s] Execution timed out for option: %s after %ss (task was interrupted=%s)\n",
sequenceId, optionId, (b - a) / 1000, canceled);
throw e; // Re-throw exception to allow the Flow controller do its job
}
} catch (Exception e) {
long b = System.currentTimeMillis();
try {
Expand Down Expand Up @@ -447,7 +465,8 @@ private Set<List<Object>> doCartesianProduct(AnyObject crossParams) {
return product;
}

private void setInnerParams(List<String> paramNames, List<Object> configuration, Map<String, Object> inner) {
private void setInnerParams(List<String> paramNames, List<Object> configuration,
Map<String, Object> inner) {
for (int i = 0; i < paramNames.size(); i++) {
String key = paramNames.get(i);
Object value = configuration.get(i);
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/edu/cmu/lti/oaqa/ecd/phase/WrappedJCas.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,20 @@ public WrappedJCas(JCas aCas) {
}

void invalidate() {
// TODO: This is one of the places where we hacked the solution
// TODO: Fixme: this is one of the places where we hacked the timeout solution
delegate.getCasImpl().enableReset(true);
delegate.release();
this.alive = false;
}

/**
* All methods in the wrapper class invoke this method, to fail fast if the
* CAS has already been discarded. This will effectively teminate the thread
* that invoked it.
*/
private void testLiveness() {
if (!alive) {
throw new IllegalStateException("Cas was already death");
throw new IllegalStateException("CAS was already discarded");
}
}

Expand Down

0 comments on commit da29202

Please sign in to comment.