Skip to content

Commit

Permalink
Fix group-by error when task has a group-by task and a non-group by task
Browse files Browse the repository at this point in the history
downstream.
  • Loading branch information
lbradstreet committed Apr 24, 2017
1 parent 5e26b35 commit 424a14f
Showing 1 changed file with 5 additions and 7 deletions.
12 changes: 5 additions & 7 deletions src/onyx/plugin/messaging_output.clj
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,12 @@
replica messenger]
(.clear ^java.util.ArrayList segments)
(let [;; generate this on each new replica / messenger
get-pub-fn (if (empty? task->group-by-fn)
(fn [segment dst-task-id]
(rand-nth (m/task->publishers messenger dst-task-id)))
(fn [segment dst-task-id]
(let [group-fn (task->group-by-fn dst-task-id)
hsh (hash (group-fn segment))
get-pub-fn (fn [segment dst-task-id]
(if-let [group-fn (task->group-by-fn dst-task-id)]
(let [hsh (hash (group-fn segment))
dest-pubs (m/task->publishers messenger dst-task-id)]
(get dest-pubs (mod hsh (count dest-pubs))))))
(get dest-pubs (mod hsh (count dest-pubs))))
(rand-nth (m/task->publishers messenger dst-task-id))))
;; TODO, avoid initial flattening preprocessing step
_ (run! (fn [{:keys [leaves] :as result}]
(run! (fn [seg]
Expand Down

0 comments on commit 424a14f

Please sign in to comment.