Skip to content
This repository has been archived by the owner on Jul 7, 2020. It is now read-only.

Cleanup of error handling in AbstractOutputWriter. #19

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
853263c
Merging spawn javascript minimization script into master.
Feb 5, 2014
c96de2a
Make sure to clear error state after enabling job
Feb 6, 2014
031adc0
[maven-release-plugin]prepare release v4.1.1
addthis-buildbot Feb 6, 2014
85054a7
[maven-release-plugin]prepare for next development iteration
addthis-buildbot Feb 6, 2014
fcb424c
Cleanup of logging level invocations. INFO is for informational errors.
Feb 6, 2014
3dc974a
Implement a new optional lazy loading mechanism for the plugin archit…
Feb 6, 2014
56910c0
[maven-release-plugin] bugfix to hydra 4.1.1. All users should upgrad…
addthis-buildbot Feb 6, 2014
f67c0ea
[maven-release-plugin] bugfix to hydra 4.1.1. All users should upgrad…
addthis-buildbot Feb 6, 2014
21b5f14
Don't update meters based on queries that never actually started.
Feb 7, 2014
b61a10b
fix an array logging statement in SSM
tea-dragon Feb 7, 2014
e64db9a
Adding additional documentation on the parameter 'hard' of the alias …
Feb 7, 2014
6aa279e
Instructions for OS X in README.mdown.
Feb 7, 2014
886bdb1
Adding test for executable commands that are prerequisites
Feb 7, 2014
c493aaf
Improvement to task logging output. Hydra processes units of data cal…
Feb 7, 2014
2b2968a
Adding ability to repair PageDB as a separate utility that can
Feb 10, 2014
2cde442
change log level for oplimit when additional bundles are received
abramsm Feb 10, 2014
def17b4
In the PageDB repair utility if a (key, value) pair in a page
Feb 10, 2014
e111922
inject QueryStatusObserver into more operations so that canceled or c…
abramsm Feb 10, 2014
79f01cd
add QueryStatusObserver to OpHistogram
abramsm Feb 10, 2014
64b2d06
Use cached value for queue size, removing need for sync lock
Feb 10, 2014
78a9f89
prevent NPE when there is no cell operation to perform
abramsm Feb 10, 2014
f6c38ea
Adding 'do not delete' parameter to job configurations that will prev…
Feb 10, 2014
fadbb19
default to PivotOp.SUM for cellop
abramsm Feb 10, 2014
294d41f
Initial cleanup of AbstractOutputWriter. Error messages
Feb 10, 2014
8c285c4
Cleanup of error handling in AbstractOutputWriter. If an
Feb 11, 2014
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ hydra.ipr
hydra.iws
hydra-uber/src/main/resources/hydra-git.properties
*.pyc
hydra-main/web/spawn2/css/main.min.css
hydra-main/web/spawn2/js/main.min.js
9 changes: 8 additions & 1 deletion README.mdown
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,15 @@ adding the [fl-cow](http://xmailserver.org/flcow.html) library to

Many components assume that there is a local user called `hydra` and
that all minion nodes can ssh as that user to each other. This is
used most prominently for `rsync` based replicas.
used most prominently for `rsync` based replicas. The user `hydra`
is not necessary when running a local-stack environment (see below).

### OS X

On OS X several utilities are necessary to run the local-stack environment:

brew install coreutils
brew install wget

## Components

Expand Down
2 changes: 1 addition & 1 deletion hydra-avro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<parent>
<groupId>com.addthis.hydra</groupId>
<artifactId>hydra-parent</artifactId>
<version>4.1.1-SNAPSHOT</version>
<version>4.1.3-SNAPSHOT</version>
</parent>

<artifactId>hydra-avro</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion hydra-data/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<parent>
<groupId>com.addthis.hydra</groupId>
<artifactId>hydra-parent</artifactId>
<version>4.1.1-SNAPSHOT</version>
<version>4.1.3-SNAPSHOT</version>
</parent>

<artifactId>hydra-data</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,13 @@ public void close() throws IOException {
}

@Override
public void sendTable(DataTable table) {
public void sendTable(DataTable table, QueryStatusObserver queryStatusObserver) {
for (Bundle row : table) {
send(row);
if (queryStatusObserver.queryCompleted || queryStatusObserver.queryCancelled) {
break;
} else {
send(row);
}
}
sendComplete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ public abstract class AbstractTableOp extends AbstractQueryOp implements QueryTa
protected DataTable table;

private final DataTableFactory tableFactory;
private final QueryStatusObserver queryStatusObserver;

public AbstractTableOp(DataTableFactory tableFactory) {
public AbstractTableOp(DataTableFactory tableFactory, QueryStatusObserver queryStatusObserver) {
this.tableFactory = tableFactory;
this.queryStatusObserver = queryStatusObserver;
}

public abstract DataTable tableOp(DataTable table);
Expand Down Expand Up @@ -66,7 +68,7 @@ public void sendTable(DataTable table) {
private void complete() {
QueryOp next = getNext();
if (next != null) {
next.sendTable(getTable());
next.sendTable(getTable(), queryStatusObserver);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public interface QueryOp extends BundleOutput, Closeable {
/**
* @return true if this is an instance of table op
*/
public void sendTable(DataTable table);
public void sendTable(DataTable table, QueryStatusObserver queryStatusObserver);

/**
* @return simple debugging name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ private void parseOps(String... opslist) {
appendOp(new OpRoll.AvgOpRoll(args));
break;
case CHANGEPOINTS:
appendOp(new OpChangePoints(this, args));
appendOp(new OpChangePoints(this, args, queryStatusObserver));
break;
case COMPARE:
appendOp(new OpCompare(args));
Expand All @@ -244,10 +244,10 @@ private void parseOps(String... opslist) {
appendOp(new OpDePivot(this, args));
break;
case DIFF:
appendOp(new OpDiff(this, args));
appendOp(new OpDiff(this, args, queryStatusObserver));
break;
case DISORDER:
appendOp(new OpDisorder(this, args));
appendOp(new OpDisorder(this, args, queryStatusObserver));
break;
case DSORT:
appendOp(new OpDiskSort(args, TMP_SORT_DIR_STRING, queryStatusObserver));
Expand All @@ -259,16 +259,16 @@ private void parseOps(String... opslist) {
appendOp(new OpFold(args));
break;
case FREQUENCYTABLE:
appendOp(new OpFrequencyTable(this, args));
appendOp(new OpFrequencyTable(this, args, queryStatusObserver));
break;
case GATHER:
appendOp(new OpGather(args, memTip, rowTip, tempDir.getPath(), queryStatusObserver));
break; // TODO move OpTop code into OpGather and delete OpTop
case HISTOGRAM:
appendOp(new OpHistogram(args));
appendOp(new OpHistogram(args, queryStatusObserver));
break;
case DISTRIBUTION:
appendOp(new OpPercentileDistribution(this, args));
appendOp(new OpPercentileDistribution(this, args, queryStatusObserver));
break;
case LIMIT:
appendOp(new OpLimit(args, queryStatusObserver));
Expand All @@ -283,7 +283,7 @@ private void parseOps(String... opslist) {
appendOp(new OpRoll.MaxOpRoll(args));
break;
case MEDIAN:
appendOp(new OpMedian(this));
appendOp(new OpMedian(this, queryStatusObserver));
break;
case MERGE:
appendOp(new OpMerge(args, queryStatusObserver));
Expand All @@ -307,25 +307,25 @@ private void parseOps(String... opslist) {
appendOp(new OpFill(args, true));
break;
case PERCENTRANK:
appendOp(new OpPercentileRank(this, args));
appendOp(new OpPercentileRank(this, args, queryStatusObserver));
break;
case PIVOT:
appendOp(new OpPivot(this, args));
appendOp(new OpPivot(this, args, queryStatusObserver));
break;
case RANGE:
appendOp(new OpRange(this, args));
appendOp(new OpRange(this, args, queryStatusObserver));
break;
case REVERSE:
appendOp(new OpReverse(this));
appendOp(new OpReverse(this, queryStatusObserver));
break;
case RMSING:
appendOp(new OpRemoveSingletons(this, args));
appendOp(new OpRemoveSingletons(this, args, queryStatusObserver));
break;
case RND_FAIL:
appendOp(new OpRandomFail(args));
break;
case SEEN:
appendOp(new OpSeen(this, args));
appendOp(new OpSeen(this, args, queryStatusObserver));
break;
case SKIP:
appendOp(new OpSkip(args));
Expand All @@ -350,7 +350,7 @@ private void parseOps(String... opslist) {
appendOp(new OpGather(args, memTip, rowTip, tempDir.getPath(), queryStatusObserver));
break;
case TRANSPOSE:
appendOp(new OpTranspose(this));
appendOp(new OpTranspose(this, queryStatusObserver));
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

public class ResultAccumulator extends AbstractTableOp {

public ResultAccumulator(QueryOpProcessor processor) {
super(processor);
public ResultAccumulator(QueryOpProcessor processor, QueryStatusObserver queryStatusObserver) {
super(processor, queryStatusObserver);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.addthis.bundle.util.BundleColumnBinder;
import com.addthis.bundle.value.ValueFactory;
import com.addthis.hydra.data.query.AbstractTableOp;
import com.addthis.hydra.data.query.QueryStatusObserver;
import com.addthis.hydra.data.util.ChangePoint;
import com.addthis.hydra.data.util.FindChangePoints;

Expand All @@ -46,8 +47,8 @@ public class OpChangePoints extends AbstractTableOp {
int inactiveThreshold;
int windowSize;

public OpChangePoints(DataTableFactory factory, String args) {
super(factory);
public OpChangePoints(DataTableFactory factory, String args, QueryStatusObserver queryStatusObserver) {
super(factory, queryStatusObserver);
try {
String[] opt = args.split(":");
timeColumn = opt.length >= 1 ? Integer.parseInt(opt[0]) : 0;
Expand All @@ -57,10 +58,10 @@ public OpChangePoints(DataTableFactory factory, String args) {
minZScore = opt.length >= 5 ? Double.parseDouble(opt[4]) : 1.5;
inactiveThreshold = opt.length >= 6 ? Integer.parseInt(opt[5]) : 1;
windowSize = opt.length >= 7 ? Integer.parseInt(opt[6]) : 5;
log.warn("Initiated changepoints with parameters " +
log.info("Initiated changepoints with parameters " +
Strings.join(new Object[]{valColumn, minChange, minRatio, minZScore, inactiveThreshold}, ","));
} catch (Exception ex) {
log.warn("", ex);
log.error("", ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.addthis.bundle.value.ValueString;
import com.addthis.hydra.data.query.AbstractTableOp;
import com.addthis.hydra.data.query.QueryOpProcessor;
import com.addthis.hydra.data.query.QueryStatusObserver;


/**
Expand Down Expand Up @@ -55,8 +56,8 @@ private static enum DiffOp {

private ColumnType type[];

public OpDiff(DataTableFactory tableFactory, String args) {
super(tableFactory);
public OpDiff(DataTableFactory tableFactory, String args, QueryStatusObserver queryStatusObserver) {
super(tableFactory, queryStatusObserver);
type = new ColumnType[args.length()];
for (int i = 0; i < args.length(); i++) {
switch (args.charAt(i)) {
Expand All @@ -80,8 +81,8 @@ public OpDiff(DataTableFactory tableFactory, String args) {
}
}

public OpDiff(QueryOpProcessor processor, ColumnType type[]) {
super(processor);
public OpDiff(QueryOpProcessor processor, ColumnType type[], QueryStatusObserver queryStatusObserver) {
super(processor, queryStatusObserver);
this.type = type;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.addthis.bundle.util.ValueUtil;
import com.addthis.bundle.value.ValueFactory;
import com.addthis.hydra.data.query.AbstractTableOp;
import com.addthis.hydra.data.query.QueryStatusObserver;


/**
Expand Down Expand Up @@ -58,8 +59,8 @@ public class OpDisorder extends AbstractTableOp {
private int secondary;
private int frequency;

public OpDisorder(DataTableFactory tableFactory, String args) {
super(tableFactory);
public OpDisorder(DataTableFactory tableFactory, String args, QueryStatusObserver queryStatusObserver) {
super(tableFactory, queryStatusObserver);
String[] split = args.split(":");
if (split.length < 2 || split.length > 3) {
throw new IllegalArgumentException("expected disorder=p:s[:f], got " + args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.addthis.hydra.data.query.AbstractTableOp;
import com.addthis.hydra.data.query.QueryOpProcessor;

import com.addthis.hydra.data.query.QueryStatusObserver;
import com.google.common.base.Objects;

/**
Expand All @@ -45,8 +46,8 @@ public class OpFrequencyTable extends AbstractTableOp {
private boolean appendTotal = false;

// foo=0,1,2,3:4,5:0.99,p12
public OpFrequencyTable(QueryOpProcessor processor, String args) {
super(processor);
public OpFrequencyTable(QueryOpProcessor processor, String args, QueryStatusObserver queryStatusObserver) {
super(processor, queryStatusObserver);
String tuple[] = Strings.splitArray(args, ":");
cols = Strings.splitArray(tuple[0], ",");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.addthis.bundle.util.ValueUtil;
import com.addthis.bundle.value.ValueFactory;
import com.addthis.hydra.data.query.AbstractRowOp;
import com.addthis.hydra.data.query.QueryStatusObserver;
import com.addthis.hydra.data.util.KeyHistogram;


Expand All @@ -39,6 +40,7 @@ public class OpHistogram extends AbstractRowOp {

private final int scale;
private final int column;
private final QueryStatusObserver queryStatusObserver;

/**
* usage: column, scale
Expand All @@ -48,7 +50,8 @@ public class OpHistogram extends AbstractRowOp {
*
* @param args
*/
public OpHistogram(String args) {
public OpHistogram(String args, QueryStatusObserver queryStatusObserver) {
this.queryStatusObserver = queryStatusObserver;
int v[] = csvToInts(args);
if (v.length < 1) {
throw new RuntimeException("missing required column");
Expand Down Expand Up @@ -77,10 +80,14 @@ public Bundle rowOp(Bundle row) {
public void sendComplete() {
Map<Long, Long> map = histo.getSortedHistogram();
for (Entry<Long, Long> e : map.entrySet()) {
Bundle row = rowFactory.createBundle();
binder.appendColumn(row, ValueFactory.create(e.getKey()));
binder.appendColumn(row, ValueFactory.create(e.getValue()));
getNext().send(row);
if (queryStatusObserver.queryCompleted || queryStatusObserver.queryCancelled) {
break;
} else {
Bundle row = rowFactory.createBundle();
binder.appendColumn(row, ValueFactory.create(e.getKey()));
binder.appendColumn(row, ValueFactory.create(e.getValue()));
getNext().send(row);
}
}
super.sendComplete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ protected void setup(int limit, int offset) {

@Override
public void send(Bundle row) throws DataChannelError {
if (queryStatusObserver.queryCompleted ) {
if (queryStatusObserver.queryCompleted) {
// Someone is attempting to send data even after we marked the query completed to true flag. This means
// they are doing work and sending us bundles. Throw an exception because that needs to be checked.
log.error("Limit reached, sendComplete was called.");
log.trace("Limit reached, sendComplete was called.");
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.addthis.bundle.table.DataTable;
import com.addthis.hydra.data.query.AbstractTableOp;
import com.addthis.hydra.data.query.QueryOpProcessor;
import com.addthis.hydra.data.query.QueryStatusObserver;


/**
Expand All @@ -33,8 +34,8 @@
*/
public class OpMedian extends AbstractTableOp {

public OpMedian(QueryOpProcessor processor) {
super(processor);
public OpMedian(QueryOpProcessor processor, QueryStatusObserver queryStatusObserver) {
super(processor, queryStatusObserver);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.addthis.bundle.util.ValueUtil;
import com.addthis.bundle.value.ValueFactory;
import com.addthis.hydra.data.query.AbstractTableOp;
import com.addthis.hydra.data.query.QueryStatusObserver;
import com.addthis.hydra.data.util.KeyPercentileDistribution;

import com.yammer.metrics.stats.Snapshot;
Expand Down Expand Up @@ -49,8 +50,8 @@ public class OpPercentileDistribution extends AbstractTableOp {
* @param tableFactory
* @param args
*/
public OpPercentileDistribution(DataTableFactory tableFactory, String args) {
super(tableFactory);
public OpPercentileDistribution(DataTableFactory tableFactory, String args, QueryStatusObserver queryStatusObserver) {
super(tableFactory, queryStatusObserver);
int v[] = csvToInts(args);
if (v.length < 1) {
throw new RuntimeException("missing required column");
Expand Down
Loading