Skip to content

Commit

Permalink
Merge pull request #4 from nathanmarz/master
Browse files Browse the repository at this point in the history
merge changes from nathanmarz/storm
  • Loading branch information
anfeng committed Mar 11, 2013
2 parents 95761f6 + 9956175 commit 9c91db7
Show file tree
Hide file tree
Showing 35 changed files with 1,482 additions and 63 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
* Have storm fall back to installed storm.yaml (thanks revans2)
* Improve error message when Storm detects bundled storm.yaml to show the URL's for offending resources (thanks revans2)
* Nimbus throws NotAliveException instead of FileNotFoundException from various query methods when topology is no longer alive (thanks revans2)
* Escape HTML and Javascript appropriately in Storm UI (thanks d2r)
* Storm's Zookeeper client now uses bounded exponential backoff strategy on failures
* Append component name to thread name of running executors so that logs are easier to read
* Bug fix: Supervisor provides full path to workers to logging config rather than relative path (thanks revans2)
* Bug fix: Call ReducerAggregator#init properly when used within persistentAggregate (thanks lorcan)
* Bug fix: Set component-specific configs correctly for Trident spouts
Expand Down
2 changes: 2 additions & 0 deletions README.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ You must not remove this notice, or any other, from this software.
* Robert Evans ([@revans2](https://github.com/revans2))
* Andy Feng ([@anfeng](https://github.com/anfeng))
* Lorcan Coyle ([@lorcan](https://github.com/lorcan))
* Derek Dagit ([@d2r](https://github.com/d2r))
* Andrew Olson ([@noslowerdna](https://github.com/noslowerdna))

## Acknowledgements

Expand Down
2 changes: 2 additions & 0 deletions conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ storm.zookeeper.session.timeout: 20000
storm.zookeeper.connection.timeout: 15000
storm.zookeeper.retry.times: 5
storm.zookeeper.retry.interval: 1000
storm.zookeeper.retry.intervalceiling.millis: 30000
storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"

### nimbus.* configs are for the master
nimbus.host: "localhost"
Expand Down
21 changes: 21 additions & 0 deletions conf/jaas_digest.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/* This is example of JAAS Login configuration for digest authentication
*/

/*
StormServer section should contain a list of authorized users and their passwords.
*/
StormServer {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_super="adminsecret"
user_bob="bobsecret";
user_john="johnsecret";
};

/*
StormClient section contains one user name and his/her password.
*/
StormClient {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="bob"
password="bobsecret";
};
22 changes: 22 additions & 0 deletions logback/cluster.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,23 @@
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>

<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
</encoder>
</appender>

<appender name="ACCESS" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${storm.home}/logs/access.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>${storm.home}/logs/${logfile.name}.%i</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>9</maxIndex>
</rollingPolicy>

<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>

<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
</encoder>
Expand All @@ -21,4 +38,9 @@
<root level="INFO">
<appender-ref ref="A1"/>
</root>

<logger name="backtype.storm.security.auth.authorizer" additivity="false">
<level value="INFO" />
<appender-ref ref="ACCESS" />
</logger>
</configuration>
6 changes: 4 additions & 2 deletions src/clj/backtype/storm/daemon/executor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,8 @@
))
0))
:kill-fn (:report-error-and-die executor-data)
:factory? true)]))
:factory? true
:thread-name component-id)]))

(defn- tuple-time-delta! [^TupleImpl tuple]
(let [ms (.getProcessSampleStartTime tuple)]
Expand Down Expand Up @@ -715,7 +716,8 @@
(disruptor/consume-batch-when-available receive-queue event-handler)
0)))
:kill-fn (:report-error-and-die executor-data)
:factory? true)]))
:factory? true
:thread-name component-id)]))

(defmethod close-component :spout [executor-data spout]
(.close spout))
Expand Down
12 changes: 9 additions & 3 deletions src/clj/backtype/storm/daemon/worker.clj
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,18 @@
(current-time-secs)
(:storm-id worker)
(:executors worker)
(:port worker))]
(:port worker))
state (worker-state conf (:worker-id worker))]
(log-debug "Doing heartbeat " (pr-str hb))
;; do the local-file-system heartbeat.
(.put (worker-state conf (:worker-id worker))
(.put state
LS-WORKER-HEARTBEAT
hb)
hb
false
)
(.cleanup state 60) ; this is just in case supervisor is down so that disk doesn't fill up.
; it shouldn't take supervisor 120 seconds between listing dir and reading it

))

