Skip to content

Commit

Permalink
merge changes from nathanmarz/master
Browse files Browse the repository at this point in the history
  • Loading branch information
afeng committed Mar 26, 2013
2 parents 67cb631 + a6f409e commit 51ca0f1
Show file tree
Hide file tree
Showing 23 changed files with 357 additions and 101 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,18 @@
* Nimbus throws NotAliveException instead of FileNotFoundException from various query methods when topology is no longer alive (thanks revans2)
* Escape HTML and Javascript appropriately in Storm UI (thanks d2r)
* Storm's Zookeeper client now uses bounded exponential backoff strategy on failures
* Automatically drain and log error stream of multilang subprocesses
* Append component name to thread name of running executors so that logs are easier to read
* Bug fix: Supervisor provides full path to workers to logging config rather than relative path (thanks revans2)
* Bug fix: Call ReducerAggregator#init properly when used within persistentAggregate (thanks lorcan)
* Bug fix: Set component-specific configs correctly for Trident spouts

## 0.8.3 (unreleased)

* Revert zmq layer to not rely on multipart messages to fix issue reported by some users
* Bug fix: Fix TransactionalMap and OpaqueMap to correctly do multiple updates to the same key in the same batch
* Bug fix: Fix race condition between supervisor and Nimbus that could lead to stormconf.ser errors and infinite crashing of supervisor
* Bug fix: Fix default scheduler to always reassign workers in a constrained topology when there are dead executors

## 0.8.2

Expand Down
1 change: 1 addition & 0 deletions README.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ You must not remove this notice, or any other, from this software.
* Lorcan Coyle ([@lorcan](https://github.com/lorcan))
* Derek Dagit ([@d2r](https://github.com/d2r))
* Andrew Olson ([@noslowerdna](https://github.com/noslowerdna))
* Gavin Li ([@lyogavin](https://github.com/lyogavin))

## Acknowledgements

Expand Down
10 changes: 5 additions & 5 deletions bin/build_release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ DIR=_release/storm-$RELEASE

rm -rf _release
rm -f *.zip
$LEIN with-profile release clean
$LEIN with-profile release deps
$LEIN with-profile release jar
$LEIN with-profile release pom
mvn dependency:copy-dependencies
$LEIN with-profile release clean || exit 1
$LEIN with-profile release deps || exit 1
$LEIN with-profile release jar || exit 1
$LEIN with-profile release pom || exit 1
mvn dependency:copy-dependencies || exit 1

mkdir -p $DIR/lib
cp target/storm-*.jar $DIR/storm-${RELEASE}.jar
Expand Down
1 change: 1 addition & 0 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
[com.googlecode.json-simple/json-simple "1.1"]
[compojure "1.1.3"]
[hiccup "0.3.6"]
[ring/ring-devel "0.3.11"]
[ring/ring-jetty-adapter "0.3.11"]
[org.clojure/tools.logging "0.2.3"]
[org.clojure/math.numeric-tower "0.0.1"]
Expand Down
34 changes: 18 additions & 16 deletions src/clj/backtype/storm/daemon/common.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
(:import [backtype.storm.utils Utils])
(:import [backtype.storm.task WorkerTopologyContext])
(:import [backtype.storm Constants])
(:import [backtype.storm.spout NoOpSpout])
(:import [backtype.storm.metric SystemBolt])
(:require [clojure.set :as set])
(:require [backtype.storm.daemon.acker :as acker])
(:require [backtype.storm.thrift :as thrift])
Expand Down Expand Up @@ -241,8 +241,9 @@
(number-duplicates)
(map #(str Constants/METRICS_COMPONENT_ID_PREFIX %))))

(defn metrics-consumer-bolt-specs [components-ids-that-emit-metrics storm-conf]
(let [inputs (->> (for [comp-id components-ids-that-emit-metrics]
(defn metrics-consumer-bolt-specs [storm-conf topology]
(let [component-ids-that-emit-metrics (cons SYSTEM-COMPONENT-ID (keys (all-components topology)))
inputs (->> (for [comp-id component-ids-that-emit-metrics]
{[comp-id METRICS-STREAM-ID] :shuffle})
(into {}))

Expand All @@ -261,27 +262,28 @@
(metrics-consumer-register-ids storm-conf)
(get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))

(defn add-metric-components! [storm-conf ^StormTopology topology]
(doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs (keys (all-components topology)) storm-conf)]
(defn add-metric-components! [storm-conf ^StormTopology topology]
(doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs storm-conf topology)]
(.put_to_bolts topology comp-id bolt-spec)))

(defn add-system-components! [^StormTopology topology]
(let [system-spout (thrift/mk-spout-spec*
(NoOpSpout.)
{SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])}
:p 0
:conf {TOPOLOGY-TASKS 0})]
(.put_to_spouts topology SYSTEM-COMPONENT-ID system-spout)))
(defn add-system-components! [conf ^StormTopology topology]
(let [system-bolt-spec (thrift/mk-bolt-spec*
{}
(SystemBolt.)
{SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])}
:p 0
:conf {TOPOLOGY-TASKS 0})]
(.put_to_bolts topology SYSTEM-COMPONENT-ID system-bolt-spec)))

(defn system-topology! [storm-conf ^StormTopology topology]
(validate-basic! topology)
(let [ret (.deepCopy topology)]
(add-acker! storm-conf ret)
(add-metric-components! storm-conf ret)
(add-metric-streams! ret)
(add-metric-components! storm-conf ret)
(add-system-components! storm-conf ret)
(add-metric-streams! ret)
(add-system-streams! ret)
(add-system-components! ret)
(validate-structure! ret)
ret
))
Expand Down
11 changes: 5 additions & 6 deletions src/clj/backtype/storm/daemon/executor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@
(defn executor-type [^WorkerTopologyContext context component-id]
(let [topology (.getRawTopology context)
spouts (.get_spouts topology)
bolts (.get_bolts topology)
]
bolts (.get_bolts topology)]
(cond (contains? spouts component-id) :spout
(contains? bolts component-id) :bolt
:else (throw-runtime "Could not find " component-id " in topology " topology))))
Expand Down Expand Up @@ -182,7 +181,7 @@
(this task tuple nil)
)))

(defn executor-data [worker executor-id]
(defn mk-executor-data [worker executor-id]
(let [worker-context (worker-context worker)
task-ids (executor-id->tasks executor-id)
component-id (.getComponentId worker-context (first task-ids))
Expand Down Expand Up @@ -253,7 +252,7 @@
(fn []
(disruptor/publish
receive-queue
[[nil (TupleImpl. worker-context [interval] -1 Constants/METRICS_TICK_STREAM_ID)]]))))))
[[nil (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID)]]))))))

