Skip to content

Commit

Permalink
fix default scheduler to always reassign topology if some of its exec…
Browse files Browse the repository at this point in the history
…utors are dead
  • Loading branch information
nathanmarz committed Mar 26, 2013
1 parent 87828f7 commit 92586a4
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 1 deletion.
4 changes: 3 additions & 1 deletion src/clj/backtype/storm/scheduler/DefaultScheduler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@
(map #(vector (.getStartTask %) (.getEndTask %)))
set)
alive-assigned (EvenScheduler/get-alive-assigned-node+port->executors cluster topology-id)
alive-executors (->> alive-assigned vals (apply concat) set)
can-reassign-slots (slots-can-reassign cluster (keys alive-assigned))
total-slots-to-use (min (.getNumWorkers topology)
(+ (count can-reassign-slots) (count available-slots)))
bad-slots (if (> total-slots-to-use (count alive-assigned))
bad-slots (if (or (> total-slots-to-use (count alive-assigned))
(not= alive-executors all-executors))
(bad-slots alive-assigned (count all-executors) total-slots-to-use)
[])]]
(.freeSlots cluster bad-slots)
Expand Down
51 changes: 51 additions & 0 deletions test/clj/backtype/storm/nimbus_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,57 @@
(check-consistency cluster "test")
)))


(deftest test-reassignment-to-constrained-cluster
(with-simulated-time-local-cluster [cluster :supervisors 0
:daemon-conf {SUPERVISOR-ENABLE false
NIMBUS-TASK-LAUNCH-SECS 60
NIMBUS-TASK-TIMEOUT-SECS 20
NIMBUS-MONITOR-FREQ-SECS 10
NIMBUS-SUPERVISOR-TIMEOUT-SECS 100
TOPOLOGY-ACKER-EXECUTORS 0}]
(letlocals
(add-supervisor cluster :ports 1 :id "a")
(add-supervisor cluster :ports 1 :id "b")
(bind conf (:daemon-conf cluster))
(bind topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
{}
))
(bind state (:storm-cluster-state cluster))
(submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 2} topology)
(check-consistency cluster "test")
(bind storm-id (get-storm-id state "test"))
(bind [executor-id1 executor-id2] (topology-executors cluster storm-id))
(bind ass1 (executor-assignment cluster storm-id executor-id1))
(bind ass2 (executor-assignment cluster storm-id executor-id2))

(advance-cluster-time cluster 59)
(do-executor-heartbeat cluster storm-id executor-id1)
(do-executor-heartbeat cluster storm-id executor-id2)

(advance-cluster-time cluster 13)
(is (= ass1 (executor-assignment cluster storm-id executor-id1)))
(is (= ass2 (executor-assignment cluster storm-id executor-id2)))
(kill-supervisor cluster "b")
(do-executor-heartbeat cluster storm-id executor-id1)

(advance-cluster-time cluster 11)
(do-executor-heartbeat cluster storm-id executor-id1)

(advance-cluster-time cluster 11)
(do-executor-heartbeat cluster storm-id executor-id1)

(advance-cluster-time cluster 11)
(do-executor-heartbeat cluster storm-id executor-id1)

(advance-cluster-time cluster 11)
(do-executor-heartbeat cluster storm-id executor-id1)

(check-consistency cluster "test")
(is (= 1 (storm-num-workers state "test")))
)))

(defn check-executor-distribution [slot-executors distribution]
(check-distribution (vals slot-executors) distribution))

Expand Down

0 comments on commit 92586a4

Please sign in to comment.