(defn worker-outbound-tasks
Expand Down
4 changes: 3 additions & 1 deletion src/clj/backtype/storm/disruptor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,14 @@
(defn halt-with-interrupt! [^DisruptorQueue queue]
(.haltWithInterrupt queue))

(defnk consume-loop* [^DisruptorQueue queue handler :kill-fn (fn [error] (halt-process! 1 "Async loop died!"))]
(defnk consume-loop* [^DisruptorQueue queue handler :kill-fn (fn [error] (halt-process! 1 "Async loop died!"))
:thread-name nil]
(let [ret (async-loop
(fn []
(consume-batch-when-available queue handler)
0 )
:kill-fn kill-fn
:thread-name thread-name
)]
(consumer-started! queue)
ret
Expand Down
20 changes: 12 additions & 8 deletions src/clj/backtype/storm/ui/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
[compojure.handler :as handler]
[ring.util.response :as resp]
[backtype.storm [thrift :as thrift]])
(:import [org.apache.commons.lang StringEscapeUtils])
(:gen-class))

(def ^:dynamic *STORM-CONF* (read-storm-config))
Expand Down Expand Up @@ -85,7 +86,7 @@
(defn topology-link
([id] (topology-link id id))
([id content]
(link-to (url-format "/topology/%s" id) content)))
(link-to (url-format "/topology/%s" id) (escape-html content))))

