Skip to content

Commit

Permalink
fix conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanmarz committed Mar 26, 2013
2 parents 92586a4 + b12a1b9 commit e61bbf4
Show file tree
Hide file tree
Showing 12 changed files with 262 additions and 83 deletions.
10 changes: 5 additions & 5 deletions bin/build_release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ DIR=_release/storm-$RELEASE

rm -rf _release
rm -f *.zip
$LEIN with-profile release clean
$LEIN with-profile release deps
$LEIN with-profile release jar
$LEIN with-profile release pom
mvn dependency:copy-dependencies
$LEIN with-profile release clean || exit 1
$LEIN with-profile release deps || exit 1
$LEIN with-profile release jar || exit 1
$LEIN with-profile release pom || exit 1
mvn dependency:copy-dependencies || exit 1

mkdir -p $DIR/lib
cp target/storm-*.jar $DIR/storm-${RELEASE}.jar
Expand Down
34 changes: 18 additions & 16 deletions src/clj/backtype/storm/daemon/common.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
(:import [backtype.storm.utils Utils])
(:import [backtype.storm.task WorkerTopologyContext])
(:import [backtype.storm Constants])
(:import [backtype.storm.spout NoOpSpout])
(:import [backtype.storm.metric SystemBolt])
(:require [clojure.set :as set])
(:require [backtype.storm.daemon.acker :as acker])
(:require [backtype.storm.thrift :as thrift])
Expand Down Expand Up @@ -241,8 +241,9 @@
(number-duplicates)
(map #(str Constants/METRICS_COMPONENT_ID_PREFIX %))))

(defn metrics-consumer-bolt-specs [components-ids-that-emit-metrics storm-conf]
(let [inputs (->> (for [comp-id components-ids-that-emit-metrics]
(defn metrics-consumer-bolt-specs [storm-conf topology]
(let [component-ids-that-emit-metrics (cons SYSTEM-COMPONENT-ID (keys (all-components topology)))
inputs (->> (for [comp-id component-ids-that-emit-metrics]
{[comp-id METRICS-STREAM-ID] :shuffle})
(into {}))

Expand All @@ -261,27 +262,28 @@
(metrics-consumer-register-ids storm-conf)
(get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))

(defn add-metric-components! [storm-conf ^StormTopology topology]
(doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs (keys (all-components topology)) storm-conf)]
(defn add-metric-components! [storm-conf ^StormTopology topology]
(doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs storm-conf topology)]
(.put_to_bolts topology comp-id bolt-spec)))

(defn add-system-components! [^StormTopology topology]
(let [system-spout (thrift/mk-spout-spec*
(NoOpSpout.)
{SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])}
:p 0
:conf {TOPOLOGY-TASKS 0})]
(.put_to_spouts topology SYSTEM-COMPONENT-ID system-spout)))
(defn add-system-components! [conf ^StormTopology topology]
(let [system-bolt-spec (thrift/mk-bolt-spec*
{}
(SystemBolt.)
{SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])}
:p 0
:conf {TOPOLOGY-TASKS 0})]
(.put_to_bolts topology SYSTEM-COMPONENT-ID system-bolt-spec)))

(defn system-topology! [storm-conf ^StormTopology topology]
(validate-basic! topology)
(let [ret (.deepCopy topology)]
(add-acker! storm-conf ret)
(add-metric-components! storm-conf ret)
(add-metric-streams! ret)
(add-metric-components! storm-conf ret)
(add-system-components! storm-conf ret)
(add-metric-streams! ret)
(add-system-streams! ret)
(add-system-components! ret)
(validate-structure! ret)
ret
))
Expand Down
11 changes: 5 additions & 6 deletions src/clj/backtype/storm/daemon/executor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@
(defn executor-type [^WorkerTopologyContext context component-id]
(let [topology (.getRawTopology context)
spouts (.get_spouts topology)
bolts (.get_bolts topology)
]
bolts (.get_bolts topology)]
(cond (contains? spouts component-id) :spout
(contains? bolts component-id) :bolt
:else (throw-runtime "Could not find " component-id " in topology " topology))))
Expand Down Expand Up @@ -182,7 +181,7 @@
(this task tuple nil)
)))

(defn executor-data [worker executor-id]
(defn mk-executor-data [worker executor-id]
(let [worker-context (worker-context worker)
task-ids (executor-id->tasks executor-id)
component-id (.getComponentId worker-context (first task-ids))
Expand Down Expand Up @@ -253,7 +252,7 @@
(fn []
(disruptor/publish
receive-queue
[[nil (TupleImpl. worker-context [interval] -1 Constants/METRICS_TICK_STREAM_ID)]]))))))
[[nil (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID)]]))))))

(defn metrics-tick [executor-data task-datas ^TupleImpl tuple]
(let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
Expand Down Expand Up @@ -293,11 +292,11 @@
(fn []
(disruptor/publish
receive-queue
[[nil (TupleImpl. context [tick-time-secs] -1 Constants/SYSTEM_TICK_STREAM_ID)]]
[[nil (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID)]]
)))))))

