From 59d4187af15dfc3aec818dc8983b8780af7c71e6 Mon Sep 17 00:00:00 2001 From: Alex Clemmer Date: Sat, 19 Jan 2013 15:04:57 -0700 Subject: [PATCH 01/13] Change StormTopology state serialization from Java to Thrift config and nimbus currently use the core Java serialization to store StormTopology instances. This commit will change this to use Thrift serialization instead. StormTopology is a Thrift struct so this basically involves finding all the places we call `Utils/serialize` and `Utils/deserialize` and replace them with a call to a method that serializes with Thrift instead. --- storm-core/src/clj/backtype/storm/config.clj | 2 +- .../src/clj/backtype/storm/daemon/nimbus.clj | 4 +-- .../src/jvm/backtype/storm/utils/Utils.java | 25 +++++++++++++++++++ 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj index c11074eb1..675a0bc09 100644 --- a/storm-core/src/clj/backtype/storm/config.clj +++ b/storm-core/src/clj/backtype/storm/config.clj @@ -180,7 +180,7 @@ (defn read-supervisor-topology [conf storm-id] (let [stormroot (supervisor-stormdist-root conf storm-id) topology-path (supervisor-stormcode-path stormroot)] - (Utils/deserialize (FileUtils/readFileToByteArray (File. topology-path))) + (Utils/deserializeTopology (FileUtils/readFileToByteArray (File. topology-path))) )) (defn worker-root diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index e58aeedd0..3ef0c921c 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -295,13 +295,13 @@ (FileUtils/forceMkdir (File. stormroot)) (FileUtils/cleanDirectory (File. stormroot)) (setup-jar conf tmp-jar-location stormroot) - (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology)) + (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serializeTopology topology)) (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf)) )) (defn- read-storm-topology [conf storm-id] (let [stormroot (master-stormdist-root conf storm-id)] - (Utils/deserialize + (Utils/deserializeTopology (FileUtils/readFileToByteArray (File. (master-stormcode-path stormroot)) )))) diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java index a31402e5a..9d0b90013 100644 --- a/storm-core/src/jvm/backtype/storm/utils/Utils.java +++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java @@ -30,11 +30,18 @@ import java.util.UUID; import org.apache.commons.lang.StringUtils; import org.apache.thrift7.TException; +import org.apache.thrift7.TSerializer; +import org.apache.thrift7.TDeserializer; +import org.apache.thrift7.protocol.TBinaryProtocol; import org.json.simple.JSONValue; import org.yaml.snakeyaml.Yaml; public class Utils { public static final String DEFAULT_STREAM_ID = "default"; + private static final TSerializer topoSerializer = + new TSerializer(new TBinaryProtocol.Factory()); + private static final TDeserializer topoDeserializer = + new TDeserializer(new TBinaryProtocol.Factory()); public static Object newInstance(String klass) { try { @@ -71,6 +78,24 @@ public static Object deserialize(byte[] serialized) { } } + public static byte[] serializeTopology(StormTopology topo) { + try { + return topoSerializer.serialize(topo); + } catch(TException te) { + throw new RuntimeException(te); + } + } + + public static StormTopology deserializeTopology(byte[] bytes) { + StormTopology topo = new StormTopology(); + try { + topoDeserializer.deserialize(topo, bytes); + return topo; + } catch(TException te) { + throw new RuntimeException(te); + } + } + public static String join(Iterable coll, String sep) { Iterator it = coll.iterator(); String ret = ""; From 46ca74980c5964d5d82769e757cc8f18dac2b1dd Mon Sep 17 00:00:00 2001 From: Alex Clemmer Date: Sat, 19 Jan 2013 16:55:17 -0700 Subject: [PATCH 02/13] Move serialization of Storm conf from Java default to Clojure default config.clj and nimbus serialize Storm configuration using the default Java implementation. This commit will phase out Java default serialization in favor of Clojure default serialization. There are obviously many reasons to phase out Java serialization, but the main rationale is that Java serialization will complain about version mismatch even if the change is semantically backward compatible. --- storm-core/src/clj/backtype/storm/config.clj | 11 ++++++++++- storm-core/src/clj/backtype/storm/daemon/nimbus.clj | 4 ++-- storm-core/src/clj/backtype/storm/util.clj | 1 + 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj index 675a0bc09..1e4072b82 100644 --- a/storm-core/src/clj/backtype/storm/config.clj +++ b/storm-core/src/clj/backtype/storm/config.clj @@ -9,6 +9,14 @@ (def RESOURCES-SUBDIR "resources") +(def ^:const def-ser-enc "UTF-8") + +(defn serialize-conf [form] + (.getBytes (pr-str form) def-ser-enc)) + +(defn deserialize-conf [form] + (read-string (String. form def-ser-enc))) + (defn- clojure-config-name [name] (.replace (.toUpperCase name) "_" "-")) @@ -174,7 +182,7 @@ (let [stormroot (supervisor-stormdist-root conf storm-id) conf-path (supervisor-stormconf-path stormroot) topology-path (supervisor-stormcode-path stormroot)] - (merge conf (Utils/deserialize (FileUtils/readFileToByteArray (File. conf-path)))) + (merge conf (deserialize-conf (FileUtils/readFileToByteArray (File. conf-path)))) )) (defn read-supervisor-topology [conf storm-id] @@ -205,3 +213,4 @@ ;; in local mode, keep a global map of ids to threads for simulating process management (defn ^LocalState worker-state [conf id] (LocalState. (worker-heartbeats-root conf id))) + diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index 3ef0c921c..22cec8e9f 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -68,7 +68,7 @@ (defn- read-storm-conf [conf storm-id] (let [stormroot (master-stormdist-root conf storm-id)] (merge conf - (Utils/deserialize + (deserialize-conf (FileUtils/readFileToByteArray (File. (master-stormconf-path stormroot)) ))))) @@ -296,7 +296,7 @@ (FileUtils/cleanDirectory (File. stormroot)) (setup-jar conf tmp-jar-location stormroot) (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serializeTopology topology)) - (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf)) + (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (serialize-conf storm-conf)) )) (defn- read-storm-topology [conf storm-id] diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj index ecc87ef77..dfe4c9e6e 100644 --- a/storm-core/src/clj/backtype/storm/util.clj +++ b/storm-core/src/clj/backtype/storm/util.clj @@ -844,3 +844,4 @@ (meta form)) (list form x))) ([x form & more] `(-<> (-<> ~x ~form) ~@more))) + From 44cfa944bea1a464e319b06af12c0ae237437ef3 Mon Sep 17 00:00:00 2001 From: Alex Clemmer Date: Sat, 19 Jan 2013 17:08:07 -0700 Subject: [PATCH 03/13] Fix serialization concurrency bug TSerializer is not threadsafe. In `Utils` we instantiate a static final Tserializer, but this can (and will) cause odd bugs if we start calling `serialize()` in different threads. Thus, every time we call `Utils/serializeTopology`, we create a new TSerializer. Another way to do this would be to lock it, which performance may or may not merit. --- storm-core/src/jvm/backtype/storm/utils/Utils.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java index 9d0b90013..57875aad2 100644 --- a/storm-core/src/jvm/backtype/storm/utils/Utils.java +++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java @@ -38,10 +38,6 @@ public class Utils { public static final String DEFAULT_STREAM_ID = "default"; - private static final TSerializer topoSerializer = - new TSerializer(new TBinaryProtocol.Factory()); - private static final TDeserializer topoDeserializer = - new TDeserializer(new TBinaryProtocol.Factory()); public static Object newInstance(String klass) { try { @@ -79,17 +75,23 @@ public static Object deserialize(byte[] serialized) { } public static byte[] serializeTopology(StormTopology topo) { + // TSerializer not threadsafe; lock or new instance every time + TSerializer serializer = + new TSerializer(new TBinaryProtocol.Factory()); try { - return topoSerializer.serialize(topo); + return serializer.serialize(topo); } catch(TException te) { throw new RuntimeException(te); } } public static StormTopology deserializeTopology(byte[] bytes) { + // TDeserializer not threadsafe; lock or new instance every time + TDeserializer deserializer = + new TDeserializer(new TBinaryProtocol.Factory()); StormTopology topo = new StormTopology(); try { - topoDeserializer.deserialize(topo, bytes); + deserializer.deserialize(topo, bytes); return topo; } catch(TException te) { throw new RuntimeException(te); From 50070c77901b779dc6d2ac2a0ccd1fad9a9dcdc0 Mon Sep 17 00:00:00 2001 From: Alex Clemmer Date: Sun, 20 Jan 2013 11:39:29 -0700 Subject: [PATCH 04/13] Make serialization funcitons more generic Serialization of configuration is handled in config, but it is not different from a generic method for Clojure form serialization. This commit will move this method to utils so that we can use it for other things, like serialization in cluster. --- storm-core/src/clj/backtype/storm/config.clj | 10 +--------- storm-core/src/clj/backtype/storm/daemon/nimbus.clj | 4 ++-- storm-core/src/clj/backtype/storm/util.clj | 10 ++++++++++ 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj index 1e4072b82..0796722f4 100644 --- a/storm-core/src/clj/backtype/storm/config.clj +++ b/storm-core/src/clj/backtype/storm/config.clj @@ -9,14 +9,6 @@ (def RESOURCES-SUBDIR "resources") -(def ^:const def-ser-enc "UTF-8") - -(defn serialize-conf [form] - (.getBytes (pr-str form) def-ser-enc)) - -(defn deserialize-conf [form] - (read-string (String. form def-ser-enc))) - (defn- clojure-config-name [name] (.replace (.toUpperCase name) "_" "-")) @@ -182,7 +174,7 @@ (let [stormroot (supervisor-stormdist-root conf storm-id) conf-path (supervisor-stormconf-path stormroot) topology-path (supervisor-stormcode-path stormroot)] - (merge conf (deserialize-conf (FileUtils/readFileToByteArray (File. conf-path)))) + (merge conf (deserialize-clj-bytes (FileUtils/readFileToByteArray (File. conf-path)))) )) (defn read-supervisor-topology [conf storm-id] diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index 22cec8e9f..1da3691de 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -68,7 +68,7 @@ (defn- read-storm-conf [conf storm-id] (let [stormroot (master-stormdist-root conf storm-id)] (merge conf - (deserialize-conf + (deserialize-clj-bytes (FileUtils/readFileToByteArray (File. (master-stormconf-path stormroot)) ))))) @@ -296,7 +296,7 @@ (FileUtils/cleanDirectory (File. stormroot)) (setup-jar conf tmp-jar-location stormroot) (FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serializeTopology topology)) - (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (serialize-conf storm-conf)) + (FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (serialize-clj-bytes storm-conf)) )) (defn- read-storm-topology [conf storm-id] diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj index dfe4c9e6e..383653a71 100644 --- a/storm-core/src/clj/backtype/storm/util.clj +++ b/storm-core/src/clj/backtype/storm/util.clj @@ -845,3 +845,13 @@ (list form x))) ([x form & more] `(-<> (-<> ~x ~form) ~@more))) +(def ^:const def-ser-enc "UTF-8") + +(defn serialize-clj-bytes [form] + "serializes Clojure form to UTF-8 byte array" + (.getBytes (pr-str form) def-ser-enc)) + +(defn deserialize-clj-bytes [form] + "deserializes Clojure form fom UTF-8 byte array" + (read-string (String. form def-ser-enc))) + From 9492da30b31809987a25d347c86f825e66deba6b Mon Sep 17 00:00:00 2001 From: Alex Clemmer Date: Sun, 20 Jan 2013 12:09:20 -0700 Subject: [PATCH 05/13] Transition cluster state serialization to Clojure form serialization cluster.clj uses the stock Java serialization implementation. There are obviously many reason to not use standard Java serialization, but our main motivation is that Java will complain about serialized state when the versions don't match even if they're semantically backwards compatible. --- storm-core/src/clj/backtype/storm/cluster.clj | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj index 7231b15d6..02fae73cb 100644 --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ b/storm-core/src/clj/backtype/storm/cluster.clj @@ -173,9 +173,9 @@ (cb id)) )) -(defn- maybe-deserialize [ser] +(defn- maybe-deserialize-clj-bytes [ser] (when ser - (Utils/deserialize ser))) + (deserialize-clj-bytes ser))) (defstruct TaskError :error :time-secs) @@ -230,7 +230,7 @@ (assignment-info [this storm-id callback] (when callback (swap! assignment-info-callback assoc storm-id callback)) - (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))) + (maybe-deserialize-clj-bytes (get-data cluster-state (assignment-path storm-id) (not-nil? callback))) ) (active-storms [this] @@ -248,7 +248,7 @@ (get-worker-heartbeat [this storm-id node port] (-> cluster-state (get-data (workerbeat-path storm-id node port) false) - maybe-deserialize)) + maybe-deserialize-clj-bytes)) (executor-beats [this storm-id executor->node+port] ;; need to take executor->node+port in explicitly so that we don't run into a situation where a @@ -269,11 +269,11 @@ ) (supervisor-info [this supervisor-id] - (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false)) + (maybe-deserialize-clj-bytes (get-data cluster-state (supervisor-path supervisor-id) false)) ) (worker-heartbeat! [this storm-id node port info] - (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info))) + (set-data cluster-state (workerbeat-path storm-id node port) (serialize-clj-bytes info))) (remove-worker-heartbeat! [this storm-id node port] (delete-node cluster-state (workerbeat-path storm-id node port)) @@ -297,11 +297,11 @@ ))) (supervisor-heartbeat! [this supervisor-id info] - (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info)) + (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (serialize-clj-bytes info)) ) (activate-storm! [this storm-id storm-base] - (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base)) + (set-data cluster-state (storm-path storm-id) (serialize-clj-bytes storm-base)) ) (update-storm! [this storm-id new-elems] @@ -311,12 +311,12 @@ (set-data cluster-state (storm-path storm-id) (-> base (merge new-elems) - Utils/serialize)))) + serialize-clj-bytes)))) (storm-base [this storm-id callback] (when callback (swap! storm-base-callback assoc storm-id callback)) - (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback))) + (maybe-deserialize-clj-bytes (get-data cluster-state (storm-path storm-id) (not-nil? callback))) ) (remove-storm-base! [this storm-id] @@ -324,7 +324,7 @@ ) (set-assignment! [this storm-id info] - (set-data cluster-state (assignment-path storm-id) (Utils/serialize info)) + (set-data cluster-state (assignment-path storm-id) (serialize-clj-bytes info)) ) (remove-storm! [this storm-id] @@ -335,7 +335,7 @@ (let [path (error-path storm-id component-id) data {:time-secs (current-time-secs) :error (stringify-error error)} _ (mkdirs cluster-state path) - _ (create-sequential cluster-state (str path "/e") (Utils/serialize data)) + _ (create-sequential cluster-state (str path "/e") (serialize-clj-bytes data)) to-kill (->> (get-children cluster-state path false) (sort-by parse-error-path) reverse @@ -349,7 +349,7 @@ children (get-children cluster-state path false) errors (dofor [c children] (let [data (-> (get-data cluster-state (str path "/" c) false) - maybe-deserialize)] + maybe-deserialize-clj-bytes)] (when data (struct TaskError (:error data) (:time-secs data)) ))) From 00f0274198efd39c5410e1c7adb06f3b913f5abe Mon Sep 17 00:00:00 2001 From: Alex Clemmer Date: Fri, 22 Feb 2013 11:21:36 -0700 Subject: [PATCH 06/13] Source LocalState serialization logic to state serialization interface Supervisor currently just uses stock jvm serialization to communicate LocalState. This is undesirable for many reasons, so this commit will introduce a serialization interface which makes code cleaner, letting us specify which type of serializer to use without populating Supervisor with unnecessary boxing/unboxing behavior, or LocalState with too much knowledge about what's happening in the Supervisor. Also this commit will introduce a basic implementation sketch for a serializer for LocalState (though it will just use the jvm serialization at this point). --- .../backtype/storm/utils/LocalStateSerializer.java | 14 ++++++++++++++ src/jvm/backtype/storm/utils/StateSerializer.java | 12 ++++++++++++ .../src/jvm/backtype/storm/utils/LocalState.java | 13 ++++++++++--- 3 files changed, 36 insertions(+), 3 deletions(-) create mode 100644 src/jvm/backtype/storm/utils/LocalStateSerializer.java create mode 100644 src/jvm/backtype/storm/utils/StateSerializer.java diff --git a/src/jvm/backtype/storm/utils/LocalStateSerializer.java b/src/jvm/backtype/storm/utils/LocalStateSerializer.java new file mode 100644 index 000000000..ef2cc2a88 --- /dev/null +++ b/src/jvm/backtype/storm/utils/LocalStateSerializer.java @@ -0,0 +1,14 @@ +package backtype.storm.utils; + +import java.util.Map; + +public class LocalStateSerializer implements StateSerializer { + public byte[] serializeState (Map val) { + return Utils.serialize(val); + } + + public Map deserializeState (byte[] ser) { + return (Map) Utils.deserialize(ser); + } + +} \ No newline at end of file diff --git a/src/jvm/backtype/storm/utils/StateSerializer.java b/src/jvm/backtype/storm/utils/StateSerializer.java new file mode 100644 index 000000000..ad111359a --- /dev/null +++ b/src/jvm/backtype/storm/utils/StateSerializer.java @@ -0,0 +1,12 @@ +package backtype.storm.utils; + +import java.util.Map; + +/** + * Interface for serializing state, for example, when the `Supervisor` + * serializes `LocalState`. + */ +public interface StateSerializer { + public byte[] serializeState (Map val); + public Map deserializeState (byte[] ser); +} \ No newline at end of file diff --git a/storm-core/src/jvm/backtype/storm/utils/LocalState.java b/storm-core/src/jvm/backtype/storm/utils/LocalState.java index 73560392c..905ea98a9 100644 --- a/storm-core/src/jvm/backtype/storm/utils/LocalState.java +++ b/storm-core/src/jvm/backtype/storm/utils/LocalState.java @@ -14,9 +14,16 @@ */ public class LocalState { private VersionedStore _vs; + private StateSerializer _stSer; public LocalState(String backingDir) throws IOException { _vs = new VersionedStore(backingDir); + _stSer = new LocalStateSerializer(); + } + + public LocalState (String backingDir, StateSerializer stSer) throws IOException { + _vs = new VersionedStore(backingDir); + _stSer = stSer; } public synchronized Map snapshot() throws IOException { @@ -25,7 +32,7 @@ public synchronized Map snapshot() throws IOException { String latestPath = _vs.mostRecentVersionPath(); if(latestPath==null) return new HashMap(); try { - return (Map) Utils.deserialize(FileUtils.readFileToByteArray(new File(latestPath))); + return (Map) _stSer.deserializeState(FileUtils.readFileToByteArray(new File(latestPath))); } catch(IOException e) { attempts++; if(attempts >= 10) { @@ -64,10 +71,10 @@ public synchronized void cleanup(int keepVersions) throws IOException { } private void persist(Map val, boolean cleanup) throws IOException { - byte[] toWrite = Utils.serialize(val); + byte[] toWrite = _stSer.serializeState(val); String newPath = _vs.createVersion(); FileUtils.writeByteArrayToFile(new File(newPath), toWrite); _vs.succeedVersion(newPath); if(cleanup) _vs.cleanup(4); } -} \ No newline at end of file +} From 9d673a872b79071df9c114ac05d0726358a38221 Mon Sep 17 00:00:00 2001 From: Alex Clemmer Date: Fri, 22 Feb 2013 21:50:59 -0700 Subject: [PATCH 07/13] Source LocalState constants to Constants.java To build the serialization interface in Java, we need to put these constants in a Java file. This commit will put them in Constants.java --- .../src/clj/backtype/storm/daemon/common.clj | 6 ----- .../clj/backtype/storm/daemon/supervisor.clj | 24 ++++++++++--------- .../src/clj/backtype/storm/daemon/worker.clj | 4 ++-- storm-core/src/clj/backtype/storm/testing.clj | 5 ++-- .../src/jvm/backtype/storm/Constants.java | 8 ++++++- 5 files changed, 25 insertions(+), 22 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj index dd8b12f85..321e6495f 100644 --- a/storm-core/src/clj/backtype/storm/daemon/common.clj +++ b/storm-core/src/clj/backtype/storm/daemon/common.clj @@ -40,12 +40,6 @@ (defprotocol DaemonCommon (waiting? [this])) -(def LS-WORKER-HEARTBEAT "worker-heartbeat") - -;; LocalState constants -(def LS-ID "supervisor-id") -(def LS-LOCAL-ASSIGNMENTS "local-assignments") -(def LS-APPROVED-WORKERS "approved-workers") diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index 150443165..9e0ee32ce 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -1,5 +1,6 @@ (ns backtype.storm.daemon.supervisor (:import [backtype.storm.scheduler ISupervisor]) + (:import [backtype.storm Constants]) (:use [backtype.storm bootstrap]) (:use [backtype.storm.daemon common]) (:require [backtype.storm.daemon [worker :as worker]]) @@ -59,7 +60,7 @@ (defn read-worker-heartbeat [conf id] (let [local-state (worker-state conf id)] - (.get local-state LS-WORKER-HEARTBEAT) + (.get local-state Constants/LS_WORKER_HEARTBEAT) )) @@ -89,7 +90,7 @@ (let [conf (:conf supervisor) ^LocalState local-state (:local-state supervisor) id->heartbeat (read-worker-heartbeats conf) - approved-ids (set (keys (.get local-state LS-APPROVED-WORKERS)))] + approved-ids (set (keys (.get local-state Constants/LS_APPROVED_WORKERS)))] (into {} (dofor [[id hb] id->heartbeat] @@ -112,7 +113,7 @@ (defn- wait-for-worker-launch [conf id start-time] (let [state (worker-state conf id)] (loop [] - (let [hb (.get state LS-WORKER-HEARTBEAT)] + (let [hb (.get state Constants/LS_WORKER_HEARTBEAT)] (when (and (not hb) (< @@ -123,7 +124,7 @@ (Time/sleep 500) (recur) ))) - (when-not (.get state LS-WORKER-HEARTBEAT) + (when-not (.get state Constants/LS_WORKER_HEARTBEAT) (log-message "Worker " id " failed to start") ))) @@ -184,7 +185,7 @@ (defn sync-processes [supervisor] (let [conf (:conf supervisor) ^LocalState local-state (:local-state supervisor) - assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {}) + assigned-executors (defaulted (.get local-state Constants/LS_LOCAL_ASSIGNMENTS) {}) now (current-time-secs) allocated (read-allocated-workers supervisor assigned-executors now) keepers (filter-val @@ -221,11 +222,12 @@ )) (doseq [id (vals new-worker-ids)] (local-mkdirs (worker-pids-root conf id))) - (.put local-state LS-APPROVED-WORKERS + (.put local-state Constants/LS_APPROVED_WORKERS (merge - (select-keys (.get local-state LS-APPROVED-WORKERS) + (select-keys (.get local-state Constants/LS_APPROVED_WORKERS) (keys keepers)) (zipmap (vals new-worker-ids) (keys new-worker-ids)) + ;))) )) (wait-for-workers-launch conf @@ -269,7 +271,7 @@ new-assignment (->> all-assignment (filter-key #(.confirmAssigned isupervisor %))) assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment) - existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)] + existing-assignment (.get local-state Constants/LS_LOCAL_ASSIGNMENTS)] (log-debug "Synchronizing supervisor") (log-debug "Storm code map: " storm-code-map) (log-debug "Downloaded storm ids: " downloaded-storm-ids) @@ -301,7 +303,7 @@ (.killedWorker isupervisor (int p))) (.assigned isupervisor (keys new-assignment)) (.put local-state - LS-LOCAL-ASSIGNMENTS + Constants/LS_LOCAL_ASSIGNMENTS new-assignment) (reset! (:curr-assignment supervisor) new-assignment) ;; remove any downloaded code that's no longer assigned or active @@ -482,10 +484,10 @@ (prepare [this conf local-dir] (reset! conf-atom conf) (let [state (LocalState. local-dir) - curr-id (if-let [id (.get state LS-ID)] + curr-id (if-let [id (.get state Constants/LS_ID)] id (generate-supervisor-id))] - (.put state LS-ID curr-id) + (.put state Constants/LS_ID curr-id) (reset! id-atom curr-id)) ) (confirmAssigned [this port] diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index 5182027c2..9e469feb1 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -5,6 +5,7 @@ (:import [java.util.concurrent Executors]) (:import [backtype.storm.messaging TransportFactory]) (:import [backtype.storm.messaging IContext IConnection]) + (:import [backtype.storm Constants]) (:gen-class)) (bootstrap) @@ -49,13 +50,12 @@ (log-debug "Doing heartbeat " (pr-str hb)) ;; do the local-file-system heartbeat. (.put state - LS-WORKER-HEARTBEAT + Constants/LS-WORKER-HEARTBEAT hb false ) (.cleanup state 60) ; this is just in case supervisor is down so that disk doesn't fill up. ; it shouldn't take supervisor 120 seconds between listing dir and reading it - )) (defn worker-outbound-tasks diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj index 2eb92d814..d90b08d78 100644 --- a/storm-core/src/clj/backtype/storm/testing.clj +++ b/storm-core/src/clj/backtype/storm/testing.clj @@ -11,6 +11,7 @@ (:import [java.util HashMap ArrayList]) (:import [java.util.concurrent.atomic AtomicInteger]) (:import [java.util.concurrent ConcurrentHashMap]) + (:import [backtype.storm Constants]) (:import [backtype.storm.utils Time Utils RegisteredGlobalState]) (:import [backtype.storm.tuple Fields Tuple TupleImpl]) (:import [backtype.storm.task TopologyContext]) @@ -258,13 +259,13 @@ (defn find-worker-id [supervisor-conf port] (let [supervisor-state (supervisor-state supervisor-conf) - worker->port (.get supervisor-state common/LS-APPROVED-WORKERS)] + worker->port (.get supervisor-state Constants/LS_APPROVED_WORKERS)] (first ((reverse-map worker->port) port)) )) (defn find-worker-port [supervisor-conf worker-id] (let [supervisor-state (supervisor-state supervisor-conf) - worker->port (.get supervisor-state common/LS-APPROVED-WORKERS) + worker->port (.get supervisor-state Constants/LS_APPROVED_WORKERS) ] (worker->port worker-id) )) diff --git a/storm-core/src/jvm/backtype/storm/Constants.java b/storm-core/src/jvm/backtype/storm/Constants.java index a8ade3c53..ab63db0dc 100644 --- a/storm-core/src/jvm/backtype/storm/Constants.java +++ b/storm-core/src/jvm/backtype/storm/Constants.java @@ -14,5 +14,11 @@ public class Constants { public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics"; public static final String METRICS_STREAM_ID = "__metrics"; public static final String METRICS_TICK_STREAM_ID = "__metrics_tick"; + + // LocalState constants + public static final String LS_WORKER_HEARTBEAT = "worker-heartbeat"; + public static final String LS_ID = "supervisor-id"; + public static final String LS_LOCAL_ASSIGNMENTS = "local-assignments"; + public static final String LS_APPROVED_WORKERS = "approved-workers"; } - \ No newline at end of file + From 55682d85ffd8bb737bef7c941205134cb5ac4899 Mon Sep 17 00:00:00 2001 From: Alex Clemmer Date: Mon, 4 Mar 2013 11:19:19 -0700 Subject: [PATCH 08/13] Add sketch of LocalStateSerializer --- .../storm/utils/LocalStateSerializer.java | 88 ++++++++++++++++++- 1 file changed, 87 insertions(+), 1 deletion(-) diff --git a/src/jvm/backtype/storm/utils/LocalStateSerializer.java b/src/jvm/backtype/storm/utils/LocalStateSerializer.java index ef2cc2a88..cb3f9c052 100644 --- a/src/jvm/backtype/storm/utils/LocalStateSerializer.java +++ b/src/jvm/backtype/storm/utils/LocalStateSerializer.java @@ -1,14 +1,100 @@ package backtype.storm.utils; +import backtype.storm.Constants; +import clojure.lang.PersistentArrayMap; +import java.util.Iterator; +import java.util.HashMap; import java.util.Map; +import org.json.simple.JSONObject; +import org.json.simple.JSONValue; + public class LocalStateSerializer implements StateSerializer { public byte[] serializeState (Map val) { + /* + for (Map.Entry entry : val.entrySet()) { + Object v = entry.getValue(); + + if (v instanceof String) { + JSONObject ser = new JSONObject(); + } + else if(v instanceof clojure.lang.PersistentArrayMap) { + if (((Map) v).size() > 0) { + System.err.println(((Map) v).size()); + System.err.println("cow"); + System.err.println(JSONValue.toJSONString(val)); + System.err.println(JSONValue.parse(JSONValue.toJSONString(val))); + throw new RuntimeException(((Map) v).size() + " " + entry.getValue().getClass().toString()); + } + } + else + throw new RuntimeException(v.getClass().toString()); + } + */ + // LS_ID : String + // LS_WORKER_HEARTBEAT : backtype.storm.daemon.common.WorkerHeartbeat + // LS_LOCAL_ASSIGNMENTS : clojure.lang.PersistentArrayMap + // LS_APPROVED_WORKERS : clojure.lang.PersistentArrayMap + HashMap toSerialize = new HashMap(); + for (Map.Entry entry : val.entrySet()) { + String k = (String) entry.getKey(); + Object v = entry.getValue(); + + if (k.equals(Constants.LS_WORKER_HEARTBEAT)) + // ser worker heartbeat + ; + else if (k.equals(Constants.LS_APPROVED_WORKERS)) + toSerialize.put(k, serLsApprovedWorkers((PersistentArrayMap) v)); + else if (k.equals(Constants.LS_ID)) + toSerialize.put(k, v); + else if (k.equals(Constants.LS_LOCAL_ASSIGNMENTS)) + toSerialize.put(k, serLsLocalAssignments((PersistentArrayMap) v)); + else + throw new RuntimeException("LocalState could not be " + + "serialized; procedure for key " + + k + " has not been implemented"); + } + //throw new RuntimeException("not implemented"); return Utils.serialize(val); } + public String serLsLocalAssignments (PersistentArrayMap assg) { + return JSONValue.toJSONString(assg); + } + + public String serLsApprovedWorkers (PersistentArrayMap workers) { + /* + Iterator iter = workers.iterator(); + while (iter.hasNext()) { + clojure.lang.IMapEntry kv = iter.next(); + System.err.println(kv); + System.err.println(kv.key()); + System.err.println(kv.val()); + } + */ + return JSONValue.toJSONString(workers); + } + public Map deserializeState (byte[] ser) { + /* + String s = new String(serialized); + //System.err.println("DESER " + JSONValue.parse(s)); + Object parsed = JSONValue.parse(s); + if (parsed != null) + return parsed; + else + return new HashMap(); + */ return (Map) Utils.deserialize(ser); } - + /* + private static byte[] serializeLocalState(Map val) { + //System.err.println(JSONValue.toJSONString(val)); + //System.err.println("SER " + JSONValue.toJSONString(val)); + //return JSONValue.toJSONString(val).getBytes(); + } + + private static Object deserializeLocalState(byte[] serialized) { + } + */ } \ No newline at end of file From 1aba69c66f5443916ca6d47b84627537bebbf4b9 Mon Sep 17 00:00:00 2001 From: Alex Clemmer Date: Fri, 8 Mar 2013 21:03:27 -0700 Subject: [PATCH 09/13] Add clojure-based serialization implementation for LocalState --- .../storm/utils/localstate_serializer.clj | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 src/clj/backtype/storm/utils/localstate_serializer.clj diff --git a/src/clj/backtype/storm/utils/localstate_serializer.clj b/src/clj/backtype/storm/utils/localstate_serializer.clj new file mode 100644 index 000000000..ed5822b76 --- /dev/null +++ b/src/clj/backtype/storm/utils/localstate_serializer.clj @@ -0,0 +1,34 @@ +;; Use to serialize LocalState. We assume that LocalState is a k/v store that +;; uses java.util.HashMap to store the following keys +;; LS_ID : String +;; LS_WORKER_HEARTBEAT : backtype.storm.daemon.common.WorkerHeartbeat +;; LS_LOCAL_ASSIGNMENTS : clojure.lang.PersistentArrayMap +;; LS_APPROVED_WORKERS : clojure.lang.PersistentArrayMap + +(ns backtype.storm.utils.localstate-serializer + (:import [backtype.storm.utils Utils]) + (:import [backtype.storm Constants]) + (:use [backtype.storm util]) + ) + +; java.util.HashMap -> byte[] +(defn serialize-localstate [form] + (serialize-clj-bytes (into {} form))) + +; byte[] -> java.util.HashMap +(defn deserialize-localstate [form] + (let [newm (java.util.HashMap.)] + (.putAll newm (deserialize-clj-bytes form)) + newm)) + +(defn localstate-serializer [] + (reify + backtype.storm.utils.StateSerializer + (serializeState [this val] (serialize-localstate val)) + (deserializeState [this ser] (deserialize-localstate ser)))) + +(defn localstate-default-serializer [] + (reify + backtype.storm.utils.StateSerializer + (serializeState [this val] (Utils/serialize val)) + (deserializeState [this ser] (Utils/deserialize ser)))) \ No newline at end of file From 8a836642acbac95df224a5b79c38f752d3665488 Mon Sep 17 00:00:00 2001 From: Alex Clemmer Date: Fri, 8 Mar 2013 21:09:15 -0700 Subject: [PATCH 10/13] Update Supervisor to use clojure serialization --- storm-core/src/clj/backtype/storm/config.clj | 6 ++++-- storm-core/src/clj/backtype/storm/daemon/common.clj | 2 +- storm-core/src/clj/backtype/storm/daemon/supervisor.clj | 5 +++-- storm-core/src/clj/backtype/storm/daemon/task.clj | 2 +- storm-core/test/clj/backtype/storm/local_state_test.clj | 8 +++++--- 5 files changed, 14 insertions(+), 9 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj index 0796722f4..cedc5b7bc 100644 --- a/storm-core/src/clj/backtype/storm/config.clj +++ b/storm-core/src/clj/backtype/storm/config.clj @@ -4,6 +4,7 @@ (:import [backtype.storm.utils Utils LocalState]) (:import [org.apache.commons.io FileUtils]) (:require [clojure [string :as str]]) + (:use [backtype.storm.utils localstate-serializer]) (:use [backtype.storm util]) ) @@ -168,7 +169,8 @@ (str stormroot "/" RESOURCES-SUBDIR)) (defn ^LocalState supervisor-state [conf] - (LocalState. (str (supervisor-local-dir conf) "/localstate"))) + (LocalState. (str (supervisor-local-dir conf) "/localstate") + (localstate-serializer))) (defn read-supervisor-storm-conf [conf storm-id] (let [stormroot (supervisor-stormdist-root conf storm-id) @@ -204,5 +206,5 @@ ;; if supervisor stops receiving heartbeat, it kills and restarts the process ;; in local mode, keep a global map of ids to threads for simulating process management (defn ^LocalState worker-state [conf id] - (LocalState. (worker-heartbeats-root conf id))) + (LocalState. (worker-heartbeats-root conf id) (localstate-serializer))) diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj index 321e6495f..72f7bffba 100644 --- a/storm-core/src/clj/backtype/storm/daemon/common.clj +++ b/storm-core/src/clj/backtype/storm/daemon/common.clj @@ -315,7 +315,7 @@ (supervisor-storm-resources-path (supervisor-stormdist-root (:conf worker) (:storm-id worker))) (worker-pids-root (:conf worker) (:worker-id worker)) - (:port worker) + (int (:port worker)) (:task-ids worker) (:default-shared-resources worker) (:user-shared-resources worker) diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj index 9e0ee32ce..2fc6e7de0 100644 --- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj @@ -3,6 +3,7 @@ (:import [backtype.storm Constants]) (:use [backtype.storm bootstrap]) (:use [backtype.storm.daemon common]) + (:use [backtype.storm.utils localstate-serializer]) (:require [backtype.storm.daemon [worker :as worker]]) (:gen-class :methods [^{:static true} [launch [backtype.storm.scheduler.ISupervisor] void]])) @@ -111,7 +112,7 @@ ))) (defn- wait-for-worker-launch [conf id start-time] - (let [state (worker-state conf id)] + (let [state (worker-state conf id)] (loop [] (let [hb (.get state Constants/LS_WORKER_HEARTBEAT)] (when (and @@ -483,7 +484,7 @@ (reify ISupervisor (prepare [this conf local-dir] (reset! conf-atom conf) - (let [state (LocalState. local-dir) + (let [state (LocalState. local-dir (localstate-serializer)) curr-id (if-let [id (.get state Constants/LS_ID)] id (generate-supervisor-id))] diff --git a/storm-core/src/clj/backtype/storm/daemon/task.clj b/storm-core/src/clj/backtype/storm/daemon/task.clj index a517e6f43..77937ddbc 100644 --- a/storm-core/src/clj/backtype/storm/daemon/task.clj +++ b/storm-core/src/clj/backtype/storm/daemon/task.clj @@ -24,7 +24,7 @@ (supervisor-stormdist-root conf (:storm-id worker))) (worker-pids-root conf (:worker-id worker)) (int %) - (:port worker) + (int (:port worker)) (:task-ids worker) (:default-shared-resources worker) (:user-shared-resources worker) diff --git a/storm-core/test/clj/backtype/storm/local_state_test.clj b/storm-core/test/clj/backtype/storm/local_state_test.clj index 71e73631d..a7c08ff37 100644 --- a/storm-core/test/clj/backtype/storm/local_state_test.clj +++ b/storm-core/test/clj/backtype/storm/local_state_test.clj @@ -1,12 +1,13 @@ (ns backtype.storm.local-state-test (:use [clojure test]) (:use [backtype.storm testing]) + (:use [backtype.storm.utils localstate-serializer]) (:import [backtype.storm.utils LocalState])) (deftest test-local-state (with-local-tmp [dir1 dir2] - (let [ls1 (LocalState. dir1) - ls2 (LocalState. dir2)] + (let [ls1 (LocalState. dir1 (localstate-serializer)) + ls2 (LocalState. dir2 (localstate-serializer))] (is (= {} (.snapshot ls1))) (.put ls1 "a" 1) (.put ls1 "b" 2) @@ -15,7 +16,8 @@ (is (= 1 (.get ls1 "a"))) (is (= nil (.get ls1 "c"))) (is (= 2 (.get ls1 "b"))) - (is (= {"a" 1 "b" 2} (.snapshot (LocalState. dir1)))) + (is (= {"a" 1 "b" 2} (.snapshot (LocalState. + dir1 (localstate-serializer))))) (.put ls2 "b" 1) (.put ls2 "b" 2) (.put ls2 "b" 3) From a70c170d31ed67e87a3562b304401a0d67132525 Mon Sep 17 00:00:00 2001 From: Alex Clemmer Date: Fri, 8 Mar 2013 21:10:16 -0700 Subject: [PATCH 11/13] Require LocalState to take a serializer in constructor --- storm-core/src/jvm/backtype/storm/utils/LocalState.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/utils/LocalState.java b/storm-core/src/jvm/backtype/storm/utils/LocalState.java index 905ea98a9..439625408 100644 --- a/storm-core/src/jvm/backtype/storm/utils/LocalState.java +++ b/storm-core/src/jvm/backtype/storm/utils/LocalState.java @@ -15,11 +15,6 @@ public class LocalState { private VersionedStore _vs; private StateSerializer _stSer; - - public LocalState(String backingDir) throws IOException { - _vs = new VersionedStore(backingDir); - _stSer = new LocalStateSerializer(); - } public LocalState (String backingDir, StateSerializer stSer) throws IOException { _vs = new VersionedStore(backingDir); From 6366e7569dc23f8e2dd86621f9a120b7a48a58dc Mon Sep 17 00:00:00 2001 From: Alex Clemmer Date: Fri, 8 Mar 2013 21:10:49 -0700 Subject: [PATCH 12/13] Remove Java implementation of LocalStateSerializer --- .../storm/utils/LocalStateSerializer.java | 100 ------------------ 1 file changed, 100 deletions(-) delete mode 100644 src/jvm/backtype/storm/utils/LocalStateSerializer.java diff --git a/src/jvm/backtype/storm/utils/LocalStateSerializer.java b/src/jvm/backtype/storm/utils/LocalStateSerializer.java deleted file mode 100644 index cb3f9c052..000000000 --- a/src/jvm/backtype/storm/utils/LocalStateSerializer.java +++ /dev/null @@ -1,100 +0,0 @@ -package backtype.storm.utils; - -import backtype.storm.Constants; -import clojure.lang.PersistentArrayMap; -import java.util.Iterator; -import java.util.HashMap; -import java.util.Map; -import org.json.simple.JSONObject; -import org.json.simple.JSONValue; - - -public class LocalStateSerializer implements StateSerializer { - public byte[] serializeState (Map val) { - /* - for (Map.Entry entry : val.entrySet()) { - Object v = entry.getValue(); - - if (v instanceof String) { - JSONObject ser = new JSONObject(); - } - else if(v instanceof clojure.lang.PersistentArrayMap) { - if (((Map) v).size() > 0) { - System.err.println(((Map) v).size()); - System.err.println("cow"); - System.err.println(JSONValue.toJSONString(val)); - System.err.println(JSONValue.parse(JSONValue.toJSONString(val))); - throw new RuntimeException(((Map) v).size() + " " + entry.getValue().getClass().toString()); - } - } - else - throw new RuntimeException(v.getClass().toString()); - } - */ - // LS_ID : String - // LS_WORKER_HEARTBEAT : backtype.storm.daemon.common.WorkerHeartbeat - // LS_LOCAL_ASSIGNMENTS : clojure.lang.PersistentArrayMap - // LS_APPROVED_WORKERS : clojure.lang.PersistentArrayMap - HashMap toSerialize = new HashMap(); - for (Map.Entry entry : val.entrySet()) { - String k = (String) entry.getKey(); - Object v = entry.getValue(); - - if (k.equals(Constants.LS_WORKER_HEARTBEAT)) - // ser worker heartbeat - ; - else if (k.equals(Constants.LS_APPROVED_WORKERS)) - toSerialize.put(k, serLsApprovedWorkers((PersistentArrayMap) v)); - else if (k.equals(Constants.LS_ID)) - toSerialize.put(k, v); - else if (k.equals(Constants.LS_LOCAL_ASSIGNMENTS)) - toSerialize.put(k, serLsLocalAssignments((PersistentArrayMap) v)); - else - throw new RuntimeException("LocalState could not be " + - "serialized; procedure for key " + - k + " has not been implemented"); - } - //throw new RuntimeException("not implemented"); - return Utils.serialize(val); - } - - public String serLsLocalAssignments (PersistentArrayMap assg) { - return JSONValue.toJSONString(assg); - } - - public String serLsApprovedWorkers (PersistentArrayMap workers) { - /* - Iterator iter = workers.iterator(); - while (iter.hasNext()) { - clojure.lang.IMapEntry kv = iter.next(); - System.err.println(kv); - System.err.println(kv.key()); - System.err.println(kv.val()); - } - */ - return JSONValue.toJSONString(workers); - } - - public Map deserializeState (byte[] ser) { - /* - String s = new String(serialized); - //System.err.println("DESER " + JSONValue.parse(s)); - Object parsed = JSONValue.parse(s); - if (parsed != null) - return parsed; - else - return new HashMap(); - */ - return (Map) Utils.deserialize(ser); - } - /* - private static byte[] serializeLocalState(Map val) { - //System.err.println(JSONValue.toJSONString(val)); - //System.err.println("SER " + JSONValue.toJSONString(val)); - //return JSONValue.toJSONString(val).getBytes(); - } - - private static Object deserializeLocalState(byte[] serialized) { - } - */ -} \ No newline at end of file From b99a90bee654d5862736dc1d14999f36f8141783 Mon Sep 17 00:00:00 2001 From: "Philip (flip) Kromer" Date: Sat, 13 Jul 2013 15:29:03 -0500 Subject: [PATCH 13/13] Fixup for rebase of hausdorff/ser_dev onto nathanmarz/master --- storm-core/src/clj/backtype/storm/daemon/worker.clj | 2 +- .../src}/clj/backtype/storm/utils/localstate_serializer.clj | 0 .../src}/jvm/backtype/storm/utils/StateSerializer.java | 0 3 files changed, 1 insertion(+), 1 deletion(-) rename {src => storm-core/src}/clj/backtype/storm/utils/localstate_serializer.clj (100%) rename {src => storm-core/src}/jvm/backtype/storm/utils/StateSerializer.java (100%) diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index 9e469feb1..2252229a6 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -50,7 +50,7 @@ (log-debug "Doing heartbeat " (pr-str hb)) ;; do the local-file-system heartbeat. (.put state - Constants/LS-WORKER-HEARTBEAT + Constants/LS_WORKER_HEARTBEAT hb false ) diff --git a/src/clj/backtype/storm/utils/localstate_serializer.clj b/storm-core/src/clj/backtype/storm/utils/localstate_serializer.clj similarity index 100% rename from src/clj/backtype/storm/utils/localstate_serializer.clj rename to storm-core/src/clj/backtype/storm/utils/localstate_serializer.clj diff --git a/src/jvm/backtype/storm/utils/StateSerializer.java b/storm-core/src/jvm/backtype/storm/utils/StateSerializer.java similarity index 100% rename from src/jvm/backtype/storm/utils/StateSerializer.java rename to storm-core/src/jvm/backtype/storm/utils/StateSerializer.java