Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transition away from Java serialization for storing state on disk or in Zookeeper #625

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions storm-core/src/clj/backtype/storm/cluster.clj
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@
(cb id))
))

(defn- maybe-deserialize [ser]
(defn- maybe-deserialize-clj-bytes [ser]
(when ser
(Utils/deserialize ser)))
(deserialize-clj-bytes ser)))

(defstruct TaskError :error :time-secs)

Expand Down Expand Up @@ -230,7 +230,7 @@
(assignment-info [this storm-id callback]
(when callback
(swap! assignment-info-callback assoc storm-id callback))
(maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback)))
(maybe-deserialize-clj-bytes (get-data cluster-state (assignment-path storm-id) (not-nil? callback)))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we renaming these functions? If we stick the original func name, we will have less code to worry about. No?

)

(active-storms [this]
Expand All @@ -248,7 +248,7 @@
(get-worker-heartbeat [this storm-id node port]
(-> cluster-state
(get-data (workerbeat-path storm-id node port) false)
maybe-deserialize))
maybe-deserialize-clj-bytes))

(executor-beats [this storm-id executor->node+port]
;; need to take executor->node+port in explicitly so that we don't run into a situation where a
Expand All @@ -269,11 +269,11 @@
)

(supervisor-info [this supervisor-id]
(maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false))
(maybe-deserialize-clj-bytes (get-data cluster-state (supervisor-path supervisor-id) false))
)

(worker-heartbeat! [this storm-id node port info]
(set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info)))
(set-data cluster-state (workerbeat-path storm-id node port) (serialize-clj-bytes info)))

(remove-worker-heartbeat! [this storm-id node port]
(delete-node cluster-state (workerbeat-path storm-id node port))
Expand All @@ -297,11 +297,11 @@
)))

(supervisor-heartbeat! [this supervisor-id info]
(set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info))
(set-ephemeral-node cluster-state (supervisor-path supervisor-id) (serialize-clj-bytes info))
)

(activate-storm! [this storm-id storm-base]
(set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base))
(set-data cluster-state (storm-path storm-id) (serialize-clj-bytes storm-base))
)

(update-storm! [this storm-id new-elems]
Expand All @@ -311,20 +311,20 @@
(set-data cluster-state (storm-path storm-id)
(-> base
(merge new-elems)
Utils/serialize))))
serialize-clj-bytes))))

(storm-base [this storm-id callback]
(when callback
(swap! storm-base-callback assoc storm-id callback))
(maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback)))
(maybe-deserialize-clj-bytes (get-data cluster-state (storm-path storm-id) (not-nil? callback)))
)

(remove-storm-base! [this storm-id]
(delete-node cluster-state (storm-path storm-id))
)

(set-assignment! [this storm-id info]
(set-data cluster-state (assignment-path storm-id) (Utils/serialize info))
(set-data cluster-state (assignment-path storm-id) (serialize-clj-bytes info))
)

(remove-storm! [this storm-id]
Expand All @@ -335,7 +335,7 @@
(let [path (error-path storm-id component-id)
data {:time-secs (current-time-secs) :error (stringify-error error)}
_ (mkdirs cluster-state path)
_ (create-sequential cluster-state (str path "/e") (Utils/serialize data))
_ (create-sequential cluster-state (str path "/e") (serialize-clj-bytes data))
to-kill (->> (get-children cluster-state path false)
(sort-by parse-error-path)
reverse
Expand All @@ -349,7 +349,7 @@
children (get-children cluster-state path false)
errors (dofor [c children]
(let [data (-> (get-data cluster-state (str path "/" c) false)
maybe-deserialize)]
maybe-deserialize-clj-bytes)]
(when data
(struct TaskError (:error data) (:time-secs data))
)))
Expand Down
11 changes: 7 additions & 4 deletions storm-core/src/clj/backtype/storm/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
(:import [backtype.storm.utils Utils LocalState])
(:import [org.apache.commons.io FileUtils])
(:require [clojure [string :as str]])
(:use [backtype.storm.utils localstate-serializer])
(:use [backtype.storm util])
)

Expand Down Expand Up @@ -168,19 +169,20 @@
(str stormroot "/" RESOURCES-SUBDIR))