(defn metrics-tick [executor-data task-datas ^TupleImpl tuple]
(let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
Expand Down Expand Up @@ -293,11 +292,11 @@
(fn []
(disruptor/publish
receive-queue
[[nil (TupleImpl. context [tick-time-secs] -1 Constants/SYSTEM_TICK_STREAM_ID)]]
[[nil (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID)]]
)))))))

(defn mk-executor [worker executor-id]
(let [executor-data (executor-data worker executor-id)
(let [executor-data (mk-executor-data worker executor-id)
_ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
task-datas (->> executor-data
:task-ids
Expand Down
4 changes: 2 additions & 2 deletions src/clj/backtype/storm/daemon/supervisor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@
(let [local-assignment (assigned-executors (:port worker-heartbeat))]
(and local-assignment
(= (:storm-id worker-heartbeat) (:storm-id local-assignment))
(= (set (:executors worker-heartbeat)) (set (:executors local-assignment))))
))
(= (disj (set (:executors worker-heartbeat)) Constants/SYSTEM_EXECUTOR_ID)
(set (:executors local-assignment))))))

(defn read-allocated-workers
"Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead (timed out or never wrote heartbeat)"
Expand Down
15 changes: 8 additions & 7 deletions src/clj/backtype/storm/daemon/worker.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@

(defmulti mk-suicide-fn cluster-mode)

(defn read-worker-executors [storm-cluster-state storm-id assignment-id port]
(defn read-worker-executors [storm-conf storm-cluster-state storm-id assignment-id port]
(let [assignment (:executor->node+port (.assignment-info storm-cluster-state storm-id nil))]
(doall
(concat
[Constants/SYSTEM_EXECUTOR_ID]
(mapcat (fn [[executor loc]]
(if (= loc [assignment-id port])
[executor]
))
assignment))
))
(if (= loc [assignment-id port])
[executor]
))
assignment)))))