(defn mk-executor [worker executor-id]
(let [executor-data (executor-data worker executor-id)
(let [executor-data (mk-executor-data worker executor-id)
_ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
task-datas (->> executor-data
:task-ids
Expand Down
4 changes: 2 additions & 2 deletions src/clj/backtype/storm/daemon/supervisor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@
(let [local-assignment (assigned-executors (:port worker-heartbeat))]
(and local-assignment
(= (:storm-id worker-heartbeat) (:storm-id local-assignment))
(= (set (:executors worker-heartbeat)) (set (:executors local-assignment))))
))
(= (disj (set (:executors worker-heartbeat)) Constants/SYSTEM_EXECUTOR_ID)
(set (:executors local-assignment))))))

(defn read-allocated-workers
"Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead (timed out or never wrote heartbeat)"
Expand Down
15 changes: 8 additions & 7 deletions src/clj/backtype/storm/daemon/worker.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@

(defmulti mk-suicide-fn cluster-mode)

(defn read-worker-executors [storm-cluster-state storm-id assignment-id port]
(defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port]
(let [assignment (:executor->node+port (.assignment-info storm-cluster-state storm-id nil))]
(doall
(concat
[Constants/SYSTEM_EXECUTOR_ID]
(mapcat (fn [[executor loc]]
(if (= loc [assignment-id port])
[executor]
))
assignment))
))
(if (= loc [assignment-id port])
[executor]
))
assignment)))))