(defn ^LocalState supervisor-state [conf]
(LocalState. (str (supervisor-local-dir conf) "/localstate")))
(LocalState. (str (supervisor-local-dir conf) "/localstate")
(localstate-serializer)))

(defn read-supervisor-storm-conf [conf storm-id]
(let [stormroot (supervisor-stormdist-root conf storm-id)
conf-path (supervisor-stormconf-path stormroot)
topology-path (supervisor-stormcode-path stormroot)]
(merge conf (Utils/deserialize (FileUtils/readFileToByteArray (File. conf-path))))
(merge conf (deserialize-clj-bytes (FileUtils/readFileToByteArray (File. conf-path))))
))

(defn read-supervisor-topology [conf storm-id]
(let [stormroot (supervisor-stormdist-root conf storm-id)
topology-path (supervisor-stormcode-path stormroot)]
(Utils/deserialize (FileUtils/readFileToByteArray (File. topology-path)))
(Utils/deserializeTopology (FileUtils/readFileToByteArray (File. topology-path)))
))

(defn worker-root
Expand All @@ -204,4 +206,5 @@
;; if supervisor stops receiving heartbeat, it kills and restarts the process
;; in local mode, keep a global map of ids to threads for simulating process management
(defn ^LocalState worker-state [conf id]
(LocalState. (worker-heartbeats-root conf id)))
(LocalState. (worker-heartbeats-root conf id) (localstate-serializer)))

8 changes: 1 addition & 7 deletions storm-core/src/clj/backtype/storm/daemon/common.clj
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,6 @@
(defprotocol DaemonCommon
(waiting? [this]))

(def LS-WORKER-HEARTBEAT "worker-heartbeat")

;; LocalState constants
(def LS-ID "supervisor-id")
(def LS-LOCAL-ASSIGNMENTS "local-assignments")
(def LS-APPROVED-WORKERS "approved-workers")

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we want to move these constants to Constants.java? We have many other CLJ constants, and Constants only has a small subset. Personally, I would rather we have a separated PR on that. This PR should just focus on getting serialization issues addressed.



Expand Down Expand Up @@ -321,7 +315,7 @@
(supervisor-storm-resources-path
(supervisor-stormdist-root (:conf worker) (:storm-id worker)))
(worker-pids-root (:conf worker) (:worker-id worker))
(:port worker)
(int (:port worker))
(:task-ids worker)
(:default-shared-resources worker)
(:user-shared-resources worker)
Expand Down
8 changes: 4 additions & 4 deletions storm-core/src/clj/backtype/storm/daemon/nimbus.clj
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
(defn- read-storm-conf [conf storm-id]
(let [stormroot (master-stormdist-root conf storm-id)]
(merge conf
(Utils/deserialize
(deserialize-clj-bytes
(FileUtils/readFileToByteArray
(File. (master-stormconf-path stormroot))
)))))
Expand Down Expand Up @@ -295,13 +295,13 @@
(FileUtils/forceMkdir (File. stormroot))
(FileUtils/cleanDirectory (File. stormroot))
(setup-jar conf tmp-jar-location stormroot)
(FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology))
(FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf))
(FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serializeTopology topology))
(FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (serialize-clj-bytes storm-conf))
))

(defn- read-storm-topology [conf storm-id]
(let [stormroot (master-stormdist-root conf storm-id)]
(Utils/deserialize
(Utils/deserializeTopology
(FileUtils/readFileToByteArray
(File. (master-stormcode-path stormroot))
))))
Expand Down
29 changes: 16 additions & 13 deletions storm-core/src/clj/backtype/storm/daemon/supervisor.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
(ns backtype.storm.daemon.supervisor
(:import [backtype.storm.scheduler ISupervisor])
(:import [backtype.storm Constants])
(:use [backtype.storm bootstrap])
(:use [backtype.storm.daemon common])
(:use [backtype.storm.utils localstate-serializer])
(:require [backtype.storm.daemon [worker :as worker]])
(:gen-class
:methods [^{:static true} [launch [backtype.storm.scheduler.ISupervisor] void]]))
Expand Down Expand Up @@ -59,7 +61,7 @@

(defn read-worker-heartbeat [conf id]
(let [local-state (worker-state conf id)]
(.get local-state LS-WORKER-HEARTBEAT)
(.get local-state Constants/LS_WORKER_HEARTBEAT)
))


Expand Down Expand Up @@ -89,7 +91,7 @@
(let [conf (:conf supervisor)
^LocalState local-state (:local-state supervisor)
id->heartbeat (read-worker-heartbeats conf)
approved-ids (set (keys (.get local-state LS-APPROVED-WORKERS)))]
approved-ids (set (keys (.get local-state Constants/LS_APPROVED_WORKERS)))]
(into
{}
(dofor [[id hb] id->heartbeat]
Expand All @@ -110,9 +112,9 @@
)))

