From 7694401ddf8bc8c988dd70414f1aae8d1540e02b Mon Sep 17 00:00:00 2001 From: Gabriel Silk Date: Wed, 21 Nov 2012 09:33:56 -0800 Subject: [PATCH 01/15] ui: use wrap-reload to detect changes in ui and reload --- project.clj | 1 + src/clj/backtype/storm/ui/core.clj | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/project.clj b/project.clj index 555930795..7ad0356c8 100644 --- a/project.clj +++ b/project.clj @@ -22,6 +22,7 @@ [com.googlecode.json-simple/json-simple "1.1"] [compojure "0.6.4"] [hiccup "0.3.6"] + [ring/ring-devel "0.3.11"] [ring/ring-jetty-adapter "0.3.11"] [org.clojure/tools.logging "0.2.3"] [org.clojure/math.numeric-tower "0.0.1"] diff --git a/src/clj/backtype/storm/ui/core.clj b/src/clj/backtype/storm/ui/core.clj index ceb54fd94..9b5dde55b 100644 --- a/src/clj/backtype/storm/ui/core.clj +++ b/src/clj/backtype/storm/ui/core.clj @@ -1,5 +1,6 @@ (ns backtype.storm.ui.core (:use compojure.core) + (:use ring.middleware.reload) (:use [hiccup core page-helpers]) (:use [backtype.storm config util log]) (:use [backtype.storm.ui helpers]) @@ -794,7 +795,9 @@ )))) (def app - (handler/site (-> main-routes catch-errors ))) + (-> #'main-routes + (wrap-reload '[backtype.storm.ui.core]) + catch-errors)) (defn start-server! [] (run-jetty app {:port (Integer. (*STORM-CONF* UI-PORT)) :join? 
false})) From 55d5b8d52f5575b0f479386025572024091df29f Mon Sep 17 00:00:00 2001 From: Yu L Li Date: Mon, 17 Dec 2012 13:50:54 -0800 Subject: [PATCH 02/15] drain the error stream to avoid dead lock in shell process --- src/jvm/backtype/storm/task/ShellBolt.java | 2 ++ .../backtype/storm/utils/ShellProcess.java | 20 +++++++++++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/jvm/backtype/storm/task/ShellBolt.java b/src/jvm/backtype/storm/task/ShellBolt.java index 7c4e41f54..97d14dc59 100644 --- a/src/jvm/backtype/storm/task/ShellBolt.java +++ b/src/jvm/backtype/storm/task/ShellBolt.java @@ -123,6 +123,8 @@ public void run() { if (write != null) { _process.writeMessage(write); } + // drain the error stream to avoid dead lock because of full error stream buffer + _process.drainErrorStream(); } catch (InterruptedException e) { } catch (Throwable t) { die(t); diff --git a/src/jvm/backtype/storm/utils/ShellProcess.java b/src/jvm/backtype/storm/utils/ShellProcess.java index e4ee7c4c8..011b60691 100644 --- a/src/jvm/backtype/storm/utils/ShellProcess.java +++ b/src/jvm/backtype/storm/utils/ShellProcess.java @@ -13,8 +13,10 @@ import org.apache.commons.io.IOUtils; import org.json.simple.JSONObject; import org.json.simple.JSONValue; +import org.apache.log4j.Logger; public class ShellProcess { + public static Logger LOG = Logger.getLogger(ShellProcess.class); private DataOutputStream processIn; private BufferedReader processOut; private InputStream processErrorStream; @@ -80,6 +82,22 @@ public String getErrorsString() { } } + public void drainErrorStream() + { + try { + while (processErrorStream.available() > 0) + { + int bufferSize = processErrorStream.available(); + byte[] errorReadingBuffer = new byte[bufferSize]; + + processErrorStream.read(errorReadingBuffer, 0, bufferSize); + + LOG.info("Got error from shell process: " + new String(errorReadingBuffer)); + } + } catch(Exception e) { + } + } + private String readString() throws IOException { 
StringBuilder line = new StringBuilder(); @@ -95,8 +113,6 @@ private String readString() throws IOException { else { errorMessage.append(" Currently read output: " + line.toString() + "\n"); } - errorMessage.append("Shell Process Exception:\n"); - errorMessage.append(getErrorsString() + "\n"); throw new RuntimeException(errorMessage.toString()); } if(subline.equals("end")) { From b1a1851611f9c0cfdcc31cb90aef531e7b0ac5e9 Mon Sep 17 00:00:00 2001 From: Jason Jackson Date: Fri, 1 Feb 2013 00:23:31 -0800 Subject: [PATCH 03/15] Revert "made CountMetric and MultiCountMetric serializable" This reverts commit 3e29f13e405c0d3460781f8f6b310092d332903a. backwards incompatible change reverted, this is moved to 0.9.0-experimental --- src/jvm/backtype/storm/metric/api/CountMetric.java | 2 +- src/jvm/backtype/storm/metric/api/MultiCountMetric.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/jvm/backtype/storm/metric/api/CountMetric.java b/src/jvm/backtype/storm/metric/api/CountMetric.java index edf3a59f0..7a8f829cc 100644 --- a/src/jvm/backtype/storm/metric/api/CountMetric.java +++ b/src/jvm/backtype/storm/metric/api/CountMetric.java @@ -2,7 +2,7 @@ import backtype.storm.metric.api.IMetric; -public class CountMetric implements IMetric, java.io.Serializable { +public class CountMetric implements IMetric { long _value = 0; public CountMetric() { diff --git a/src/jvm/backtype/storm/metric/api/MultiCountMetric.java b/src/jvm/backtype/storm/metric/api/MultiCountMetric.java index 2649beeaf..02473ca6a 100644 --- a/src/jvm/backtype/storm/metric/api/MultiCountMetric.java +++ b/src/jvm/backtype/storm/metric/api/MultiCountMetric.java @@ -4,7 +4,7 @@ import java.util.HashMap; import java.util.Map; -public class MultiCountMetric implements IMetric, java.io.Serializable { +public class MultiCountMetric implements IMetric { Map _value = new HashMap(); public MultiCountMetric() { From cbd217c6ea20e4d97396bbd7f112081362782573 Mon Sep 17 00:00:00 2001 From: Jason 
Jackson Date: Fri, 1 Feb 2013 14:54:09 -0800 Subject: [PATCH 04/15] bump "0.9.0-wip16" --- project.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project.clj b/project.clj index 574c942e6..7f593876d 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject storm/storm "0.9.0-wip15" +(defproject storm/storm "0.9.0-wip16" :url "http://storm-project.clj" :description "Distributed and fault-tolerant realtime computation" :license {:name "Eclipse Public License - Version 1.0" :url "https://github.com/nathanmarz/storm/blob/master/LICENSE.html"} From 98d24466c3ab48f0a2f8fc71440d8f5e114bcd3a Mon Sep 17 00:00:00 2001 From: Nathan Marz Date: Sun, 24 Mar 2013 23:55:24 -0700 Subject: [PATCH 05/15] remove usage of 2 part zmq messages --- src/clj/backtype/storm/messaging/zmq.clj | 35 ++++++++++++------------ 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/src/clj/backtype/storm/messaging/zmq.clj b/src/clj/backtype/storm/messaging/zmq.clj index 4d319ba62..cfd55ea41 100644 --- a/src/clj/backtype/storm/messaging/zmq.clj +++ b/src/clj/backtype/storm/messaging/zmq.clj @@ -6,10 +6,19 @@ (:require [zilch.mq :as mq])) -(defn parse-packet [^bytes part1 ^bytes part2] - (let [bb (ByteBuffer/wrap part1) - port (.getShort bb)] - [(int port) part2] +(defn mk-packet [task ^bytes message] + (let [bb (ByteBuffer/allocate (+ 2 (count message)))] + (.putShort bb (short task)) + (.put bb message) + (.array bb) + )) + +(defn parse-packet [^bytes packet] + (let [bb (ByteBuffer/wrap packet) + port (.getShort bb) + msg (byte-array (- (count packet) 2))] + (.get bb msg) + [(int port) msg] )) (defn get-bind-zmq-url [local? port] @@ -26,27 +35,19 @@ (defprotocol ZMQContextQuery (zmq-context [this])) -(def NOBLOCK-SNDMORE (bit-or ZMQ/SNDMORE ZMQ/NOBLOCK)) - -(deftype ZMQConnection [socket ^ByteBuffer bb] +(deftype ZMQConnection [socket] Connection (recv-with-flags [this flags] - (let [part1 (mq/recv socket flags)] - (when part1 - (when-not (mq/recv-more? 
socket) - (throw (RuntimeException. "Should always receive two-part ZMQ messages"))) - (parse-packet part1 (mq/recv socket))))) + (if-let [packet (mq/recv socket flags)] + (parse-packet packet))) (send [this task message] - (.clear bb) - (.putShort bb (short task)) - (mq/send socket (.array bb) NOBLOCK-SNDMORE) - (mq/send socket message ZMQ/NOBLOCK)) ;; TODO: how to do backpressure if doing noblock?... need to only unblock if the target disappears + (mq/send socket (mk-packet task message) ZMQ/NOBLOCK)) ;; TODO: how to do backpressure if doing noblock?... need to only unblock if the target disappears (close [this] (.close socket) )) (defn mk-connection [socket] - (ZMQConnection. socket (ByteBuffer/allocate 2))) + (ZMQConnection. socket)) (deftype ZMQContext [context linger-ms hwm local?] Context From c6cbcfc53b92fbb15cd402f9411e44901f161008 Mon Sep 17 00:00:00 2001 From: Jason Jackson Date: Fri, 22 Mar 2013 02:45:03 -0700 Subject: [PATCH 06/15] Added SystemBolt. SystemBolt has one executor per worker always. It's perfect for exporting worker related metrics data. 
--- bin/build_release.sh | 10 +- src/clj/backtype/storm/daemon/common.clj | 34 ++-- src/clj/backtype/storm/daemon/executor.clj | 11 +- src/clj/backtype/storm/daemon/supervisor.clj | 11 +- src/clj/backtype/storm/daemon/worker.clj | 15 +- src/jvm/backtype/storm/Constants.java | 3 + src/jvm/backtype/storm/metric/SystemBolt.java | 164 ++++++++++++++++++ src/jvm/backtype/storm/spout/NoOpSpout.java | 36 ---- .../storm/task/GeneralTopologyContext.java | 2 +- test/clj/backtype/storm/metrics_test.clj | 87 +++++++++- 10 files changed, 291 insertions(+), 82 deletions(-) create mode 100644 src/jvm/backtype/storm/metric/SystemBolt.java delete mode 100644 src/jvm/backtype/storm/spout/NoOpSpout.java diff --git a/bin/build_release.sh b/bin/build_release.sh index fdec1292f..089065f1c 100644 --- a/bin/build_release.sh +++ b/bin/build_release.sh @@ -14,11 +14,11 @@ DIR=_release/storm-$RELEASE rm -rf _release rm -f *.zip -$LEIN with-profile release clean -$LEIN with-profile release deps -$LEIN with-profile release jar -$LEIN with-profile release pom -mvn dependency:copy-dependencies +$LEIN with-profile release clean || exit 1 +$LEIN with-profile release deps || exit 1 +$LEIN with-profile release jar || exit 1 +$LEIN with-profile release pom || exit 1 +mvn dependency:copy-dependencies || exit 1 mkdir -p $DIR/lib cp target/storm-*.jar $DIR/storm-${RELEASE}.jar diff --git a/src/clj/backtype/storm/daemon/common.clj b/src/clj/backtype/storm/daemon/common.clj index da93fdd4c..d1456eaa6 100644 --- a/src/clj/backtype/storm/daemon/common.clj +++ b/src/clj/backtype/storm/daemon/common.clj @@ -5,7 +5,7 @@ (:import [backtype.storm.utils Utils]) (:import [backtype.storm.task WorkerTopologyContext]) (:import [backtype.storm Constants]) - (:import [backtype.storm.spout NoOpSpout]) + (:import [backtype.storm.metric SystemBolt]) (:require [clojure.set :as set]) (:require [backtype.storm.daemon.acker :as acker]) (:require [backtype.storm.thrift :as thrift]) @@ -241,8 +241,9 @@ (number-duplicates) (map 
#(str Constants/METRICS_COMPONENT_ID_PREFIX %)))) -(defn metrics-consumer-bolt-specs [components-ids-that-emit-metrics storm-conf] - (let [inputs (->> (for [comp-id components-ids-that-emit-metrics] +(defn metrics-consumer-bolt-specs [storm-conf topology] + (let [component-ids-that-emit-metrics (cons SYSTEM-COMPONENT-ID (keys (all-components topology))) + inputs (->> (for [comp-id component-ids-that-emit-metrics] {[comp-id METRICS-STREAM-ID] :shuffle}) (into {})) @@ -261,27 +262,28 @@ (metrics-consumer-register-ids storm-conf) (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER)))) -(defn add-metric-components! [storm-conf ^StormTopology topology] - (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs (keys (all-components topology)) storm-conf)] +(defn add-metric-components! [storm-conf ^StormTopology topology] + (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs storm-conf topology)] (.put_to_bolts topology comp-id bolt-spec))) -(defn add-system-components! [^StormTopology topology] - (let [system-spout (thrift/mk-spout-spec* - (NoOpSpout.) - {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"]) - METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])} - :p 0 - :conf {TOPOLOGY-TASKS 0})] - (.put_to_spouts topology SYSTEM-COMPONENT-ID system-spout))) +(defn add-system-components! [conf ^StormTopology topology] + (let [system-bolt-spec (thrift/mk-bolt-spec* + {} + (SystemBolt.) + {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"]) + METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])} + :p 0 + :conf {TOPOLOGY-TASKS 0})] + (.put_to_bolts topology SYSTEM-COMPONENT-ID system-bolt-spec))) (defn system-topology! [storm-conf ^StormTopology topology] (validate-basic! topology) (let [ret (.deepCopy topology)] (add-acker! storm-conf ret) - (add-metric-components! storm-conf ret) - (add-metric-streams! ret) + (add-metric-components! storm-conf ret) + (add-system-components! storm-conf ret) + (add-metric-streams! ret) (add-system-streams! 
ret) - (add-system-components! ret) (validate-structure! ret) ret )) diff --git a/src/clj/backtype/storm/daemon/executor.clj b/src/clj/backtype/storm/daemon/executor.clj index 2e176bcd2..a816237b5 100644 --- a/src/clj/backtype/storm/daemon/executor.clj +++ b/src/clj/backtype/storm/daemon/executor.clj @@ -111,8 +111,7 @@ (defn executor-type [^WorkerTopologyContext context component-id] (let [topology (.getRawTopology context) spouts (.get_spouts topology) - bolts (.get_bolts topology) - ] + bolts (.get_bolts topology)] (cond (contains? spouts component-id) :spout (contains? bolts component-id) :bolt :else (throw-runtime "Could not find " component-id " in topology " topology)))) @@ -182,7 +181,7 @@ (this task tuple nil) ))) -(defn executor-data [worker executor-id] +(defn mk-executor-data [worker executor-id] (let [worker-context (worker-context worker) task-ids (executor-id->tasks executor-id) component-id (.getComponentId worker-context (first task-ids)) @@ -253,7 +252,7 @@ (fn [] (disruptor/publish receive-queue - [[nil (TupleImpl. worker-context [interval] -1 Constants/METRICS_TICK_STREAM_ID)]])))))) + [[nil (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID)]])))))) (defn metrics-tick [executor-data task-datas ^TupleImpl tuple] (let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data @@ -293,11 +292,11 @@ (fn [] (disruptor/publish receive-queue - [[nil (TupleImpl. context [tick-time-secs] -1 Constants/SYSTEM_TICK_STREAM_ID)]] + [[nil (TupleImpl. 
context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID)]] ))))))) (defn mk-executor [worker executor-id] - (let [executor-data (executor-data worker executor-id) + (let [executor-data (mk-executor-data worker executor-id) _ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id)) task-datas (->> executor-data :task-ids diff --git a/src/clj/backtype/storm/daemon/supervisor.clj b/src/clj/backtype/storm/daemon/supervisor.clj index c27a276ec..690a3f34a 100644 --- a/src/clj/backtype/storm/daemon/supervisor.clj +++ b/src/clj/backtype/storm/daemon/supervisor.clj @@ -85,8 +85,8 @@ (let [local-assignment (assigned-executors (:port worker-heartbeat))] (and local-assignment (= (:storm-id worker-heartbeat) (:storm-id local-assignment)) - (= (set (:executors worker-heartbeat)) (set (:executors local-assignment)))) - )) + (= (disj (set (:executors worker-heartbeat)) Constants/SYSTEM_EXECUTOR_ID) + (set (:executors local-assignment)))))) (defn read-allocated-workers "Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead (timed out or never wrote heartbeat)" @@ -117,7 +117,12 @@ (defn- wait-for-worker-launch [conf id start-time] (let [state (worker-state conf id)] (loop [] - (let [hb (.get state LS-WORKER-HEARTBEAT)] + (let [hb (try (.get state LS-WORKER-HEARTBEAT) + (catch java.io.FileNotFoundException e + ;; This solves race condition in unit tests if you try to shutdown + ;; a worker which cleans up worker state while you also try to wait + ;; for worker to launch by reading the same state. 
+ nil))] (when (and (not hb) (< diff --git a/src/clj/backtype/storm/daemon/worker.clj b/src/clj/backtype/storm/daemon/worker.clj index f614fa497..157fb3297 100644 --- a/src/clj/backtype/storm/daemon/worker.clj +++ b/src/clj/backtype/storm/daemon/worker.clj @@ -9,15 +9,16 @@ (defmulti mk-suicide-fn cluster-mode) -(defn read-worker-executors [storm-cluster-state storm-id assignment-id port] +(defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port] (let [assignment (:executor->node+port (.assignment-info storm-cluster-state storm-id nil))] (doall + (concat + [Constants/SYSTEM_EXECUTOR_ID] (mapcat (fn [[executor loc]] - (if (= loc [assignment-id port]) - [executor] - )) - assignment)) - )) + (if (= loc [assignment-id port]) + [executor] + )) + assignment))))) (defnk do-executor-heartbeats [worker :executors nil] ;; stats is how we know what executors are assigned to this worker @@ -144,7 +145,7 @@ (let [cluster-state (cluster/mk-distributed-cluster-state conf) storm-cluster-state (cluster/mk-storm-cluster-state cluster-state) storm-conf (read-supervisor-storm-conf conf storm-id) - executors (set (read-worker-executors storm-cluster-state storm-id assignment-id port)) + executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port)) transfer-queue (disruptor/disruptor-queue (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE) :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY)) executor-receive-queue-map (mk-receive-queue-map storm-conf executors) diff --git a/src/jvm/backtype/storm/Constants.java b/src/jvm/backtype/storm/Constants.java index 705278986..a8ade3c53 100644 --- a/src/jvm/backtype/storm/Constants.java +++ b/src/jvm/backtype/storm/Constants.java @@ -1,11 +1,14 @@ package backtype.storm; import backtype.storm.coordination.CoordinatedBolt; +import clojure.lang.RT; public class Constants { public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream"; + 
public static final long SYSTEM_TASK_ID = -1; + public static final Object SYSTEM_EXECUTOR_ID = RT.readString("[-1 -1]"); public static final String SYSTEM_COMPONENT_ID = "__system"; public static final String SYSTEM_TICK_STREAM_ID = "__tick"; public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics"; diff --git a/src/jvm/backtype/storm/metric/SystemBolt.java b/src/jvm/backtype/storm/metric/SystemBolt.java new file mode 100644 index 000000000..68ae85351 --- /dev/null +++ b/src/jvm/backtype/storm/metric/SystemBolt.java @@ -0,0 +1,164 @@ +package backtype.storm.metric; + +import backtype.storm.Config; +import backtype.storm.metric.api.AssignableMetric; +import backtype.storm.metric.api.IMetric; +import backtype.storm.task.IBolt; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Tuple; +import clojure.lang.AFn; +import clojure.lang.IFn; +import clojure.lang.RT; +import com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.management.*; +import java.util.List; +import java.util.Map; + + +// There is one task inside one executor for each worker of the topology. +// TaskID is always -1, therefore you can only send-unanchored tuples to co-located SystemBolt. +// This bolt was conceived to export worker stats via metrics api. 
+public class SystemBolt implements IBolt { + private static Logger LOG = LoggerFactory.getLogger(SystemBolt.class); + private static boolean _prepareWasCalled = false; + + private static class MemoryUsageMetric implements IMetric { + IFn _getUsage; + public MemoryUsageMetric(IFn getUsage) { + _getUsage = getUsage; + } + @Override + public Object getValueAndReset() { + MemoryUsage memUsage = (MemoryUsage)_getUsage.invoke(); + return ImmutableMap.builder() + .put("maxBytes", memUsage.getMax()) + .put("committedBytes", memUsage.getCommitted()) + .put("initBytes", memUsage.getInit()) + .put("usedBytes", memUsage.getUsed()) + .put("virtualFreeBytes", memUsage.getMax() - memUsage.getUsed()) + .put("unusedBytes", memUsage.getCommitted() - memUsage.getUsed()) + .build(); + } + } + + // canonically the metrics data exported is time bucketed when doing counts. + // convert the absolute values here into time buckets. + private static class GarbageCollectorMetric implements IMetric { + GarbageCollectorMXBean _gcBean; + Long _collectionCount; + Long _collectionTime; + public GarbageCollectorMetric(GarbageCollectorMXBean gcBean) { + _gcBean = gcBean; + } + @Override + public Object getValueAndReset() { + Long collectionCountP = _gcBean.getCollectionCount(); + Long collectionTimeP = _gcBean.getCollectionCount(); + + Map ret = null; + if(_collectionCount!=null && _collectionTime!=null) { + ret = ImmutableMap.builder() + .put("count", collectionCountP - _collectionCount) + .put("timeMs", collectionTimeP - _collectionTime) + .build(); + } + + _collectionCount = collectionCountP; + _collectionTime = collectionTimeP; + return ret; + } + } + + @Override + public void prepare(final Map stormConf, TopologyContext context, OutputCollector collector) { + if(_prepareWasCalled && stormConf.get(Config.STORM_CLUSTER_MODE) != "local") { + throw new RuntimeException("A single worker should have 1 SystemBolt instance."); + } + _prepareWasCalled = true; + + int bucketSize = 
RT.intCast(stormConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)); + + final RuntimeMXBean jvmRT = ManagementFactory.getRuntimeMXBean(); + + context.registerMetric("uptimeSecs", new IMetric() { + @Override + public Object getValueAndReset() { + return jvmRT.getUptime()/1000.0; + } + }, bucketSize); + + // You can calculate topology percent uptime between T_0 to T_1 using this metric data: + // let s = sum topologyPartialUptimeSecs for each worker for each time buckets between T_0 and T_1 + // topology percent uptime = s/(T_1-T_0) + // Even if the number of workers change over time the value is still correct because I divide by TOPOLOGY_WORKERS. + context.registerMetric("topologyPartialUptimeSecs", new IMetric() { + private long _prevUptime = jvmRT.getUptime(); + private final double NUM_WORKERS = RT.doubleCast(stormConf.get(Config.TOPOLOGY_WORKERS)); + @Override + public Object getValueAndReset() { + long _nowUptime = jvmRT.getUptime(); + double ret = (_nowUptime - _prevUptime)/1000.0/NUM_WORKERS; + _prevUptime = _nowUptime; + return ret; + } + }, bucketSize); + + context.registerMetric("startTimeSecs", new IMetric() { + @Override + public Object getValueAndReset() { + return jvmRT.getStartTime()/1000.0; + } + }, bucketSize); + + context.registerMetric("newWorkerEvent", new IMetric() { + boolean doEvent = true; + + @Override + public Object getValueAndReset() { + if (doEvent) { + doEvent = false; + return 1; + } else return 0; + } + }, bucketSize); + + // This is metric data global to topology, but we don't support this concept, so put it here. + // It's very useful to have time series of TOPOLOGY_WORKERS to compare actual worker count. 
+ context.registerMetric("configTopologyWorkers", new IMetric() { + @Override + public Object getValueAndReset() { + return stormConf.get(Config.TOPOLOGY_WORKERS); + } + }, bucketSize); + + final MemoryMXBean jvmMemRT = ManagementFactory.getMemoryMXBean(); + + context.registerMetric("memory/heap", new MemoryUsageMetric(new AFn() { + public Object invoke() { + return jvmMemRT.getHeapMemoryUsage(); + } + }), bucketSize); + context.registerMetric("memory/nonHeap", new MemoryUsageMetric(new AFn() { + public Object invoke() { + return jvmMemRT.getNonHeapMemoryUsage(); + } + }), bucketSize); + + for(GarbageCollectorMXBean b : ManagementFactory.getGarbageCollectorMXBeans()) { + context.registerMetric("GC/" + b.getName().replaceAll("\\W", ""), new GarbageCollectorMetric(b), bucketSize); + } + } + + @Override + public void execute(Tuple input) { + throw new RuntimeException("Non-system tuples should never be sent to __system bolt."); + } + + @Override + public void cleanup() { + } +} diff --git a/src/jvm/backtype/storm/spout/NoOpSpout.java b/src/jvm/backtype/storm/spout/NoOpSpout.java deleted file mode 100644 index 03586dcae..000000000 --- a/src/jvm/backtype/storm/spout/NoOpSpout.java +++ /dev/null @@ -1,36 +0,0 @@ -package backtype.storm.spout; - -import backtype.storm.task.TopologyContext; -import java.util.Map; - - -public class NoOpSpout implements ISpout { - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - } - - @Override - public void close() { - } - - @Override - public void activate() { - } - - @Override - public void deactivate() { - } - - @Override - public void nextTuple() { - } - - @Override - public void ack(Object msgId) { - } - - @Override - public void fail(Object msgId) { - } - -} diff --git a/src/jvm/backtype/storm/task/GeneralTopologyContext.java b/src/jvm/backtype/storm/task/GeneralTopologyContext.java index e9e638e38..3065b23d6 100644 --- a/src/jvm/backtype/storm/task/GeneralTopologyContext.java +++ 
b/src/jvm/backtype/storm/task/GeneralTopologyContext.java @@ -63,7 +63,7 @@ public StormTopology getRawTopology() { * @return the component id for the input task id */ public String getComponentId(int taskId) { - if(taskId==-1) { + if(taskId==Constants.SYSTEM_TASK_ID) { return Constants.SYSTEM_COMPONENT_ID; } else { return _taskToComponent.get(taskId); diff --git a/test/clj/backtype/storm/metrics_test.clj b/test/clj/backtype/storm/metrics_test.clj index 30ba9a768..62d7af239 100644 --- a/test/clj/backtype/storm/metrics_test.clj +++ b/test/clj/backtype/storm/metrics_test.clj @@ -75,6 +75,9 @@ (wait-for-atleast-N-buckets! N# ~comp-id ~metric-name) (is (= ~expected (subvec (lookup-bucket-by-comp-id-&-metric-name! ~comp-id ~metric-name) 0 N#)))))) +(defmacro assert-metric-data-exists! [comp-id metric-name] + `(is (not-empty (lookup-bucket-by-comp-id-&-metric-name! ~comp-id ~metric-name)))) + (deftest test-custom-metric (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER @@ -103,7 +106,7 @@ (deftest test-builtin-metrics-1 (with-simulated-time-local-cluster - [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER + [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}] TOPOLOGY-STATS-SAMPLE-RATE 1.0 TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}] @@ -142,7 +145,6 @@ (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}] - TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true TOPOLOGY-STATS-SAMPLE-RATE 1.0 TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5}] (let [feeder (feeder-spout ["field1"]) @@ -153,7 +155,7 @@ {"mybolt" (thrift/mk-bolt-spec {"myspout" :shuffle} ack-every-other)})] (submit-local-topology (:nimbus cluster) "metrics-tester" - {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} + {} topology) (.feed feeder ["a"] 1) @@ -175,11 +177,80 @@ (assert-buckets! 
"mybolt" "__ack-count/myspout:default" [1 0]) (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1]) - (advance-cluster-time cluster 30) - (assert-failed tracker 2) - (assert-buckets! "myspout" "__fail-count/default" [1]) + (advance-cluster-time cluster 15) (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0]) (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0]) - (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0]) + (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0]) (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0]) - (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0])))) + (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0]) + + (.feed feeder ["c"] 3) + (advance-cluster-time cluster 15) + (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0 1 0 0]) + (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0 1 0 0]) + (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0 1 0 0]) + (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0 1 0 0]) + (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0 1 0 0])))) + +(deftest test-builtin-metrics-3 + (with-simulated-time-local-cluster + [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER + [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}] + TOPOLOGY-STATS-SAMPLE-RATE 1.0 + TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5 + TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}] + (let [feeder (feeder-spout ["field1"]) + tracker (AckFailMapTracker.) 
+ _ (.setAckFailDelegate feeder tracker) + topology (thrift/mk-topology + {"myspout" (thrift/mk-spout-spec feeder)} + {"mybolt" (thrift/mk-bolt-spec {"myspout" :global} ack-every-other)})] + (submit-local-topology (:nimbus cluster) + "timeout-tester" + {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10} + topology) + (.feed feeder ["a"] 1) + (.feed feeder ["b"] 2) + (.feed feeder ["c"] 3) + (advance-cluster-time cluster 9) + (assert-acked tracker 1 3) + (assert-buckets! "myspout" "__ack-count/default" [2]) + (assert-buckets! "myspout" "__emit-count/default" [3]) + (assert-buckets! "myspout" "__transfer-count/default" [3]) + (assert-buckets! "mybolt" "__ack-count/myspout:default" [2]) + (assert-buckets! "mybolt" "__execute-count/myspout:default" [3]) + + (is (not (.isFailed tracker 2))) + (advance-cluster-time cluster 30) + (assert-failed tracker 2) + (assert-buckets! "myspout" "__fail-count/default" [1]) + (assert-buckets! "myspout" "__ack-count/default" [2 0 0 0]) + (assert-buckets! "myspout" "__emit-count/default" [3 0 0 0]) + (assert-buckets! "myspout" "__transfer-count/default" [3 0 0 0]) + (assert-buckets! "mybolt" "__ack-count/myspout:default" [2 0 0 0]) + (assert-buckets! "mybolt" "__execute-count/myspout:default" [3 0 0 0])))) + +(deftest test-system-bolt + (with-simulated-time-local-cluster + [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER + [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}] + TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}] + (let [feeder (feeder-spout ["field1"]) + topology (thrift/mk-topology + {"1" (thrift/mk-spout-spec feeder)} + {})] + (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology) + + (.feed feeder ["a"] 1) + (advance-cluster-time cluster 70) + (assert-buckets! "__system" "newWorkerEvent" [1]) + (assert-buckets! "__system" "configTopologyWorkers" [1]) + (assert-metric-data-exists! "__system" "uptimeSecs") + (assert-metric-data-exists! "__system" "startTimeSecs") + (assert-metric-data-exists! 
"__system" "topologyPartialUptimeSecs") + + (advance-cluster-time cluster 180) + (assert-buckets! "__system" "newWorkerEvent" [1 0 0 0]) + (assert-buckets! "__system" "configTopologyWorkers" [1 1 1 1])))) + + From 92586a455de77d9fd90cf2ca83d4a08b7089950e Mon Sep 17 00:00:00 2001 From: Nathan Marz Date: Mon, 25 Mar 2013 17:02:14 -0700 Subject: [PATCH 07/15] fix default scheduler to always reassign topology if some of its executors are dead --- .../storm/scheduler/DefaultScheduler.clj | 4 +- test/clj/backtype/storm/nimbus_test.clj | 51 +++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/src/clj/backtype/storm/scheduler/DefaultScheduler.clj b/src/clj/backtype/storm/scheduler/DefaultScheduler.clj index 59f2cdc96..bbec50e6f 100644 --- a/src/clj/backtype/storm/scheduler/DefaultScheduler.clj +++ b/src/clj/backtype/storm/scheduler/DefaultScheduler.clj @@ -47,10 +47,12 @@ (map #(vector (.getStartTask %) (.getEndTask %))) set) alive-assigned (EvenScheduler/get-alive-assigned-node+port->executors cluster topology-id) + alive-executors (->> alive-assigned vals (apply concat) set) can-reassign-slots (slots-can-reassign cluster (keys alive-assigned)) total-slots-to-use (min (.getNumWorkers topology) (+ (count can-reassign-slots) (count available-slots))) - bad-slots (if (> total-slots-to-use (count alive-assigned)) + bad-slots (if (or (> total-slots-to-use (count alive-assigned)) + (not= alive-executors all-executors)) (bad-slots alive-assigned (count all-executors) total-slots-to-use) [])]] (.freeSlots cluster bad-slots) diff --git a/test/clj/backtype/storm/nimbus_test.clj b/test/clj/backtype/storm/nimbus_test.clj index a731668a5..749303469 100644 --- a/test/clj/backtype/storm/nimbus_test.clj +++ b/test/clj/backtype/storm/nimbus_test.clj @@ -503,6 +503,57 @@ (check-consistency cluster "test") ))) + +(deftest test-reassignment-to-constrained-cluster + (with-simulated-time-local-cluster [cluster :supervisors 0 + :daemon-conf {SUPERVISOR-ENABLE false 
+ NIMBUS-TASK-LAUNCH-SECS 60 + NIMBUS-TASK-TIMEOUT-SECS 20 + NIMBUS-MONITOR-FREQ-SECS 10 + NIMBUS-SUPERVISOR-TIMEOUT-SECS 100 + TOPOLOGY-ACKER-EXECUTORS 0}] + (letlocals + (add-supervisor cluster :ports 1 :id "a") + (add-supervisor cluster :ports 1 :id "b") + (bind conf (:daemon-conf cluster)) + (bind topology (thrift/mk-topology + {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)} + {} + )) + (bind state (:storm-cluster-state cluster)) + (submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 2} topology) + (check-consistency cluster "test") + (bind storm-id (get-storm-id state "test")) + (bind [executor-id1 executor-id2] (topology-executors cluster storm-id)) + (bind ass1 (executor-assignment cluster storm-id executor-id1)) + (bind ass2 (executor-assignment cluster storm-id executor-id2)) + + (advance-cluster-time cluster 59) + (do-executor-heartbeat cluster storm-id executor-id1) + (do-executor-heartbeat cluster storm-id executor-id2) + + (advance-cluster-time cluster 13) + (is (= ass1 (executor-assignment cluster storm-id executor-id1))) + (is (= ass2 (executor-assignment cluster storm-id executor-id2))) + (kill-supervisor cluster "b") + (do-executor-heartbeat cluster storm-id executor-id1) + + (advance-cluster-time cluster 11) + (do-executor-heartbeat cluster storm-id executor-id1) + + (advance-cluster-time cluster 11) + (do-executor-heartbeat cluster storm-id executor-id1) + + (advance-cluster-time cluster 11) + (do-executor-heartbeat cluster storm-id executor-id1) + + (advance-cluster-time cluster 11) + (do-executor-heartbeat cluster storm-id executor-id1) + + (check-consistency cluster "test") + (is (= 1 (storm-num-workers state "test"))) + ))) + (defn check-executor-distribution [slot-executors distribution] (check-distribution (vals slot-executors) distribution)) From 4bc5f5182800e698192cef767d69bf0b8f92b907 Mon Sep 17 00:00:00 2001 From: Jason Jackson Date: Mon, 25 Mar 2013 18:57:14 -0700 Subject: [PATCH 08/15] this 
section of code was moved to new PR --- src/clj/backtype/storm/daemon/supervisor.clj | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/clj/backtype/storm/daemon/supervisor.clj b/src/clj/backtype/storm/daemon/supervisor.clj index 690a3f34a..fda038faa 100644 --- a/src/clj/backtype/storm/daemon/supervisor.clj +++ b/src/clj/backtype/storm/daemon/supervisor.clj @@ -117,12 +117,7 @@ (defn- wait-for-worker-launch [conf id start-time] (let [state (worker-state conf id)] (loop [] - (let [hb (try (.get state LS-WORKER-HEARTBEAT) - (catch java.io.FileNotFoundException e - ;; This solves race condition in unit tests if you try to shutdown - ;; a worker which cleans up worker state while you also try to wait - ;; for worker to launch by reading the same state. - nil))] + (let [hb (.get state LS-WORKER-HEARTBEAT)] (when (and (not hb) (< From 4a31a41eaf8b20904a4a75807f7aa11411230eca Mon Sep 17 00:00:00 2001 From: Jason Jackson Date: Mon, 25 Mar 2013 18:58:55 -0700 Subject: [PATCH 09/15] removed topology level stat --- src/jvm/backtype/storm/metric/SystemBolt.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/jvm/backtype/storm/metric/SystemBolt.java b/src/jvm/backtype/storm/metric/SystemBolt.java index 68ae85351..b222ebf54 100644 --- a/src/jvm/backtype/storm/metric/SystemBolt.java +++ b/src/jvm/backtype/storm/metric/SystemBolt.java @@ -126,15 +126,6 @@ public Object getValueAndReset() { } }, bucketSize); - // This is metric data global to topology, but we don't support this concept, so put it here. - // It's very useful to have time series of TOPOLOGY_WORKERS to compare actual worker count. 
- context.registerMetric("configTopologyWorkers", new IMetric() { - @Override - public Object getValueAndReset() { - return stormConf.get(Config.TOPOLOGY_WORKERS); - } - }, bucketSize); - final MemoryMXBean jvmMemRT = ManagementFactory.getMemoryMXBean(); context.registerMetric("memory/heap", new MemoryUsageMetric(new AFn() { From b12a1b9bb16ff04536b75b07a06f446a90d313ed Mon Sep 17 00:00:00 2001 From: Jason Jackson Date: Mon, 25 Mar 2013 19:26:01 -0700 Subject: [PATCH 10/15] removed topologyPartialUptimeSecs --- src/jvm/backtype/storm/metric/SystemBolt.java | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/src/jvm/backtype/storm/metric/SystemBolt.java b/src/jvm/backtype/storm/metric/SystemBolt.java index b222ebf54..da8bd2b96 100644 --- a/src/jvm/backtype/storm/metric/SystemBolt.java +++ b/src/jvm/backtype/storm/metric/SystemBolt.java @@ -91,22 +91,6 @@ public Object getValueAndReset() { } }, bucketSize); - // You can calculate topology percent uptime between T_0 to T_1 using this metric data: - // let s = sum topologyPartialUptimeSecs for each worker for each time buckets between T_0 and T_1 - // topology percent uptime = s/(T_1-T_0) - // Even if the number of workers change over time the value is still correct because I divide by TOPOLOGY_WORKERS. 
- context.registerMetric("topologyPartialUptimeSecs", new IMetric() { - private long _prevUptime = jvmRT.getUptime(); - private final double NUM_WORKERS = RT.doubleCast(stormConf.get(Config.TOPOLOGY_WORKERS)); - @Override - public Object getValueAndReset() { - long _nowUptime = jvmRT.getUptime(); - double ret = (_nowUptime - _prevUptime)/1000.0/NUM_WORKERS; - _prevUptime = _nowUptime; - return ret; - } - }, bucketSize); - context.registerMetric("startTimeSecs", new IMetric() { @Override public Object getValueAndReset() { From 11503f13f8883e66503bcaa62e6a664e14974e37 Mon Sep 17 00:00:00 2001 From: Nathan Marz Date: Mon, 25 Mar 2013 20:14:34 -0700 Subject: [PATCH 11/15] get rid of dead code --- .../backtype/storm/scheduler/IsolationScheduler.clj | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/clj/backtype/storm/scheduler/IsolationScheduler.clj b/src/clj/backtype/storm/scheduler/IsolationScheduler.clj index bd15f187b..b235de2d8 100644 --- a/src/clj/backtype/storm/scheduler/IsolationScheduler.clj +++ b/src/clj/backtype/storm/scheduler/IsolationScheduler.clj @@ -17,16 +17,6 @@ (defn -prepare [this conf] (container-set! 
(.state this) conf)) - -(defn- compute-worker-specs "Returns list of sets of executors" - [^TopologyDetails details] - (->> (.getExecutorToComponent details) - reverse-map - (map second) - (apply interleave-all) - (partition-fixed (.getNumWorkers details)) - (map set))) - (defn- compute-worker-specs "Returns mutable set of sets of executors" [^TopologyDetails details] (->> (.getExecutorToComponent details) From 930fcbfb6787e364e779bdc332dc0ebcbe076c51 Mon Sep 17 00:00:00 2001 From: Nathan Marz Date: Mon, 25 Mar 2013 21:24:33 -0700 Subject: [PATCH 12/15] add back in dumping of error stream when pipe to subprocess is broken --- src/jvm/backtype/storm/utils/ShellProcess.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/jvm/backtype/storm/utils/ShellProcess.java b/src/jvm/backtype/storm/utils/ShellProcess.java index 011b60691..49c428ae3 100644 --- a/src/jvm/backtype/storm/utils/ShellProcess.java +++ b/src/jvm/backtype/storm/utils/ShellProcess.java @@ -113,6 +113,8 @@ private String readString() throws IOException { else { errorMessage.append(" Currently read output: " + line.toString() + "\n"); } + errorMessage.append("Shell Process Exception:\n"); + errorMessage.append(getErrorsString() + "\n"); throw new RuntimeException(errorMessage.toString()); } if(subline.equals("end")) { From c15b28fd71098f77323680db9b1bb0e76cfdbe94 Mon Sep 17 00:00:00 2001 From: Nathan Marz Date: Mon, 25 Mar 2013 21:24:40 -0700 Subject: [PATCH 13/15] fix metrics test --- test/clj/backtype/storm/metrics_test.clj | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/clj/backtype/storm/metrics_test.clj b/test/clj/backtype/storm/metrics_test.clj index 62d7af239..e6a0e9942 100644 --- a/test/clj/backtype/storm/metrics_test.clj +++ b/test/clj/backtype/storm/metrics_test.clj @@ -244,13 +244,11 @@ (.feed feeder ["a"] 1) (advance-cluster-time cluster 70) (assert-buckets! "__system" "newWorkerEvent" [1]) - (assert-buckets! 
"__system" "configTopologyWorkers" [1]) (assert-metric-data-exists! "__system" "uptimeSecs") (assert-metric-data-exists! "__system" "startTimeSecs") - (assert-metric-data-exists! "__system" "topologyPartialUptimeSecs") (advance-cluster-time cluster 180) (assert-buckets! "__system" "newWorkerEvent" [1 0 0 0]) - (assert-buckets! "__system" "configTopologyWorkers" [1 1 1 1])))) + ))) From cfe7662c800c4b69313457053201478f8506d38b Mon Sep 17 00:00:00 2001 From: Nathan Marz Date: Mon, 25 Mar 2013 21:24:45 -0700 Subject: [PATCH 14/15] update changelog --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b4db2c4a2..78963c83f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,12 +12,18 @@ * Nimbus throws NotAliveException instead of FileNotFoundException from various query methods when topology is no longer alive (thanks revans2) * Escape HTML and Javascript appropriately in Storm UI (thanks d2r) * Storm's Zookeeper client now uses bounded exponential backoff strategy on failures + * Automatically drain and log error stream of multilang subprocesses * Append component name to thread name of running executors so that logs are easier to read * Bug fix: Supervisor provides full path to workers to logging config rather than relative path (thanks revans2) * Bug fix: Call ReducerAggregator#init properly when used within persistentAggregate (thanks lorcan) * Bug fix: Set component-specific configs correctly for Trident spouts + +## 0.8.3 (unreleased) + + * Revert zmq layer to not rely on multipart messages to fix issue reported by some users * Bug fix: Fix TransactionalMap and OpaqueMap to correctly do multiple updates to the same key in the same batch * Bug fix: Fix race condition between supervisor and Nimbus that could lead to stormconf.ser errors and infinite crashing of supervisor + * Bug fix: Fix default scheduler to always reassign workers in a constrained topology when there are dead executors ## 0.8.2 From 
a6f409e46c0c4a4caa8131fecc89e6d7aa9fd8cf Mon Sep 17 00:00:00 2001 From: Nathan Marz Date: Mon, 25 Mar 2013 21:26:12 -0700 Subject: [PATCH 15/15] update contributors --- README.markdown | 1 + 1 file changed, 1 insertion(+) diff --git a/README.markdown b/README.markdown index 5532937b4..2a7d45cb4 100644 --- a/README.markdown +++ b/README.markdown @@ -70,6 +70,7 @@ You must not remove this notice, or any other, from this software. * Lorcan Coyle ([@lorcan](https://github.com/lorcan)) * Derek Dagit ([@d2r](https://github.com/d2r)) * Andrew Olson ([@noslowerdna](https://github.com/noslowerdna)) +* Gavin Li ([@lyogavin](https://github.com/lyogavin)) ## Acknowledgements