Skip to content

Commit

Permalink
merge performance related stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
addisonj committed Aug 21, 2014
2 parents b2bb31d + 6e9dcc9 commit d7a8381
Show file tree
Hide file tree
Showing 15 changed files with 567 additions and 95 deletions.
1 change: 1 addition & 0 deletions doc/cyanite.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
carbon:
host: "127.0.0.1"
port: 2003
readtimeout: 30
rollups:
- period: 60480
rollup: 10
Expand Down
100 changes: 100 additions & 0 deletions perf/perfTest.jmx
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
<?xml version="1.0" encoding="UTF-8"?>
<jmeterTestPlan version="1.2" properties="2.5" jmeter="2.10 r1533061">
<hashTree>
<TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="Test Plan" enabled="true">
<stringProp name="TestPlan.comments"></stringProp>
<boolProp name="TestPlan.functional_mode">false</boolProp>
<boolProp name="TestPlan.serialize_threadgroups">false</boolProp>
<elementProp name="TestPlan.user_defined_variables" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
<collectionProp name="Arguments.arguments"/>
</elementProp>
<stringProp name="TestPlan.user_define_classpath"></stringProp>
</TestPlan>
<hashTree>
<ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Thread Group" enabled="true">
<stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
<elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
<boolProp name="LoopController.continue_forever">false</boolProp>
<stringProp name="LoopController.loops">1</stringProp>
</elementProp>
<stringProp name="ThreadGroup.num_threads">5</stringProp>
<stringProp name="ThreadGroup.ramp_time">1</stringProp>
<longProp name="ThreadGroup.start_time">1400616222000</longProp>
<longProp name="ThreadGroup.end_time">1400616222000</longProp>
<boolProp name="ThreadGroup.scheduler">false</boolProp>
<stringProp name="ThreadGroup.duration"></stringProp>
<stringProp name="ThreadGroup.delay"></stringProp>
</ThreadGroup>
<hashTree>
<RandomVariableConfig guiclass="TestBeanGUI" testclass="RandomVariableConfig" testname="Random Variable" enabled="true">
<stringProp name="maximumValue">1</stringProp>
<stringProp name="minimumValue">1</stringProp>
<stringProp name="outputFormat"></stringProp>
<boolProp name="perThread">true</boolProp>
<stringProp name="randomSeed"></stringProp>
<stringProp name="variableName">randy</stringProp>
</RandomVariableConfig>
<hashTree/>
<TCPSampler guiclass="TCPSamplerGui" testclass="TCPSampler" testname="TCP Sampler" enabled="true">
<stringProp name="TCPSampler.classname">TCPClientImpl</stringProp>
<stringProp name="TCPSampler.server">localhost</stringProp>
<boolProp name="TCPSampler.reUseConnection">true</boolProp>
<stringProp name="TCPSampler.port">2003</stringProp>
<boolProp name="TCPSampler.nodelay">false</boolProp>
<stringProp name="TCPSampler.timeout">1</stringProp>
<stringProp name="TCPSampler.ctimeout">1000</stringProp>
<stringProp name="TCPSampler.request">foo.bar.baz.${randy} 1 1400616260
foo.bar.bazz.${randy} 1 1400616260
foo.bar.bazz2.${randy} 1 1400616260
foo.bar.bazz3.${randy} 1 1400616260
foo.bar.bazz4.${randy} 1 1400616260
foo.bar.bazz5.${randy} 1 1400616260
foo1.bar.bazz.${randy} 1 1400616260
foo2.bar.bazz2.${randy} 1 1400616260
foo3.bar.bazz3.${randy} 1 1400616260
foo4.bar.bazz4.${randy} 1 1400616260
foo5.bar.bazz5.${randy} 1 1400616260
</stringProp>
<boolProp name="TCPSampler.closeConnection">false</boolProp>
<stringProp name="TCPSampler.EolByte">0A</stringProp>
<stringProp name="ConfigTestElement.username"></stringProp>
<stringProp name="ConfigTestElement.password"></stringProp>
</TCPSampler>
<hashTree>
<ResultCollector guiclass="SummaryReport" testclass="ResultCollector" testname="Summary Report" enabled="true">
<boolProp name="ResultCollector.error_logging">false</boolProp>
<objProp>
<name>saveConfig</name>
<value class="SampleSaveConfiguration">
<time>true</time>
<latency>true</latency>
<timestamp>true</timestamp>
<success>true</success>
<label>true</label>
<code>true</code>
<message>true</message>
<threadName>true</threadName>
<dataType>true</dataType>
<encoding>false</encoding>
<assertions>true</assertions>
<subresults>true</subresults>
<responseData>false</responseData>
<samplerData>false</samplerData>
<xml>false</xml>
<fieldNames>false</fieldNames>
<responseHeaders>false</responseHeaders>
<requestHeaders>false</requestHeaders>
<responseDataOnError>false</responseDataOnError>
<saveAssertionResultsFailureMessage>false</saveAssertionResultsFailureMessage>
<assertionsResultsToSave>0</assertionsResultsToSave>
<bytes>true</bytes>
</value>
</objProp>
<stringProp name="filename"></stringProp>
</ResultCollector>
<hashTree/>
</hashTree>
</hashTree>
</hashTree>
</hashTree>
</jmeterTestPlan>
30 changes: 30 additions & 0 deletions perf/soakTest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/usr/bin/env python

