Skip to content

Commit

Permalink
engine: simplify in-memory aggregation logic. add instrumentation.
Browse files Browse the repository at this point in the history
This commit brings in a number of changes:

- [X] Handle input drift globally. A new component is introduced.
  to facilitate testing, the wall-clock is abstrated into
  its own component as well which the drift component depends
  on.
- [X] Simplify the way the engine holds on to values in memory.
  Values are stored in an atom and updates trigger a
  `swap!` on the atom, yielding snapshots.
- [X] Move queueing mechanism into a proper component
- [X] Allow per-queue capacity and pool-size parameters. A good
  rule of thumb is to make room for twice your average input
  size, which gives the system some time to cope with bursts.
- [X] Instrument critical operations, such as taking snapshots
  these are available as JMX by default. Configuration support
  will be provided to handle reporting destinations.
  • Loading branch information
pyr committed Oct 6, 2015
1 parent 155e90f commit 60b5df7
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 160 deletions.
3 changes: 2 additions & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
[spootnik/unilog "0.7.8"]
[spootnik/uncaught "0.5.2"]
[instaparse "1.4.1"]
[metrics-clojure "2.5.1"]
[ring/ring-codec "1.0.0"]
[clj-yaml "0.4.0"]
[cc.qbits/jet "0.6.6"]
Expand All @@ -22,5 +23,5 @@
[net.jpountz.lz4/lz4 "1.3"]
[org.xerial.snappy/snappy-java "1.1.1.7"]
[com.stuartsierra/component "0.2.3"]
[io.netty/netty-all "4.0.30.Final"]
[io.netty/netty-all "4.0.32.Final"]
[org.clojure/core.async "0.1.346.0-17112a-alpha"]])
15 changes: 13 additions & 2 deletions src/io/cyanite.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
[io.cyanite.engine.queue :as queue]
[io.cyanite.store :as store]
[com.stuartsierra.component :as component]
[metrics.reporters.console :as console]
[metrics.reporters.csv :as csv]
[metrics.reporters.jmx :as jmx]
[io.cyanite.engine :refer [map->Engine]]
[io.cyanite.engine.drift :refer [map->SystemClock map->AgentDrift]]
[io.cyanite.api :refer [map->Api]]
[unilog.config :refer [start-logging!]]
[spootnik.uncaught :refer [uncaught]]
Expand Down Expand Up @@ -55,10 +59,13 @@
(-> config
(dissoc :logging)
(build-components :input input/build-input)
(update :clock #(map->SystemClock %))
(update :drift #(component/using (map->AgentDrift %) [:clock]))
(update :engine #(component/using (map->Engine %) [:index
:store
:drift
:queues]))
(update :queues queue/queue-engine)
(update :queues queue/map->BlockingMemoryQueue)
(update :api #(component/using (map->Api {:options %}) [:index
:store
:queues
Expand All @@ -69,8 +76,12 @@
(defn -main
"Our main function, parses args and launches appropriate services"
[& args]
(let [[{:keys [path help quiet]} args banner] (get-cli args)]
(let [[{:keys [path help quiet]} args banner] (get-cli args)
cr (csv/reporter "/tmp/csv" {})
jr (jmx/reporter {})]

(csv/start cr 5)
(jmx/start jr)
(when help
(println banner)
(System/exit 0))
Expand Down
36 changes: 24 additions & 12 deletions src/io/cyanite/engine.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,17 @@
[io.cyanite.index :as index]
[io.cyanite.store :as store]
[io.cyanite.utils :refer [nbhm assoc-if-absent! now!]]
[clojure.tools.logging :refer [info debug]]))
[io.cyanite.engine.drift :refer [drift! skewed-epoch!]]
[clojure.tools.logging :refer [info debug]]
[metrics.timers :refer [deftimer time!]]
[metrics.core :refer [new-registry]]
[metrics.counters :refer [inc! dec! defcounter]]))

(deftimer t-tuple)
(deftimer t-mkey)
(deftimer t-snap)
(deftimer t-assc)
(deftimer t-enq)

(defprotocol Acceptor
(accept! [this metric]))
Expand All @@ -16,38 +26,40 @@
(resolution [this oldest path]))