(defnk do-executor-heartbeats [worker :executors nil]
;; stats is how we know what executors are assigned to this worker
Expand Down Expand Up @@ -150,7 +151,7 @@
(let [cluster-state (cluster/mk-distributed-cluster-state conf)
storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
storm-conf (read-supervisor-storm-conf conf storm-id)
executors (set (read-worker-executors storm-cluster-state storm-id assignment-id port))
executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port))
transfer-queue (disruptor/disruptor-queue (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
:wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
Expand Down
3 changes: 3 additions & 0 deletions src/jvm/backtype/storm/Constants.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package backtype.storm;

import backtype.storm.coordination.CoordinatedBolt;
import clojure.lang.RT;


public class Constants {
public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream";

public static final long SYSTEM_TASK_ID = -1;
public static final Object SYSTEM_EXECUTOR_ID = RT.readString("[-1 -1]");
public static final String SYSTEM_COMPONENT_ID = "__system";
public static final String SYSTEM_TICK_STREAM_ID = "__tick";
public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics";
Expand Down
139 changes: 139 additions & 0 deletions src/jvm/backtype/storm/metric/SystemBolt.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package backtype.storm.metric;

import backtype.storm.Config;
import backtype.storm.metric.api.AssignableMetric;
import backtype.storm.metric.api.IMetric;
import backtype.storm.task.IBolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import clojure.lang.AFn;
import clojure.lang.IFn;
import clojure.lang.RT;
import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.management.*;
import java.util.List;
import java.util.Map;


// There is one task inside one executor for each worker of the topology.
// TaskID is always -1, therefore you can only send-unanchored tuples to co-located SystemBolt.
// This bolt was conceived to export worker stats via the metrics api.
public class SystemBolt implements IBolt {
    private static Logger LOG = LoggerFactory.getLogger(SystemBolt.class);
    // Static so the guard spans the whole worker JVM: there must be at most one
    // SystemBolt per worker (except in local mode, where many "workers" share a JVM).
    private static boolean _prepareWasCalled = false;

    // Snapshots a MemoryUsage (heap or non-heap) into a map of byte counts.
    // The IFn indirection lets the caller pick which MemoryMXBean accessor to sample.
    private static class MemoryUsageMetric implements IMetric {
        IFn _getUsage;
        public MemoryUsageMetric(IFn getUsage) {
            _getUsage = getUsage;
        }
        @Override
        public Object getValueAndReset() {
            MemoryUsage memUsage = (MemoryUsage)_getUsage.invoke();
            return ImmutableMap.builder()
                    .put("maxBytes", memUsage.getMax())
                    .put("committedBytes", memUsage.getCommitted())
                    .put("initBytes", memUsage.getInit())
                    .put("usedBytes", memUsage.getUsed())
                    .put("virtualFreeBytes", memUsage.getMax() - memUsage.getUsed())
                    .put("unusedBytes", memUsage.getCommitted() - memUsage.getUsed())
                    .build();
        }
    }

    // Canonically the metrics data exported is time-bucketed when doing counts.
    // Convert the JVM's cumulative GC counters into per-bucket deltas here.
    private static class GarbageCollectorMetric implements IMetric {
        GarbageCollectorMXBean _gcBean;
        Long _collectionCount;  // cumulative count at the previous sample; null until first sample
        Long _collectionTime;   // cumulative time (ms) at the previous sample; null until first sample
        public GarbageCollectorMetric(GarbageCollectorMXBean gcBean) {
            _gcBean = gcBean;
        }
        @Override
        public Object getValueAndReset() {
            Long collectionCountP = _gcBean.getCollectionCount();
            // BUG FIX: previously read getCollectionCount() here too, so "timeMs"
            // reported count deltas instead of elapsed GC time.
            Long collectionTimeP = _gcBean.getCollectionTime();

            // First invocation has no baseline, so return null (no data point)
            // rather than a bogus delta against zero.
            Map ret = null;
            if(_collectionCount!=null && _collectionTime!=null) {
                ret = ImmutableMap.builder()
                        .put("count", collectionCountP - _collectionCount)
                        .put("timeMs", collectionTimeP - _collectionTime)
                        .build();
            }

            _collectionCount = collectionCountP;
            _collectionTime = collectionTimeP;
            return ret;
        }
    }

    /**
     * Registers all worker-level JVM metrics (uptime, start time, heap/non-heap
     * memory, per-collector GC stats, and a one-shot "newWorkerEvent" marker)
     * against the supplied context. Enforces a single instance per worker JVM
     * except in local cluster mode.
     *
     * @throws RuntimeException if a second SystemBolt is prepared in the same
     *         (non-local) worker JVM.
     */
    @Override
    public void prepare(final Map stormConf, TopologyContext context, OutputCollector collector) {
        // BUG FIX: was a reference comparison (!= "local"), which only works by
        // accident of string interning; use equals for value comparison.
        if(_prepareWasCalled && !"local".equals(stormConf.get(Config.STORM_CLUSTER_MODE))) {
            throw new RuntimeException("A single worker should have 1 SystemBolt instance.");
        }
        _prepareWasCalled = true;

        int bucketSize = RT.intCast(stormConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));

        final RuntimeMXBean jvmRT = ManagementFactory.getRuntimeMXBean();

        context.registerMetric("uptimeSecs", new IMetric() {
            @Override
            public Object getValueAndReset() {
                return jvmRT.getUptime()/1000.0;
            }
        }, bucketSize);

        context.registerMetric("startTimeSecs", new IMetric() {
            @Override
            public Object getValueAndReset() {
                return jvmRT.getStartTime()/1000.0;
            }
        }, bucketSize);

        // Emits 1 exactly once (the first bucket after worker start), then 0 —
        // lets consumers count worker (re)starts.
        context.registerMetric("newWorkerEvent", new IMetric() {
            boolean doEvent = true;

            @Override
            public Object getValueAndReset() {
                if (doEvent) {
                    doEvent = false;
                    return 1;
                } else return 0;
            }
        }, bucketSize);

        final MemoryMXBean jvmMemRT = ManagementFactory.getMemoryMXBean();

        context.registerMetric("memory/heap", new MemoryUsageMetric(new AFn() {
            public Object invoke() {
                return jvmMemRT.getHeapMemoryUsage();
            }
        }), bucketSize);
        context.registerMetric("memory/nonHeap", new MemoryUsageMetric(new AFn() {
            public Object invoke() {
                return jvmMemRT.getNonHeapMemoryUsage();
            }
        }), bucketSize);

        for(GarbageCollectorMXBean b : ManagementFactory.getGarbageCollectorMXBeans()) {
            // Strip non-word characters from collector names (e.g. "PS Scavenge")
            // so they form clean metric path segments.
            context.registerMetric("GC/" + b.getName().replaceAll("\\W", ""), new GarbageCollectorMetric(b), bucketSize);
        }
    }

    /**
     * The system bolt receives only system-generated ticks, which are handled
     * by the executor before reaching execute(); any real tuple arriving here
     * indicates a wiring bug.
     */
    @Override
    public void execute(Tuple input) {
        throw new RuntimeException("Non-system tuples should never be sent to __system bolt.");
    }

    @Override
    public void cleanup() {
    }
}
2 changes: 1 addition & 1 deletion src/jvm/backtype/storm/metric/api/CountMetric.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import backtype.storm.metric.api.IMetric;

public class CountMetric implements IMetric, java.io.Serializable {
public class CountMetric implements IMetric {
long _value = 0;

public CountMetric() {
Expand Down
2 changes: 1 addition & 1 deletion src/jvm/backtype/storm/metric/api/MultiCountMetric.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import java.util.HashMap;
import java.util.Map;

public class MultiCountMetric implements IMetric, java.io.Serializable {
public class MultiCountMetric implements IMetric {
Map<String, CountMetric> _value = new HashMap();

public MultiCountMetric() {
Expand Down
36 changes: 0 additions & 36 deletions src/jvm/backtype/storm/spout/NoOpSpout.java

This file was deleted.

2 changes: 1 addition & 1 deletion src/jvm/backtype/storm/task/GeneralTopologyContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public StormTopology getRawTopology() {
* @return the component id for the input task id
*/
public String getComponentId(int taskId) {
if(taskId==-1) {
if(taskId==Constants.SYSTEM_TASK_ID) {
return Constants.SYSTEM_COMPONENT_ID;
} else {
return _taskToComponent.get(taskId);
Expand Down
Loading

0 comments on commit e61bbf4

Please sign in to comment.