import sys, os, time
from socket import socket
from random import random

try:
host = sys.argv[1]
port = int(sys.argv[2])
mpm = int(sys.argv[3])
except:
print 'Usage: %s host port metrics-per-minute' % os.path.basename(sys.argv[0])
sys.exit(1)

s = socket()
s.connect( (host,port) )
now = int( time.time() )
now -= now % 60

while True:
start = time.time()
count = 0
for i in xrange(0, mpm):
metric = 'TEST.%d' % i
value = random()
s.sendall('%s %s %s\n' % (metric, value, now))
count += 1

print 'sent %d metrics in %.3f seconds' % (count, time.time() - start)
now += 60
3 changes: 3 additions & 0 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
[org.xerial.snappy/snappy-java "1.0.5"]
[org.slf4j/slf4j-log4j12 "1.6.4"]
[log4j/apache-log4j-extras "1.0"]
[io.netty/netty-all "4.0.19.Final"]
[org.clojure/core.async "0.1.278.0-76b25b-alpha"]
[http-kit "2.1.16"]
[log4j/log4j "1.2.16"
:exclusions [javax.mail/mail
javax.jms/jms
Expand Down
2 changes: 2 additions & 0 deletions src/org/spootnik/cyanite.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
[org.spootnik.cyanite.config :as config]
[clojure.tools.cli :refer [cli]]))

(set! *warn-on-reflection* true)

(defn get-cli
"Call cli parsing with our known options"
[args]
Expand Down
68 changes: 43 additions & 25 deletions src/org/spootnik/cyanite/carbon.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@
[clojure.string :as s]
[org.spootnik.cyanite.store :as store]
[org.spootnik.cyanite.path :as path]
[org.spootnik.cyanite.tcp :as tc]
[org.spootnik.cyanite.util :refer [partition-or-time]]
[clojure.tools.logging :refer [info debug]]
[gloss.core :refer [string]]
[lamina.core :refer [receive-all map* siphon]]))
[lamina.core :refer :all]
[clojure.core.async :as async :refer [<! >! >!! go chan]]))

(set! *warn-on-reflection* true)

