diff --git a/bin/build_release.sh b/bin/build_release.sh
index fdec1292f..089065f1c 100644
--- a/bin/build_release.sh
+++ b/bin/build_release.sh
@@ -14,11 +14,11 @@ DIR=_release/storm-$RELEASE
 rm -rf _release
 rm -f *.zip
 
-$LEIN with-profile release clean
-$LEIN with-profile release deps
-$LEIN with-profile release jar
-$LEIN with-profile release pom
-mvn dependency:copy-dependencies
+$LEIN with-profile release clean || exit 1
+$LEIN with-profile release deps || exit 1
+$LEIN with-profile release jar || exit 1
+$LEIN with-profile release pom || exit 1
+mvn dependency:copy-dependencies || exit 1
 
 mkdir -p $DIR/lib
 cp target/storm-*.jar $DIR/storm-${RELEASE}.jar
diff --git a/src/clj/backtype/storm/daemon/common.clj b/src/clj/backtype/storm/daemon/common.clj
index da93fdd4c..d1456eaa6 100644
--- a/src/clj/backtype/storm/daemon/common.clj
+++ b/src/clj/backtype/storm/daemon/common.clj
@@ -5,7 +5,7 @@
   (:import [backtype.storm.utils Utils])
   (:import [backtype.storm.task WorkerTopologyContext])
   (:import [backtype.storm Constants])
-  (:import [backtype.storm.spout NoOpSpout])
+  (:import [backtype.storm.metric SystemBolt])
   (:require [clojure.set :as set])
   (:require [backtype.storm.daemon.acker :as acker])
   (:require [backtype.storm.thrift :as thrift])
@@ -241,8 +241,9 @@
        (number-duplicates)
        (map #(str Constants/METRICS_COMPONENT_ID_PREFIX %))))
 
-(defn metrics-consumer-bolt-specs [components-ids-that-emit-metrics storm-conf]
-  (let [inputs (->> (for [comp-id components-ids-that-emit-metrics]
+(defn metrics-consumer-bolt-specs [storm-conf topology]
+  (let [component-ids-that-emit-metrics (cons SYSTEM-COMPONENT-ID (keys (all-components topology)))
+        inputs (->> (for [comp-id component-ids-that-emit-metrics]
                       {[comp-id METRICS-STREAM-ID] :shuffle})
                     (into {}))
 
@@ -261,27 +262,28 @@
     (metrics-consumer-register-ids storm-conf)
     (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))
 
-(defn add-metric-components! [storm-conf ^StormTopology topology]  
-  (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs (keys (all-components topology)) storm-conf)]
+(defn add-metric-components! [storm-conf ^StormTopology topology]
+  (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs storm-conf topology)]
     (.put_to_bolts topology comp-id bolt-spec)))
 
-(defn add-system-components! [^StormTopology topology]
-  (let [system-spout (thrift/mk-spout-spec*
-                      (NoOpSpout.)
-                      {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
-                       METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])}
-                      :p 0
-                      :conf {TOPOLOGY-TASKS 0})]
-    (.put_to_spouts topology SYSTEM-COMPONENT-ID system-spout)))
+(defn add-system-components! [conf ^StormTopology topology]
+  (let [system-bolt-spec (thrift/mk-bolt-spec*
+                          {}
+                          (SystemBolt.)
+                          {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
+                           METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])}
+                          :p 0
+                          :conf {TOPOLOGY-TASKS 0})]
+    (.put_to_bolts topology SYSTEM-COMPONENT-ID system-bolt-spec)))
 
 (defn system-topology! [storm-conf ^StormTopology topology]
   (validate-basic! topology)
   (let [ret (.deepCopy topology)]
     (add-acker! storm-conf ret)
-    (add-metric-components! storm-conf ret)    
-    (add-metric-streams! ret)
+    (add-metric-components! storm-conf ret)
+    (add-system-components! storm-conf ret)
+    (add-metric-streams! ret)
     (add-system-streams! ret)
-    (add-system-components! ret)
     (validate-structure! ret)
     ret
    ))
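Note: metrics-consumer-bolt-specs now derives the producer set itself (every user component plus __system), so each registered consumer bolt shuffle-reads the metrics stream of every component. For orientation, a minimal Java-side sketch of how a topology populates TOPOLOGY-METRICS-CONSUMER-REGISTER; it assumes Config#registerMetricsConsumer and the bundled LoggingMetricsConsumer from the metrics API are available on this branch:

    import backtype.storm.Config;
    import backtype.storm.metric.LoggingMetricsConsumer;

    public class RegisterConsumerExample {
        public static Config mkConf() {
            Config conf = new Config();
            // Stored under TOPOLOGY_METRICS_CONSUMER_REGISTER; metrics-consumer-register-ids
            // and metrics-consumer-bolt-specs above turn each registration into one
            // __metrics consumer bolt wired to every component's METRICS-STREAM-ID.
            conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
            return conf;
        }
    }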
diff --git a/src/clj/backtype/storm/daemon/executor.clj b/src/clj/backtype/storm/daemon/executor.clj
index 33924303a..d4003ad80 100644
--- a/src/clj/backtype/storm/daemon/executor.clj
+++ b/src/clj/backtype/storm/daemon/executor.clj
@@ -111,8 +111,7 @@
 (defn executor-type [^WorkerTopologyContext context component-id]
   (let [topology (.getRawTopology context)
         spouts (.get_spouts topology)
-        bolts (.get_bolts topology)
-        ]
+        bolts (.get_bolts topology)]
     (cond (contains? spouts component-id) :spout
           (contains? bolts component-id) :bolt
           :else (throw-runtime "Could not find " component-id " in topology " topology))))
@@ -182,7 +181,7 @@
       (this task tuple nil)
       )))
 
-(defn executor-data [worker executor-id]
+(defn mk-executor-data [worker executor-id]
   (let [worker-context (worker-context worker)
         task-ids (executor-id->tasks executor-id)
         component-id (.getComponentId worker-context (first task-ids))
@@ -253,7 +252,7 @@
        (fn []
          (disruptor/publish
           receive-queue
-          [[nil (TupleImpl. worker-context [interval] -1 Constants/METRICS_TICK_STREAM_ID)]]))))))
+          [[nil (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID)]]))))))
 
 (defn metrics-tick [executor-data task-datas ^TupleImpl tuple]
   (let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
@@ -293,11 +292,11 @@
          (fn []
            (disruptor/publish
             receive-queue
-            [[nil (TupleImpl. context [tick-time-secs] -1 Constants/SYSTEM_TICK_STREAM_ID)]]
+            [[nil (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID)]]
             )))))))
 
 (defn mk-executor [worker executor-id]
-  (let [executor-data (executor-data worker executor-id)
+  (let [executor-data (mk-executor-data worker executor-id)
         _ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
         task-datas (->> executor-data
                         :task-ids
diff --git a/src/clj/backtype/storm/daemon/supervisor.clj b/src/clj/backtype/storm/daemon/supervisor.clj
index 8083d82a7..99d0be0b9 100644
--- a/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/src/clj/backtype/storm/daemon/supervisor.clj
@@ -80,8 +80,8 @@
   (let [local-assignment (assigned-executors (:port worker-heartbeat))]
     (and local-assignment
          (= (:storm-id worker-heartbeat) (:storm-id local-assignment))
-         (= (set (:executors worker-heartbeat)) (set (:executors local-assignment))))
-    ))
+         (= (disj (set (:executors worker-heartbeat)) Constants/SYSTEM_EXECUTOR_ID)
+            (set (:executors local-assignment))))))
 
 (defn read-allocated-workers
   "Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead (timed out or never wrote heartbeat)"
diff --git a/src/clj/backtype/storm/daemon/worker.clj b/src/clj/backtype/storm/daemon/worker.clj
index fbe83099c..a5e031386 100644
--- a/src/clj/backtype/storm/daemon/worker.clj
+++ b/src/clj/backtype/storm/daemon/worker.clj
@@ -9,15 +9,16 @@
 
 (defmulti mk-suicide-fn cluster-mode)
 
-(defn read-worker-executors [storm-cluster-state storm-id assignment-id port]
+(defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port]
   (let [assignment (:executor->node+port (.assignment-info storm-cluster-state storm-id nil))]
     (doall
+     (concat
+      [Constants/SYSTEM_EXECUTOR_ID]
      (mapcat (fn [[executor loc]]
-               (if (= loc [assignment-id port])
-                 [executor]
-                 ))
-             assignment))
-    ))
+                (if (= loc [assignment-id port])
+                  [executor]
+                  ))
+              assignment)))))
 
 (defnk do-executor-heartbeats [worker :executors nil]
   ;; stats is how we know what executors are assigned to this worker
@@ -150,7 +151,7 @@
   (let [cluster-state (cluster/mk-distributed-cluster-state conf)
         storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
         storm-conf (read-supervisor-storm-conf conf storm-id)
-        executors (set (read-worker-executors storm-cluster-state storm-id assignment-id port))
+        executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port))
         transfer-queue (disruptor/disruptor-queue (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE) :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
         executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
diff --git a/src/jvm/backtype/storm/Constants.java b/src/jvm/backtype/storm/Constants.java
index 705278986..a8ade3c53 100644
--- a/src/jvm/backtype/storm/Constants.java
+++ b/src/jvm/backtype/storm/Constants.java
@@ -1,11 +1,14 @@
 package backtype.storm;
 
 import backtype.storm.coordination.CoordinatedBolt;
+import clojure.lang.RT;
 
 
 public class Constants {
     public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream";
 
+    public static final long SYSTEM_TASK_ID = -1;
+    public static final Object SYSTEM_EXECUTOR_ID = RT.readString("[-1 -1]");
     public static final String SYSTEM_COMPONENT_ID = "__system";
     public static final String SYSTEM_TICK_STREAM_ID = "__tick";
     public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics";
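Note: SYSTEM_EXECUTOR_ID is parsed with clojure.lang.RT so the Java constant is the same persistent vector [-1 -1] that worker.clj and supervisor.clj above pass around as the system executor id, which is what makes = and disj work across the language boundary. A small sketch; the demo class is hypothetical:

    import clojure.lang.RT;

    public class SystemExecutorIdDemo {
        public static void main(String[] args) {
            // RT.readString parses Clojure literal syntax, so "[-1 -1]" yields a
            // persistent vector that compares equal to the executor id vectors
            // the Clojure daemons build themselves.
            Object id = RT.readString("[-1 -1]");
            System.out.println(id.equals(RT.readString("[-1 -1]"))); // true
        }
    }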
diff --git a/src/jvm/backtype/storm/metric/SystemBolt.java b/src/jvm/backtype/storm/metric/SystemBolt.java
new file mode 100644
index 000000000..da8bd2b96
--- /dev/null
+++ b/src/jvm/backtype/storm/metric/SystemBolt.java
@@ -0,0 +1,139 @@
+package backtype.storm.metric;
+
+import backtype.storm.Config;
+import backtype.storm.metric.api.AssignableMetric;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.task.IBolt;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import clojure.lang.AFn;
+import clojure.lang.IFn;
+import clojure.lang.RT;
+import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.management.*;
+import java.util.List;
+import java.util.Map;
+
+
+// There is one task inside one executor for each worker of the topology.
+// The task id is always -1, so you can only send unanchored tuples to a co-located SystemBolt.
+// This bolt was conceived to export worker stats via the metrics API.
+public class SystemBolt implements IBolt {
+    private static Logger LOG = LoggerFactory.getLogger(SystemBolt.class);
+    private static boolean _prepareWasCalled = false;
+
+    private static class MemoryUsageMetric implements IMetric {
+        IFn _getUsage;
+        public MemoryUsageMetric(IFn getUsage) {
+            _getUsage = getUsage;
+        }
+        @Override
+        public Object getValueAndReset() {
+            MemoryUsage memUsage = (MemoryUsage)_getUsage.invoke();
+            return ImmutableMap.builder()
+                    .put("maxBytes", memUsage.getMax())
+                    .put("committedBytes", memUsage.getCommitted())
+                    .put("initBytes", memUsage.getInit())
+                    .put("usedBytes", memUsage.getUsed())
+                    .put("virtualFreeBytes", memUsage.getMax() - memUsage.getUsed())
+                    .put("unusedBytes", memUsage.getCommitted() - memUsage.getUsed())
+                    .build();
+        }
+    }
+
+    // Canonically, counts exported via the metrics API are time-bucketed.
+    // Convert the absolute values from the GC MXBean into per-bucket deltas here.
+    private static class GarbageCollectorMetric implements IMetric {
+        GarbageCollectorMXBean _gcBean;
+        Long _collectionCount;
+        Long _collectionTime;
+        public GarbageCollectorMetric(GarbageCollectorMXBean gcBean) {
+            _gcBean = gcBean;
+        }
+        @Override
+        public Object getValueAndReset() {
+            Long collectionCountP = _gcBean.getCollectionCount();
+            Long collectionTimeP = _gcBean.getCollectionTime();
+
+            Map ret = null;
+            if(_collectionCount!=null && _collectionTime!=null) {
+                ret = ImmutableMap.builder()
+                        .put("count", collectionCountP - _collectionCount)
+                        .put("timeMs", collectionTimeP - _collectionTime)
+                        .build();
+            }
+
+            _collectionCount = collectionCountP;
+            _collectionTime = collectionTimeP;
+            return ret;
+        }
+    }
+
+    @Override
+    public void prepare(final Map stormConf, TopologyContext context, OutputCollector collector) {
+        if(_prepareWasCalled && !"local".equals(stormConf.get(Config.STORM_CLUSTER_MODE))) {
+            throw new RuntimeException("A single worker should have 1 SystemBolt instance.");
+        }
+        _prepareWasCalled = true;
+
+        int bucketSize = RT.intCast(stormConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
+
+        final RuntimeMXBean jvmRT = ManagementFactory.getRuntimeMXBean();
+
+        context.registerMetric("uptimeSecs", new IMetric() {
+            @Override
+            public Object getValueAndReset() {
+                return jvmRT.getUptime()/1000.0;
+            }
+        }, bucketSize);
+
+        context.registerMetric("startTimeSecs", new IMetric() {
+            @Override
+            public Object getValueAndReset() {
+                return jvmRT.getStartTime()/1000.0;
+            }
+        }, bucketSize);
+
+        context.registerMetric("newWorkerEvent", new IMetric() {
+            boolean doEvent = true;
+
+            @Override
+            public Object getValueAndReset() {
+                if (doEvent) {
+                    doEvent = false;
+                    return 1;
+                } else return 0;
+            }
+        }, bucketSize);
+
+        final MemoryMXBean jvmMemRT = ManagementFactory.getMemoryMXBean();
+
+        context.registerMetric("memory/heap", new MemoryUsageMetric(new AFn() {
+            public Object invoke() {
+                return jvmMemRT.getHeapMemoryUsage();
+            }
+        }), bucketSize);
+        context.registerMetric("memory/nonHeap", new MemoryUsageMetric(new AFn() {
+            public Object invoke() {
+                return jvmMemRT.getNonHeapMemoryUsage();
+            }
+        }), bucketSize);
+
+        for(GarbageCollectorMXBean b : ManagementFactory.getGarbageCollectorMXBeans()) {
+            context.registerMetric("GC/" + b.getName().replaceAll("\\W", ""), new GarbageCollectorMetric(b), bucketSize);
+        }
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        throw new RuntimeException("Non-system tuples should never be sent to __system bolt.");
+    }
+
+    @Override
+    public void cleanup() {
+    }
+}
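Note: SystemBolt publishes its gauges through the same TopologyContext#registerMetric hook available to user components; the executor snapshots each registered IMetric once per bucket and emits it on METRICS-STREAM-ID. A minimal user-side sketch using CountMetric from this patch (the bolt class itself is illustrative):

    import backtype.storm.metric.api.CountMetric;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;
    import java.util.Map;

    public class CountingBolt extends BaseRichBolt {
        private transient CountMetric _executed;
        private OutputCollector _collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
            // 60-second buckets, analogous to TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS above.
            _executed = context.registerMetric("executed", new CountMetric(), 60);
        }

        @Override
        public void execute(Tuple input) {
            _executed.incr();   // getValueAndReset() later returns the count and zeroes it
            _collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }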
diff --git a/src/jvm/backtype/storm/metric/api/CountMetric.java b/src/jvm/backtype/storm/metric/api/CountMetric.java
index edf3a59f0..7a8f829cc 100644
--- a/src/jvm/backtype/storm/metric/api/CountMetric.java
+++ b/src/jvm/backtype/storm/metric/api/CountMetric.java
@@ -2,7 +2,7 @@
 
 import backtype.storm.metric.api.IMetric;
 
-public class CountMetric implements IMetric, java.io.Serializable {
+public class CountMetric implements IMetric {
     long _value = 0;
 
     public CountMetric() {
diff --git a/src/jvm/backtype/storm/metric/api/MultiCountMetric.java b/src/jvm/backtype/storm/metric/api/MultiCountMetric.java
index 2649beeaf..02473ca6a 100644
--- a/src/jvm/backtype/storm/metric/api/MultiCountMetric.java
+++ b/src/jvm/backtype/storm/metric/api/MultiCountMetric.java
@@ -4,7 +4,7 @@
 import java.util.HashMap;
 import java.util.Map;
 
-public class MultiCountMetric implements IMetric, java.io.Serializable {
+public class MultiCountMetric implements IMetric {
     Map<String, CountMetric> _value = new HashMap();
 
     public MultiCountMetric() {
diff --git a/src/jvm/backtype/storm/spout/NoOpSpout.java b/src/jvm/backtype/storm/spout/NoOpSpout.java
deleted file mode 100644
index 03586dcae..000000000
--- a/src/jvm/backtype/storm/spout/NoOpSpout.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package backtype.storm.spout;
-
-import backtype.storm.task.TopologyContext;
-import java.util.Map;
-
-
-public class NoOpSpout implements ISpout {
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-    }
-
-    @Override
-    public void close() {
-    }
-
-    @Override
-    public void activate() {
-    }
-
-    @Override
-    public void deactivate() {
-    }
-
-    @Override
-    public void nextTuple() {
-    }
-
-    @Override
-    public void ack(Object msgId) {
-    }
-
-    @Override
-    public void fail(Object msgId) {
-    }
-
-}
diff --git a/src/jvm/backtype/storm/task/GeneralTopologyContext.java b/src/jvm/backtype/storm/task/GeneralTopologyContext.java
index e9e638e38..3065b23d6 100644
--- a/src/jvm/backtype/storm/task/GeneralTopologyContext.java
+++ b/src/jvm/backtype/storm/task/GeneralTopologyContext.java
@@ -63,7 +63,7 @@ public StormTopology getRawTopology() {
      * @return the component id for the input task id
      */
     public String getComponentId(int taskId) {
-        if(taskId==-1) {
+        if(taskId==Constants.SYSTEM_TASK_ID) {
             return Constants.SYSTEM_COMPONENT_ID;
         } else {
             return _taskToComponent.get(taskId);
diff --git a/test/clj/backtype/storm/metrics_test.clj b/test/clj/backtype/storm/metrics_test.clj
index 30ba9a768..62d7af239 100644
--- a/test/clj/backtype/storm/metrics_test.clj
+++ b/test/clj/backtype/storm/metrics_test.clj
@@ -75,6 +75,9 @@
      (wait-for-atleast-N-buckets! N# ~comp-id ~metric-name)
      (is (= ~expected (subvec (lookup-bucket-by-comp-id-&-metric-name! ~comp-id ~metric-name) 0 N#))))))
 
+(defmacro assert-metric-data-exists! [comp-id metric-name]
+  `(is (not-empty (lookup-bucket-by-comp-id-&-metric-name! ~comp-id ~metric-name))))
+
 (deftest test-custom-metric
   (with-simulated-time-local-cluster
     [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
@@ -103,7 +106,7 @@
 
 (deftest test-builtin-metrics-1
   (with-simulated-time-local-cluster
-    [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER 
+    [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
                            [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
                            TOPOLOGY-STATS-SAMPLE-RATE 1.0
                            TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}]
@@ -142,7 +145,6 @@
   (with-simulated-time-local-cluster
     [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
                            [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
-                           TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true
                            TOPOLOGY-STATS-SAMPLE-RATE 1.0
                            TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5}]
     (let [feeder (feeder-spout ["field1"])
@@ -153,7 +155,7 @@
                     {"mybolt" (thrift/mk-bolt-spec {"myspout" :shuffle} ack-every-other)})]
       (submit-local-topology (:nimbus cluster)
                              "metrics-tester"
-                             {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20}
+                             {}
                              topology)
 
       (.feed feeder ["a"] 1)
@@ -175,11 +177,77 @@
       (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0])
       (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1])
 
-      (advance-cluster-time cluster 30)
-      (assert-failed tracker 2)
-      (assert-buckets! "myspout" "__fail-count/default" [1])
+      (advance-cluster-time cluster 15)
       (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0])
       (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0])
-      (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0])      
+      (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0])
       (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0])
-      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0]))))
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0])
+
+      (.feed feeder ["c"] 3)
+      (advance-cluster-time cluster 15)
+      (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0 1 0 0])
+      (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0 1 0 0])
+      (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0 1 0 0])
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0 1 0 0])
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0 1 0 0]))))
+
+(deftest test-builtin-metrics-3
+  (with-simulated-time-local-cluster
+    [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+                           [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+                           TOPOLOGY-STATS-SAMPLE-RATE 1.0
+                           TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5
+                           TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}]
+    (let [feeder (feeder-spout ["field1"])
+          tracker (AckFailMapTracker.)
+          _ (.setAckFailDelegate feeder tracker)
+          topology (thrift/mk-topology
+                    {"myspout" (thrift/mk-spout-spec feeder)}
+                    {"mybolt" (thrift/mk-bolt-spec {"myspout" :global} ack-every-other)})]
+      (submit-local-topology (:nimbus cluster)
+                             "timeout-tester"
+                             {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
+                             topology)
+      (.feed feeder ["a"] 1)
+      (.feed feeder ["b"] 2)
+      (.feed feeder ["c"] 3)
+      (advance-cluster-time cluster 9)
+      (assert-acked tracker 1 3)
+      (assert-buckets! "myspout" "__ack-count/default" [2])
+      (assert-buckets! "myspout" "__emit-count/default" [3])
+      (assert-buckets! "myspout" "__transfer-count/default" [3])
+      (assert-buckets! "mybolt" "__ack-count/myspout:default" [2])
+      (assert-buckets! "mybolt" "__execute-count/myspout:default" [3])
+
+      (is (not (.isFailed tracker 2)))
+      (advance-cluster-time cluster 30)
+      (assert-failed tracker 2)
+      (assert-buckets! "myspout" "__fail-count/default" [1])
"myspout" "__fail-count/default" [1]) + (assert-buckets! "myspout" "__ack-count/default" [2 0 0 0]) + (assert-buckets! "myspout" "__emit-count/default" [3 0 0 0]) + (assert-buckets! "myspout" "__transfer-count/default" [3 0 0 0]) + (assert-buckets! "mybolt" "__ack-count/myspout:default" [2 0 0 0]) + (assert-buckets! "mybolt" "__execute-count/myspout:default" [3 0 0 0])))) + +(deftest test-system-bolt + (with-simulated-time-local-cluster + [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER + [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}] + TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}] + (let [feeder (feeder-spout ["field1"]) + topology (thrift/mk-topology + {"1" (thrift/mk-spout-spec feeder)} + {})] + (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology) + + (.feed feeder ["a"] 1) + (advance-cluster-time cluster 70) + (assert-buckets! "__system" "newWorkerEvent" [1]) + (assert-buckets! "__system" "configTopologyWorkers" [1]) + (assert-metric-data-exists! "__system" "uptimeSecs") + (assert-metric-data-exists! "__system" "startTimeSecs") + (assert-metric-data-exists! "__system" "topologyPartialUptimeSecs") + + (advance-cluster-time cluster 180) + (assert-buckets! "__system" "newWorkerEvent" [1 0 0 0]) + (assert-buckets! "__system" "configTopologyWorkers" [1 1 1 1])))) + +