(defn ingest-at-resolution
[buckets queues resolution metric]
(let [k (b/tuple resolution (:path metric))
metric-key (or (get buckets k) (b/metric-key k))
snaps (b/add! metric-key metric)]
(assoc-if-absent! buckets k metric-key)
[drift buckets queues resolution metric]
(let [k (time! t-tuple (b/tuple resolution (:path metric)))
metric-key (time! t-mkey (or (get buckets k) (b/metric-key k)))
snaps (time! t-snap (b/add-then-snap! metric-key metric
(skewed-epoch! drift)))]
(time! t-assc
(assoc-if-absent! buckets k metric-key))
(doseq [snapshot snaps]
(q/add! queues :writeq (assoc snapshot :resolution resolution)))))
(time! t-enq (q/add! queues :writeq snapshot)))))

(defrecord Engine [rules planner index store queues]
(defrecord Engine [rules planner buckets index store queues drift]
component/Lifecycle
(start [this]
(let [buckets (nbhm)
planner (map rule/->rule rules)]
(info "starting engine")
(q/consume! queues :ingestq {}
(fn [metric]
(drift! drift (:time metric))
(let [plan (rule/->exec-plan planner metric)]
(doseq [resolution plan]
(ingest-at-resolution buckets queues
(ingest-at-resolution drift buckets queues
resolution metric)))))

(q/consume! queues :writeq {}
(fn [metric]
(debug "in write callback: " (pr-str metric))
(index/register! index metric)
(store/insert! store metric)))
(assoc this :planner planner)))
(assoc this :planner planner :bucket buckets)))
(stop [this]
(when queues
(q/stop! queues :ingestq)
(q/stop! queues :writeq))
this)
(assoc this :planner nil :buckets nil))
Acceptor
(accept! [this metric]
(q/add! queues :ingestq metric))
Expand Down
173 changes: 62 additions & 111 deletions src/io/cyanite/engine/buckets.clj
Original file line number Diff line number Diff line change
@@ -1,132 +1,83 @@
(ns io.cyanite.engine.buckets
(:require [io.cyanite.utils :refer [nbhm keyset remove!]]
[clojure.tools.logging :refer [debug]])
(:import java.util.concurrent.atomic.AtomicLong
java.util.concurrent.atomic.AtomicLongArray
java.util.concurrent.atomic.AtomicReferenceArray
io.cyanite.engine.rule.Resolution))

(deftype MetricTuple [^Resolution id ^String path]
java.lang.Object
(hashCode [this]
(.hashCode [id path]))
(equals [this other]
(and
(= (.id this) (.id other))
(= (.path this) (.path other)))))

(deftype TimeSlotTuple [^String path ^Long slot]
java.lang.Object
(hashCode [this]
(.hashCode (str path ":" slot)))
(equals [this other]
(and
(= (.path this) (.path other))
(= (.slot this) (.slot other)))))
(:import io.cyanite.engine.rule.Resolution))

(defrecord MetricTuple [id path])
(defrecord MetricSnapshot [path time mean max min sum resolution])

(defprotocol Mutable
(add! [this metric]))
(add-then-snap! [this metric now]))

(defprotocol Snapshotable
(snapshot! [this]))
(defn time-slot
[^Resolution resolution ^Long now]
(let [p (:precision resolution)]
(* p (quot now p))))

(defrecord DriftSlot [drift]
Mutable
(add! [this ts]
(let [now (quot (System/currentTimeMillis) 1000)
new-drift (- now ts)]
(when (pos? new-drift)
(locking this
(if (> new-drift (.get drift))
(.set drift new-drift))))))
clojure.lang.IDeref
(deref [this]
(.get drift)))
(defn guarded
"Wrap around a function f operating on two numbers. Guard
against calling f on nil values by yielding in order of
precedence: right, left and in the worst case: nil"
[f]
(fn [left right]
(if right
(if left (f left right) right)
left)))

(defn drift-slot
[]
(DriftSlot. (AtomicLong. 0)))
(defn augment-snap
[snapshot {:keys [metric slot]}]
(if-not metric
snapshot
(-> snapshot
(assoc :slot slot)
(update :count inc)
(update :sum + metric)
(update :max (guarded max) metric)
(update :min (guarded min) metric))))