(defn parse-num
"parse a number into the given value, return the
Expand All @@ -20,33 +25,46 @@
(defn formatter
"Split each line on whitespace, discard nan metric lines
and format correct lines for each resolution"
[index rollups input]
(let [[path metric time] (s/split (.trim input) #" ")
metric (parse-num #(Double/parseDouble %) "nan" metric)
time (parse-num #(Long/parseLong %) "nan" time)]
(when (and (not= metric "nan") (not= time "nan"))
;; hardcode empty tenant for now
(when index (path/register index "" path))
(for [{:keys [rollup period rollup-to]} rollups]
{:path path
:rollup rollup
:period period
:ttl (* period rollup)
:time (rollup-to time)
:metric metric}))))
[rollups ^String input]
(try
(let [[path metric time] (s/split (.trim input) #" ")
timel (parse-num #(Long/parseLong %) "nan" time)
metricd (parse-num #(Double/parseDouble %) "nan" metric)]
(when (and (not= "nan" metricd) (not= "nan" timel))
(for [{:keys [rollup period ttl rollup-to]} rollups]
{:path path
:rollup rollup
:period period
:ttl (or ttl (* rollup period))
:time (rollup-to timel)
:metric metricd})))
(catch Exception e
(info "Exception for metric [" input "] : " e))))

(defn handler
(defn format-processor
"Send each metric over to the cassandra store"
[index rollups insertch]
(fn [ch info]
(siphon (map* (partial formatter index rollups) ch) insertch)))
[chan indexch rollups insertch]
(go
(let [input (partition-or-time 1000 chan 500 5)]
(while true
(let [metrics (<! input)]
(try
(doseq [metric metrics]
(let [formed (remove nil? (formatter rollups metric))]
(doseq [f formed]
(>! insertch f))
(doseq [p (distinct (map :path formed))]
(>! indexch p))))
(catch Exception e
(info "Exception for metric [" metrics "] : " e))))))))

(defn start
"Start a tcp carbon listener"
[{:keys [store carbon index]}]
(let [insertch (store/channel-for store)
handler (handler index (:rollups carbon) insertch)]
(info "starting carbon handler")
(tcp/start-tcp-server
handler
(merge carbon {:frame (string :utf-8 :delimiters ["\n"])}))))
(let [indexch (path/channel-for index)
insertch (store/channel-for store)
chan (chan 100000)
handler (format-processor chan indexch (:rollups carbon) insertch)]
(info "starting carbon handler: " carbon)
(tc/start-tcp-server
(merge carbon {:response-channel chan}))))
13 changes: 8 additions & 5 deletions src/org/spootnik/cyanite/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
default-carbon
{:enabled true
:host "127.0.0.1"
:port 2003})
:port 2003
:readtimeout 30})

(def ^{:doc "let the http api listen on 8080 by default"}
default-http
Expand All @@ -50,13 +51,15 @@
(throw (ex-info (str "unknown rollup unit: " unit) {})))))

(defn convert-shorthand-rollup
"Converts an individual rollup to a {:rollup :period} pair"
"Converts an individual rollup to a {:rollup :period :ttl} tri"
[rollup]
(if (string? rollup)
(let [[rollup-string retention-string] (split rollup #":" 2)
rollup-secs (to-seconds rollup-string)
retention-secs (to-seconds retention-string)]
{:rollup rollup-secs :period (/ retention-secs rollup-secs)})
{:rollup rollup-secs
:period (/ retention-secs rollup-secs)
:ttl (* rollup-secs (/ retention-secs rollup-secs))})
rollup))

(defn convert-shorthand-rollups
Expand All @@ -80,8 +83,8 @@
(let [n (namespace (symbol s))]
(require (symbol n))
(find-var (symbol s)))
(catch Exception _
nil)))
(catch Exception e
(prn "Exception: " e))))

(defn instantiate
"Find a symbol pointing to a function of a single argument and
Expand Down
51 changes: 51 additions & 0 deletions src/org/spootnik/cyanite/es_client.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
(ns org.spootnik.cyanite.es-client
"Hacks of elastich to use async http-kit"
(:require [clojurewerkz.elastisch.rest.document :as esrd]
[clojurewerkz.elastisch.rest.bulk :as esrb]
[clojurewerkz.elastisch.rest :as rest]
[clojurewerkz.elastisch.arguments :as ar]
[clojure.string :as str]
[org.httpkit.client :as http]
[cheshire.core :as json]
[clojure.tools.logging :refer [error info debug]])
(:import clojurewerkz.elastisch.rest.Connection))

(defn multi-get
[^Connection conn index mapping-type query func]
(http/post
(rest/index-mget-url conn index mapping-type)
{:body (json/encode {:docs query})}
#(if (= 200 (:status %))
(let [bod (json/decode (:body %) true)]
(func (filter :found (:docs bod))))
(error "ES responded with non-200: " %))))

(comment "Note: i've ditched the optional args to ES, refer to orignal elastich code for how they should return")

(defn bulk-with-url
[url operations func]
(let [bulk-json (map json/encode operations)
bulk-json (-> bulk-json
(interleave (repeat "\n"))
(str/join))]
(http/post url
{:body bulk-json}
#(let [status (:status %)]
(when (not= 200 status)
(func %))))))

(defn remove-ids-from-docs
[op-or-doc]
(if (:tenant op-or-doc)
(dissoc op-or-doc :_id)
op-or-doc))

(defn multi-update
[^Connection conn index mapping-type docs func]
(bulk-with-url (rest/bulk-url conn index mapping-type)
(map
remove-ids-from-docs
(esrb/bulk-index (map #(assoc % :_id (:path %)
:_index index
:_type mapping-type) docs)))
func))
Loading

0 comments on commit d7a8381

Please sign in to comment.