diff --git a/project.clj b/project.clj index d3ca7aee0..478619a53 100644 --- a/project.clj +++ b/project.clj @@ -34,7 +34,8 @@ :test-paths ["test/clj"] :resource-paths ["conf"] - :profiles {:dev {:resource-paths ["src/dev"]} + :profiles {:dev {:resource-paths ["src/dev"] + :dependencies [[org.mockito/mockito-all "1.9.5"]]} :release {} :lib {} } diff --git a/src/jvm/backtype/storm/serialization/SerializationFactory.java b/src/jvm/backtype/storm/serialization/SerializationFactory.java index 9eb69a883..36e40466c 100644 --- a/src/jvm/backtype/storm/serialization/SerializationFactory.java +++ b/src/jvm/backtype/storm/serialization/SerializationFactory.java @@ -28,10 +28,10 @@ public class SerializationFactory { public static final Logger LOG = LoggerFactory.getLogger(SerializationFactory.class); - + public static Kryo getKryo(Map conf) { IKryoFactory kryoFactory = (IKryoFactory) Utils.newInstance((String) conf.get(Config.TOPOLOGY_KRYO_FACTORY)); - Kryo k = kryoFactory.getKryo(conf); + Kryo k = kryoFactory.getKryo(conf); k.register(byte[].class); /* tuple payload serializer is specified via configuration */ @@ -41,9 +41,8 @@ public static Kryo getKryo(Map conf) { Serializer serializer = resolveSerializerInstance(k, ListDelegate.class, serializerClass, conf); k.register(ListDelegate.class, serializer); } catch (ClassNotFoundException ex) { - LOG.error("Could not load class in class path: " + payloadSerializerName.length(), ex); throw new RuntimeException(ex); - } + } k.register(ArrayList.class, new ArrayListSerializer()); k.register(HashMap.class, new HashMapSerializer()); @@ -59,11 +58,11 @@ public static Kryo getKryo(Map conf) { } catch(Exception e) { throw new RuntimeException(e); } - + Map registrations = normalizeKryoRegister(conf); - kryoFactory.preRegister(k, conf); - + kryoFactory.preRegister(k, conf); + boolean skipMissing = (Boolean) conf.get(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS); for(String klassName: registrations.keySet()) { String serializerClassName = registrations.get(klassName); @@ -86,7 +85,7 @@ public static Kryo getKryo(Map conf) { } } - kryoFactory.postRegister(k, conf); + kryoFactory.postRegister(k, conf); if (conf.get(Config.TOPOLOGY_KRYO_DECORATORS) != null) { for(String klassName : (List)conf.get(Config.TOPOLOGY_KRYO_DECORATORS)) { @@ -104,16 +103,16 @@ public static Kryo getKryo(Map conf) { throw new RuntimeException(e); } catch(IllegalAccessException e) { throw new RuntimeException(e); - } + } } } - kryoFactory.postDecorate(k, conf); - - return k; + kryoFactory.postDecorate(k, conf); + + return k; } - - public static class IdDictionary { + + public static class IdDictionary { Map> streamNametoId = new HashMap>(); Map> streamIdToName = new HashMap>(); @@ -121,7 +120,7 @@ public IdDictionary(StormTopology topology) { List componentNames = new ArrayList(topology.get_spouts().keySet()); componentNames.addAll(topology.get_bolts().keySet()); componentNames.addAll(topology.get_state_spouts().keySet()); - + for(String name: componentNames) { ComponentCommon common = Utils.getComponentCommon(topology, name); List streams = new ArrayList(common.get_streams().keySet()); @@ -129,11 +128,11 @@ public IdDictionary(StormTopology topology) { streamIdToName.put(name, Utils.reverseMap(streamNametoId.get(name))); } } - + public int getStreamId(String component, String stream) { return streamNametoId.get(component).get(stream); } - + public String getStreamName(String component, int stream) { return streamIdToName.get(component).get(stream); } @@ -149,7 +148,7 @@ private static Map idify(List names) { return ret; } } - + private static Serializer resolveSerializerInstance(Kryo k, Class superClass, Class serializerClass, Map conf) { try { try { @@ -184,7 +183,7 @@ private static Serializer resolveSerializerInstance(Kryo k, Class superClass, Cl + superClass.getName(), ex); } } - + private static Map normalizeKryoRegister(Map conf) { // TODO: de-duplicate this logic with the code in nimbus Object res = conf.get(Config.TOPOLOGY_KRYO_REGISTER); diff --git a/test/clj/backtype/storm/security/auth/AuthUtils_test.clj b/test/clj/backtype/storm/security/auth/AuthUtils_test.clj new file mode 100644 index 000000000..655263664 --- /dev/null +++ b/test/clj/backtype/storm/security/auth/AuthUtils_test.clj @@ -0,0 +1,50 @@ +(ns backtype.storm.security.auth.AuthUtils-test + (:import [backtype.storm.security.auth AuthUtils]) + (:import [java.io IOException]) + (:import [javax.security.auth.login AppConfigurationEntry Configuration]) + (:import [org.mockito Mockito]) + (:use [clojure test]) +) + +(deftest test-throws-on-missing-section + (is (thrown? IOException + (AuthUtils/get (Mockito/mock Configuration) "bogus-section" ""))) +) + +(defn- mk-mock-app-config-entry [] + (let [toRet (Mockito/mock AppConfigurationEntry)] + (. (Mockito/when (.getOptions toRet)) thenReturn (hash-map)) + toRet + ) +) + +(deftest test-returns-null-if-no-such-section + (let [entry (mk-mock-app-config-entry) + entries (into-array (.getClass entry) [entry]) + section "bogus-section" + conf (Mockito/mock Configuration)] + (. (Mockito/when (. conf getAppConfigurationEntry section )) + thenReturn entries) + (is (nil? (AuthUtils/get conf section "nonexistent-key"))) + ) +) + +(deftest test-returns-first-value-for-valid-key + (let [k "the-key" + expected "good-value" + empty-entry (mk-mock-app-config-entry) + bad-entry (Mockito/mock AppConfigurationEntry) + good-entry (Mockito/mock AppConfigurationEntry) + conf (Mockito/mock Configuration)] + (. (Mockito/when (.getOptions bad-entry)) thenReturn {k "bad-value"}) + (. (Mockito/when (.getOptions good-entry)) thenReturn {k expected}) + (let [entries (into-array (.getClass empty-entry) + [empty-entry good-entry bad-entry]) + section "bogus-section"] + (. (Mockito/when (. conf getAppConfigurationEntry section)) + thenReturn entries) + (is (not (nil? (AuthUtils/get conf section k)))) + (is (= (AuthUtils/get conf section k) expected)) + ) + ) +) diff --git a/test/clj/backtype/storm/security/auth/ReqContext_test.clj b/test/clj/backtype/storm/security/auth/ReqContext_test.clj new file mode 100644 index 000000000..136c9b98c --- /dev/null +++ b/test/clj/backtype/storm/security/auth/ReqContext_test.clj @@ -0,0 +1,57 @@ +(ns backtype.storm.security.auth.ReqContext-test + (:import [backtype.storm.security.auth ReqContext]) + (:import [java.net InetAddress]) + (:import [java.security AccessControlContext Principal]) + (:import [javax.security.auth Subject]) + (:use [clojure test]) +) + +(def test-subject + (let [rc (ReqContext/context) + expected (Subject.)] + (is (not (.isReadOnly expected))) + (.setSubject rc expected) + (is (= (.subject rc) expected)) + + ; Change the Subject by setting read-only. + (.setReadOnly expected) + (.setSubject rc expected) + (is (= (.subject rc) expected)) + ) +) + +(deftest test-remote-address + (let [rc (ReqContext/context) + expected (InetAddress/getByAddress (.getBytes "ABCD"))] + (.setRemoteAddress rc expected) + (is (= (.remoteAddress rc) expected)) + ) +) + +(deftest test-principal-returns-null-when-no-subject + (let [rc (ReqContext/context)] + (.setSubject rc (Subject.)) + (is (nil? (.principal rc))) + ) +) + +(def principal-name "Test Principal") + +(defn TestPrincipal [] + (reify Principal + (^String getName [this] + principal-name) + ) +) + +(deftest test-principal + (let [p (TestPrincipal) + principals (hash-set p) + creds (hash-set) + s (Subject. false principals creds, creds) + rc (ReqContext/context)] + (.setSubject rc s) + (is (not (nil? (.principal rc)))) + (is (= (-> rc .principal .getName) principal-name)) + ) +) diff --git a/test/clj/backtype/storm/security/auth/SaslTransportPlugin_test.clj b/test/clj/backtype/storm/security/auth/SaslTransportPlugin_test.clj new file mode 100644 index 000000000..b844e266a --- /dev/null +++ b/test/clj/backtype/storm/security/auth/SaslTransportPlugin_test.clj @@ -0,0 +1,28 @@ +(ns backtype.storm.security.auth.SaslTransportPlugin-test + (:use [clojure test]) + (import [backtype.storm.security.auth SaslTransportPlugin$User]) +) + +(deftest test-User-name + (let [nam "Andy" + user (SaslTransportPlugin$User. nam)] + (are [a b] (= a b) + nam (.toString user) + (.getName user) (.toString user) + (.hashCode nam) (.hashCode user) + ) + ) +) + +(deftest test-User-equals + (let [nam "Andy" + user1 (SaslTransportPlugin$User. nam) + user2 (SaslTransportPlugin$User. nam) + user3 (SaslTransportPlugin$User. "Bobby")] + (is (-> user1 (.equals user1))) + (is (-> user1 (.equals user2))) + (is (not (-> user1 (.equals nil)))) + (is (not (-> user1 (.equals "Potato")))) + (is (not (-> user1 (.equals user3)))) + ) +) diff --git a/test/clj/backtype/storm/security/auth/ThriftClient_test.clj b/test/clj/backtype/storm/security/auth/ThriftClient_test.clj new file mode 100644 index 000000000..f9c32e685 --- /dev/null +++ b/test/clj/backtype/storm/security/auth/ThriftClient_test.clj @@ -0,0 +1,26 @@ +(ns backtype.storm.security.auth.ThriftClient-test + (:use [backtype.storm config]) + (:use [clojure test]) + (:import [backtype.storm.security.auth ThriftClient]) + (:import [org.apache.thrift7.transport TTransportException]) +) + +(deftest test-ctor-throws-if-port-invalid + (let [conf (read-default-config) + timeout (Integer. 30)] + (is (thrown? java.lang.IllegalArgumentException + (ThriftClient. conf "bogushost" -1 timeout))) + (is (thrown? java.lang.IllegalArgumentException + (ThriftClient. conf "bogushost" 0 timeout))) + ) +) + +(deftest test-ctor-throws-if-host-not-set + (let [conf (read-default-config) + timeout (Integer. 60)] + (is (thrown? TTransportException + (ThriftClient. conf "" 4242 timeout))) + (is (thrown? IllegalArgumentException + (ThriftClient. conf nil 4242 timeout))) + ) +) diff --git a/test/clj/backtype/storm/security/auth/ThriftServer_test.clj b/test/clj/backtype/storm/security/auth/ThriftServer_test.clj new file mode 100644 index 000000000..0c6cad36d --- /dev/null +++ b/test/clj/backtype/storm/security/auth/ThriftServer_test.clj @@ -0,0 +1,14 @@ +(ns backtype.storm.security.auth.ThriftServer-test + (:use [backtype.storm config]) + (:use [clojure test]) + (:import [backtype.storm.security.auth ThriftServer]) + (:import [org.apache.thrift7.transport TTransportException]) +) + +(deftest test-stop-checks-for-null + (let [server (ThriftServer. (read-default-config) nil 12345)] + (.stop server))) + +(deftest test-isServing-checks-for-null + (let [server (ThriftServer. (read-default-config) nil 12345)] + (is (not (.isServing server))))) diff --git a/test/clj/backtype/storm/serialization/SerializationFactory_test.clj b/test/clj/backtype/storm/serialization/SerializationFactory_test.clj new file mode 100644 index 000000000..f39257004 --- /dev/null +++ b/test/clj/backtype/storm/serialization/SerializationFactory_test.clj @@ -0,0 +1,39 @@ +(ns backtype.storm.serialization.SerializationFactory-test + (:import [backtype.storm Config]) + (:import [backtype.storm.security.serialization BlowfishTupleSerializer]) + (:import [backtype.storm.serialization SerializationFactory]) + (:import [backtype.storm.utils ListDelegate]) + (:use [backtype.storm config]) + (:use [clojure test]) +) + + +(deftest test-registers-default-when-not-in-conf + (let [conf (read-default-config) + klass-name (get conf Config/TOPOLOGY_TUPLE_SERIALIZER) + configured-class (Class/forName klass-name) + kryo (SerializationFactory/getKryo conf)] + (is (= configured-class (.getClass (.getSerializer kryo ListDelegate)))) + ) +) + +(deftest test-throws-runtimeexception-when-no-such-class + (let [conf (merge (read-default-config) + {Config/TOPOLOGY_TUPLE_SERIALIZER "null.this.class.does.not.exist"})] + (is (thrown? RuntimeException + (SerializationFactory/getKryo conf))) + ) +) + +(deftest test-registeres-when-valid-class-name + (let [arbitrary-class-name + (String. "backtype.storm.security.serialization.BlowfishTupleSerializer") + serializer-class (Class/forName arbitrary-class-name) + arbitrary-key "0123456789abcdef" + conf (merge (read-default-config) + {Config/TOPOLOGY_TUPLE_SERIALIZER arbitrary-class-name + BlowfishTupleSerializer/SECRET_KEY arbitrary-key}) + kryo (SerializationFactory/getKryo conf)] + (is (= serializer-class (.getClass (.getSerializer kryo ListDelegate)))) + ) +) diff --git a/test/clj/backtype/storm/utils_test.clj b/test/clj/backtype/storm/utils_test.clj index 9ac1c6c31..52f64cd0b 100644 --- a/test/clj/backtype/storm/utils_test.clj +++ b/test/clj/backtype/storm/utils_test.clj @@ -1,8 +1,9 @@ (ns backtype.storm.utils-test (:import [backtype.storm Config]) - (:import [backtype.storm.utils Utils]) + (:import [backtype.storm.utils NimbusClient Utils]) (:import [com.netflix.curator.retry ExponentialBackoffRetry]) - (:use [backtype.storm util]) + (:import [org.apache.thrift7.transport TTransportException]) + (:use [backtype.storm config util]) (:use [clojure test]) ) @@ -26,3 +27,23 @@ (is (= (.getSleepTimeMs retry 10 0) expected_ceiling)) ) ) + +(deftest test-getConfiguredClient-throws-RunTimeException-on-bad-config + (let [storm-conf (merge (read-storm-config) + {STORM-THRIFT-TRANSPORT-PLUGIN + "backtype.storm.security.auth.SimpleTransportPlugin" + Config/NIMBUS_HOST "" + Config/NIMBUS_THRIFT_PORT 65535 + })] + (is (thrown? RuntimeException + (NimbusClient/getConfiguredClient storm-conf))) + ) +) + +(deftest test-getConfiguredClient-throws-RunTimeException-on-bad-args + (let [storm-conf (read-storm-config)] + (is (thrown? TTransportException + (NimbusClient. storm-conf "" 65535) + )) + ) +)