(defrecord MetricTimeSlot [k nvalues values]
Mutable
(add! [this metric]
(locking this
(try
(.set values (.getAndIncrement nvalues) (:metric metric))
(catch Exception e
;; Silently drop overflows
(debug e "exception while incrementing timeslot")))))
Snapshotable
(snapshot! [this]
(locking this
(loop [i 0
sz (.get nvalues)
count 0
sum 0
minval nil
maxval nil]
(let [val (.get values i)]
(cond
(zero? sz) nil
(< i sz) (recur (inc i)
sz
(inc count)
(+ val sum)
(if (nil? minval) val (min minval val))
(if (nil? maxval) val (max maxval val)))
:else (MetricSnapshot. (.path k)
(.slot k)
(double (/ sum count))
maxval
minval
sum
nil)))))))
(defn process-snap
[k {:keys [id slot count min max sum]}]
(MetricSnapshot.
(:path k)
slot
(double (/ sum count))
max
min
sum
(:id k)))

(defn time-slot
[resolution now]
(let [p (:precision resolution)]
(* p (quot now p))))
(def init-snap
{:count 0
:min nil
:max nil
:sum 0})

(defn take-snap
[[_ vals] k]
(let [sz (count vals)
path (:path k)
res (:id k)]
(when (pos? (count vals))
(process-snap k (reduce augment-snap init-snap vals)))))

(defn insert-slot!
[time-slots k]
(let [slot (MetricTimeSlot.
k
(AtomicLong. 0)
(AtomicReferenceArray. (int 10e3)))]
(assoc! time-slots k slot)
slot))
(defn add-then-snap
[[_ vals] metric k floor]
(let [slot (time-slot (:id k) (:time metric))
vals (sort-by :time (conj vals (assoc metric :slot slot)))
[old new] (split-with #(< (:slot %) floor) vals)]
[(map take-snap (group-by :slot old) (repeat k)) new]))

(defrecord MetricKey [k time-slots drift]
(defrecord MetricKey [k data]
Mutable
(add! [this metric]
(add! drift (:time metric))
(let [slot (time-slot (.id k) (:time metric))
k (TimeSlotTuple. (:path metric) slot)
mts (or (get time-slots k)
(insert-slot! time-slots k))]
(add! mts metric)
(snapshot! this)))
Snapshotable
(snapshot! [this]
(let [now (quot (System/currentTimeMillis) 1000)
drift-ts (- now @drift)
low-slot (time-slot (.id k) drift-ts)
slots (keyset time-slots)
old-slots (filter #(< (.slot %) low-slot) slots)]
(try
(vec
(for [slot-key old-slots
:let [slot (remove! time-slots slot-key)]
:when slot]
(snapshot! slot)))
(catch Exception e
(debug e "exception while snapshotting"))))))
(add-then-snap! [this metric now]
(let [slot (time-slot (:id k) (:time metric))
floor (time-slot (:id k) now)
[snapshots _] (swap! data add-then-snap metric k floor)]
snapshots)))

(defn metric-key
[^MetricTuple k]
(MetricKey. k (nbhm) (drift-slot)))
(MetricKey. k (atom nil)))

(defn tuple
[^Resolution id ^String path]
Expand Down
44 changes: 44 additions & 0 deletions src/io/cyanite/engine/drift.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
(ns io.cyanite.engine.drift
"Drift handling component. Relies on a clock implementation
which can yield an epoch with a second based resolution."
(:require [com.stuartsierra.component :as component]))

(defprotocol Drift
(drift! [this ts] "Take new drift into account for this timestamp")
(skewed-epoch! [this] "Yield an approximate epoch, accounting for drift"))

(defprotocol Clock
(epoch! [this] "Give us an epoch"))

;; System Clock is a basic wall clock
;; No configuration possible here.
(defrecord SystemClock []
component/Lifecycle
(start [this] this)
(stop [this] this)
Clock
(epoch! [this]
(quot (System/currentTimeMillis) 1000)))

;; Hold the state of our drift in an agent
;; This way we ensure that we have fast
;; execution of drift computation from
;; the caller site. this should eventually
;; rely on send-via to ensure we have our
;; own pool of threads to handle the max calls
(defrecord AgentDrift [slot clock]
component/Lifecycle
(start [this]
(assoc this :slot (agent 0)))
(stop [this]
(assoc this :slot nil))
Drift
(drift! [this ts]
(let [drift (- ts (epoch! clock))]
(when (pos? drift)
(send-off slot max drift))))
(skewed-epoch! [this]
(- (epoch! clock) @slot))
clojure.lang.IDeref
(deref [this]
@slot))
Loading

0 comments on commit 60b5df7

Please sign in to comment.