(defnk do-executor-heartbeats [worker :executors nil]
;; stats is how we know what executors are assigned to this worker
Expand Down Expand Up @@ -152,7 +153,7 @@
(let [cluster-state (cluster/mk-distributed-cluster-state conf)
storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
storm-conf (read-supervisor-storm-conf conf storm-id)
executors (set (read-worker-executors storm-cluster-state storm-id assignment-id port))
executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port))
transfer-queue (disruptor/disruptor-queue (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
:wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
Expand Down
15 changes: 10 additions & 5 deletions src/clj/backtype/storm/messaging/zmq.clj
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
(.array bb)
))

(defn mk-packet [task ^bytes message]
(let [bb (ByteBuffer/allocate (+ 2 (count message)))]
(.putShort bb (short task))
(.put bb message)
(.array bb)
))

(defn parse-packet [^bytes packet]
(let [bb (ByteBuffer/wrap packet)
port (.getShort bb)
Expand All @@ -37,9 +44,7 @@
(defprotocol ZMQContextQuery
(zmq-context [this]))

(def NOBLOCK-SNDMORE (bit-or ZMQ/SNDMORE ZMQ/NOBLOCK))

(deftype ZMQConnection [socket ^ByteBuffer bb]
(deftype ZMQConnection [socket]
IConnection
(^TaskMessage recv [this ^int flags]
(log-debug "ZMQConnection recv()")
Expand All @@ -55,7 +60,7 @@
(.close socket)))

(defn mk-connection [socket]
(ZMQConnection. socket (ByteBuffer/allocate 2)))
(ZMQConnection. socket))

(deftype ZMQContext [^{:volatile-mutable true} context
^{:volatile-mutable true} linger-ms
Expand Down Expand Up @@ -94,4 +99,4 @@
(deftype TransportPlugin []
ITransport
(^IContext newContext [this]
(ZMQContext. 0 0 0 true)))
(ZMQContext. 0 0 0 true)))
4 changes: 3 additions & 1 deletion src/clj/backtype/storm/scheduler/DefaultScheduler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@
(map #(vector (.getStartTask %) (.getEndTask %)))
set)
alive-assigned (EvenScheduler/get-alive-assigned-node+port->executors cluster topology-id)
alive-executors (->> alive-assigned vals (apply concat) set)
can-reassign-slots (slots-can-reassign cluster (keys alive-assigned))
total-slots-to-use (min (.getNumWorkers topology)
(+ (count can-reassign-slots) (count available-slots)))
bad-slots (if (> total-slots-to-use (count alive-assigned))
bad-slots (if (or (> total-slots-to-use (count alive-assigned))
(not= alive-executors all-executors))
(bad-slots alive-assigned (count all-executors) total-slots-to-use)
[])]]
(.freeSlots cluster bad-slots)
Expand Down
10 changes: 0 additions & 10 deletions src/clj/backtype/storm/scheduler/IsolationScheduler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,6 @@
(defn -prepare [this conf]
(container-set! (.state this) conf))


(defn- compute-worker-specs "Returns list of sets of executors"
[^TopologyDetails details]
(->> (.getExecutorToComponent details)
reverse-map
(map second)
(apply interleave-all)
(partition-fixed (.getNumWorkers details))
(map set)))

(defn- compute-worker-specs "Returns mutable set of sets of executors"
[^TopologyDetails details]
(->> (.getExecutorToComponent details)
Expand Down
5 changes: 4 additions & 1 deletion src/clj/backtype/storm/ui/core.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns backtype.storm.ui.core
(:use compojure.core)
(:use ring.middleware.reload)
(:use [hiccup core page-helpers])
(:use [backtype.storm config util log])
(:use [backtype.storm.ui helpers])
Expand Down Expand Up @@ -798,7 +799,9 @@
))))

(def app
(handler/site (-> main-routes catch-errors )))
(-> #'main-routes
(wrap-reload '[backtype.storm.ui.core])
catch-errors))

(defn start-server! [] (run-jetty app {:port (Integer. (*STORM-CONF* UI-PORT))
:join? false}))
Expand Down
3 changes: 3 additions & 0 deletions src/jvm/backtype/storm/Constants.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package backtype.storm;

import backtype.storm.coordination.CoordinatedBolt;
import clojure.lang.RT;


public class Constants {
public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream";

public static final long SYSTEM_TASK_ID = -1;
public static final Object SYSTEM_EXECUTOR_ID = RT.readString("[-1 -1]");
public static final String SYSTEM_COMPONENT_ID = "__system";
public static final String SYSTEM_TICK_STREAM_ID = "__tick";
public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics";
Expand Down
Loading

0 comments on commit 51ca0f1

Please sign in to comment.