(defn main-topology-summary-table [summs]
;; make the id clickable
Expand All @@ -94,7 +95,7 @@
["Name" "Id" "Status" "Uptime" "Num workers" "Num executors" "Num tasks"]
(for [^TopologySummary t summs]
[(topology-link (.get_id t) (.get_name t))
(.get_id t)
(escape-html (.get_id t))
(.get_status t)
(pretty-uptime-sec (.get_uptime_secs t))
(.get_num_workers t)
Expand Down Expand Up @@ -301,8 +302,8 @@
(let [executors (.get_executors summ)
workers (set (for [^ExecutorSummary e executors] [(.get_host e) (.get_port e)]))]
(table ["Name" "Id" "Status" "Uptime" "Num workers" "Num executors" "Num tasks"]
[[(.get_name summ)
(.get_id summ)
[[(escape-html (.get_name summ))
(escape-html (.get_id summ))
(.get_status summ)
(pretty-uptime-sec (.get_uptime_secs summ))
(count workers)
Expand Down Expand Up @@ -376,7 +377,7 @@
)))

(defn component-link [storm-id id]
(link-to (url-format "/topology/%s/component/%s" storm-id id) id))
(link-to (url-format "/topology/%s/component/%s" storm-id id) (escape-html id)))

(defn render-capacity [capacity]
(let [capacity (nil-to-zero capacity)]
Expand Down Expand Up @@ -463,7 +464,10 @@
[:input {:type "button"
:value action
(if enabled :enabled :disabled) ""
:onclick (str "confirmAction('" id "', '" name "', '" command "', " is-wait ", " default-wait ")")}])
:onclick (str "confirmAction('"
(StringEscapeUtils/escapeJavaScript id) "', '"
(StringEscapeUtils/escapeJavaScript name) "', '"
command "', " is-wait ", " default-wait ")")}])

(defn topology-page [id window include-sys?]
(with-nimbus nimbus
Expand Down Expand Up @@ -609,7 +613,7 @@
(sorted-table
["Component" "Stream" "Execute latency (ms)" "Executed" "Process latency (ms)" "Acked" "Failed"]
(for [[^GlobalStreamId s stats] stream-summary]
[(.get_componentId s)
[(escape-html (.get_componentId s))
(.get_streamId s)
(float-str (:execute-latencies stats))
(nil-to-zero (:executed stats))
Expand Down Expand Up @@ -712,7 +716,7 @@
(concat
[[:h2 "Component summary"]
(table ["Id" "Topology" "Executors" "Tasks"]
[[component
[[(escape-html component)
(topology-link (.get_id summ) (.get_name summ))
(count summs)
(sum-tasks summs)
Expand Down
12 changes: 11 additions & 1 deletion src/clj/backtype/storm/util.clj
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@
(some (partial instance? klass))
boolean))

(defmacro thrown-cause? [klass & body]
`(try
~@body
false
(catch Throwable t#
(exception-cause? ~klass t#))))

(defmacro forcat [[args aseq] & body]
`(mapcat (fn [~args]
~@body)
Expand Down Expand Up @@ -368,7 +375,8 @@
:kill-fn (fn [error] (halt-process! 1 "Async loop died!"))
:priority Thread/NORM_PRIORITY
:factory? false
:start true]
:start true
:thread-name nil]
(let [thread (Thread.
(fn []
(try-cause
Expand All @@ -389,6 +397,8 @@
))]
(.setDaemon thread daemon)
(.setPriority thread priority)
(when thread-name
(.setName thread (str (.getName thread) "-" thread-name)))
(when start
(.start thread))
;; should return object that supports stop, interrupt, join, and waiting?
Expand Down
15 changes: 15 additions & 0 deletions src/jvm/backtype/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public class Config extends HashMap<String, Object> {
*/
public static String STORM_LOCAL_HOSTNAME = "storm.local.hostname";

/**
* The transport plug-in for Thrift client/server communication
*/
public static String STORM_THRIFT_TRANSPORT_PLUGIN = "storm.thrift.transport";

/**
* The serializer class for ListDelegate (tuple payload).
* The default serializer will be ListDelegateSerializer
Expand Down Expand Up @@ -106,6 +111,11 @@ public class Config extends HashMap<String, Object> {
*/
public static String STORM_ZOOKEEPER_RETRY_INTERVAL="storm.zookeeper.retry.interval";

/**
* The ceiling of the interval between retries of a Zookeeper operation.
*/
public static String STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING="storm.zookeeper.retry.intervalceiling.millis";

/**
* The Zookeeper authentication scheme to use, e.g. "digest". Defaults to no authentication.
*/
Expand Down Expand Up @@ -206,6 +216,11 @@ public class Config extends HashMap<String, Object> {
*/
public static String NIMBUS_TOPOLOGY_VALIDATOR = "nimbus.topology.validator";

/**
* Class name for authorization plugin for Nimbus
*/
public static String NIMBUS_AUTHORIZER = "nimbus.authorizer";

/**
* Storm UI binds to this port.
*/
Expand Down
81 changes: 81 additions & 0 deletions src/jvm/backtype/storm/security/auth/AuthUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package backtype.storm.security.auth;

import backtype.storm.Config;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.AppConfigurationEntry;
import java.security.NoSuchAlgorithmException;
import java.security.URIParameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.Map;

public class AuthUtils {
private static final Logger LOG = LoggerFactory.getLogger(AuthUtils.class);
public static final String LOGIN_CONTEXT_SERVER = "StormServer";
public static final String LOGIN_CONTEXT_CLIENT = "StormClient";
public static final String SERVICE = "storm_thrift_server";

/**
* Construct a JAAS configuration object per storm configuration file
* @param storm_conf Storm configuration
* @return JAAS configuration object
*/
public static Configuration GetConfiguration(Map storm_conf) {
Configuration login_conf = null;

//find login file configuration from Storm configuration
String loginConfigurationFile = (String)storm_conf.get("java.security.auth.login.config");
if ((loginConfigurationFile != null) && (loginConfigurationFile.length()>0)) {
try {
URI config_uri = new File(loginConfigurationFile).toURI();
login_conf = Configuration.getInstance("JavaLoginConfig", new URIParameter(config_uri));
} catch (NoSuchAlgorithmException ex1) {
if (ex1.getCause() instanceof FileNotFoundException)
throw new RuntimeException("configuration file "+loginConfigurationFile+" could not be found");
else throw new RuntimeException(ex1);
} catch (Exception ex2) {
throw new RuntimeException(ex2);
}
}

return login_conf;
}

/**
* Construct a transport plugin per storm configuration
* @param conf storm configuration
* @return
*/
public static ITransportPlugin GetTransportPlugin(Map storm_conf, Configuration login_conf) {
ITransportPlugin transportPlugin = null;
try {
String transport_plugin_klassName = (String) storm_conf.get(Config.STORM_THRIFT_TRANSPORT_PLUGIN);
Class klass = Class.forName(transport_plugin_klassName);
transportPlugin = (ITransportPlugin)klass.newInstance();
transportPlugin.prepare(storm_conf, login_conf);
} catch(Exception e) {
throw new RuntimeException(e);
}
return transportPlugin;
}

public static String get(Configuration configuration, String section, String key) throws IOException {
AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(section);
if (configurationEntries == null) {
String errorMessage = "Could not find a '"+ section + "' entry in this configuration.";
throw new IOException(errorMessage);
}

for(AppConfigurationEntry entry: configurationEntries) {
Object val = entry.getOptions().get(key);
if (val != null)
return (String)val;
}
return null;
}
}

Loading

0 comments on commit 9c91db7

Please sign in to comment.