(defn- wait-for-worker-launch [conf id start-time]
(let [state (worker-state conf id)]
(let [state (worker-state conf id)]
(loop []
(let [hb (.get state LS-WORKER-HEARTBEAT)]
(let [hb (.get state Constants/LS_WORKER_HEARTBEAT)]
(when (and
(not hb)
(<
Expand All @@ -123,7 +125,7 @@
(Time/sleep 500)
(recur)
)))
(when-not (.get state LS-WORKER-HEARTBEAT)
(when-not (.get state Constants/LS_WORKER_HEARTBEAT)
(log-message "Worker " id " failed to start")
)))

Expand Down Expand Up @@ -184,7 +186,7 @@
(defn sync-processes [supervisor]
(let [conf (:conf supervisor)
^LocalState local-state (:local-state supervisor)
assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
assigned-executors (defaulted (.get local-state Constants/LS_LOCAL_ASSIGNMENTS) {})
now (current-time-secs)
allocated (read-allocated-workers supervisor assigned-executors now)
keepers (filter-val
Expand Down Expand Up @@ -221,11 +223,12 @@
))
(doseq [id (vals new-worker-ids)]
(local-mkdirs (worker-pids-root conf id)))
(.put local-state LS-APPROVED-WORKERS
(.put local-state Constants/LS_APPROVED_WORKERS
(merge
(select-keys (.get local-state LS-APPROVED-WORKERS)
(select-keys (.get local-state Constants/LS_APPROVED_WORKERS)
(keys keepers))
(zipmap (vals new-worker-ids) (keys new-worker-ids))
;)))
))
(wait-for-workers-launch
conf
Expand Down Expand Up @@ -269,7 +272,7 @@
new-assignment (->> all-assignment
(filter-key #(.confirmAssigned isupervisor %)))
assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)]
existing-assignment (.get local-state Constants/LS_LOCAL_ASSIGNMENTS)]
(log-debug "Synchronizing supervisor")
(log-debug "Storm code map: " storm-code-map)
(log-debug "Downloaded storm ids: " downloaded-storm-ids)
Expand Down Expand Up @@ -301,7 +304,7 @@
(.killedWorker isupervisor (int p)))
(.assigned isupervisor (keys new-assignment))
(.put local-state
LS-LOCAL-ASSIGNMENTS
Constants/LS_LOCAL_ASSIGNMENTS
new-assignment)
(reset! (:curr-assignment supervisor) new-assignment)
;; remove any downloaded code that's no longer assigned or active
Expand Down Expand Up @@ -481,11 +484,11 @@
(reify ISupervisor
(prepare [this conf local-dir]
(reset! conf-atom conf)
(let [state (LocalState. local-dir)
curr-id (if-let [id (.get state LS-ID)]
(let [state (LocalState. local-dir (localstate-serializer))
curr-id (if-let [id (.get state Constants/LS_ID)]
id
(generate-supervisor-id))]
(.put state LS-ID curr-id)
(.put state Constants/LS_ID curr-id)
(reset! id-atom curr-id))
)
(confirmAssigned [this port]
Expand Down
2 changes: 1 addition & 1 deletion storm-core/src/clj/backtype/storm/daemon/task.clj
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
(supervisor-stormdist-root conf (:storm-id worker)))
(worker-pids-root conf (:worker-id worker))
(int %)
(:port worker)
(int (:port worker))
(:task-ids worker)
(:default-shared-resources worker)
(:user-shared-resources worker)
Expand Down
4 changes: 2 additions & 2 deletions storm-core/src/clj/backtype/storm/daemon/worker.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
(:import [java.util.concurrent Executors])
(:import [backtype.storm.messaging TransportFactory])
(:import [backtype.storm.messaging IContext IConnection])
(:import [backtype.storm Constants])
(:gen-class))

(bootstrap)
Expand Down Expand Up @@ -49,13 +50,12 @@
(log-debug "Doing heartbeat " (pr-str hb))
;; do the local-file-system heartbeat.
(.put state
LS-WORKER-HEARTBEAT
Constants/LS_WORKER_HEARTBEAT
hb
false
)
(.cleanup state 60) ; this is just in case supervisor is down so that disk doesn't fill up.
; it shouldn't take supervisor 120 seconds between listing dir and reading it

))

(defn worker-outbound-tasks
Expand Down
5 changes: 3 additions & 2 deletions storm-core/src/clj/backtype/storm/testing.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
(:import [java.util HashMap ArrayList])
(:import [java.util.concurrent.atomic AtomicInteger])
(:import [java.util.concurrent ConcurrentHashMap])
(:import [backtype.storm Constants])
(:import [backtype.storm.utils Time Utils RegisteredGlobalState])
(:import [backtype.storm.tuple Fields Tuple TupleImpl])
(:import [backtype.storm.task TopologyContext])
Expand Down Expand Up @@ -258,13 +259,13 @@

(defn find-worker-id [supervisor-conf port]
(let [supervisor-state (supervisor-state supervisor-conf)
worker->port (.get supervisor-state common/LS-APPROVED-WORKERS)]
worker->port (.get supervisor-state Constants/LS_APPROVED_WORKERS)]
(first ((reverse-map worker->port) port))
))

(defn find-worker-port [supervisor-conf worker-id]
(let [supervisor-state (supervisor-state supervisor-conf)
worker->port (.get supervisor-state common/LS-APPROVED-WORKERS)
worker->port (.get supervisor-state Constants/LS_APPROVED_WORKERS)
]
(worker->port worker-id)
))
Expand Down
11 changes: 11 additions & 0 deletions storm-core/src/clj/backtype/storm/util.clj
Original file line number Diff line number Diff line change
Expand Up @@ -844,3 +844,14 @@
(meta form))
(list form x)))
([x form & more] `(-<> (-<> ~x ~form) ~@more)))

(def ^:const def-ser-enc "UTF-8")

(defn serialize-clj-bytes [form]
"serializes Clojure form to UTF-8 byte array"
(.getBytes (pr-str form) def-ser-enc))

(defn deserialize-clj-bytes [form]
"deserializes Clojure form fom UTF-8 byte array"
(read-string (String. form def-ser-enc)))

34 changes: 34 additions & 0 deletions storm-core/src/clj/backtype/storm/utils/localstate_serializer.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
;; Use to serialize LocalState. We assume that LocalState is a k/v store that
;; uses java.util.HashMap to store the following keys
;; LS_ID : String
;; LS_WORKER_HEARTBEAT : backtype.storm.daemon.common.WorkerHeartbeat
;; LS_LOCAL_ASSIGNMENTS : clojure.lang.PersistentArrayMap
;; LS_APPROVED_WORKERS : clojure.lang.PersistentArrayMap

(ns backtype.storm.utils.localstate-serializer
(:import [backtype.storm.utils Utils])
(:import [backtype.storm Constants])
(:use [backtype.storm util])
)

; java.util.HashMap -> byte[]
(defn serialize-localstate [form]
(serialize-clj-bytes (into {} form)))

; byte[] -> java.util.HashMap
(defn deserialize-localstate [form]
(let [newm (java.util.HashMap.)]
(.putAll newm (deserialize-clj-bytes form))
newm))

(defn localstate-serializer []
(reify
backtype.storm.utils.StateSerializer
(serializeState [this val] (serialize-localstate val))
(deserializeState [this ser] (deserialize-localstate ser))))

(defn localstate-default-serializer []
(reify
backtype.storm.utils.StateSerializer
(serializeState [this val] (Utils/serialize val))
(deserializeState [this ser] (Utils/deserialize ser))))
Loading