diff --git a/src/taoensso/sente.cljc b/src/taoensso/sente.cljc index 5920c19..9333baf 100644 --- a/src/taoensso/sente.cljc +++ b/src/taoensso/sente.cljc @@ -36,6 +36,7 @@ * Server-side events: [:chsk/ws-ping] ; ws-ping from client + [:chsk/ws-pong] ; ws-pong from client [:chsk/uidport-open ] [:chsk/uidport-close ] [:chsk/bad-package ] @@ -110,14 +111,15 @@ (defn- strim [^long max-len s] (if (> (count s) max-len) - (str (enc/get-substr-by-len s 0 max-len) "+") + (str (enc/get-substr-by-len s 0 max-len) #_"+") (do s))) -(defn lid "Log id" - ([uid ] (if (= uid :sente/nil-uid) "u_nil" (str "u_" (strim 6 (str uid))))) - ([uid cid] (str (lid uid) "/c_" (strim 6 (str cid))))) +(defn- lid "Log id" + ([uid ] (if (= uid :sente/nil-uid) "u_nil" (str "u_" (strim 6 (str uid))))) + ([uid client-id ] (str (lid uid) "/c_" (strim 6 (str client-id)))) + ([uid client-id conn-id] (str (lid uid client-id) "/n_" (strim 6 conn-id)))) -(comment (lid (enc/uuid-str) (enc/uuid-str))) +(comment (lid (enc/uuid-str) (enc/uuid-str) (enc/uuid-str))) ;;;; Events ;; Clients & server both send `event`s and receive (i.e. route) `event-msg`s: @@ -374,9 +376,7 @@ :send-fn ; (fn [user-id ev] for server>user push. :ajax-post-fn ; Ring handler for CSRF-POST + chsk URL. :ajax-get-or-ws-handshake-fn ; Ring handler for Ring GET + chsk URL. - - :connected-uids ; Watchable, read-only (atom {:ws #{_} :ajax #{_} :any #{_}}). - :send-buffers ; Implementation detail, read-only (atom {:ws #{_} :ajax #{_} :any #{_}}). + :connected-uids ; Watchable, read-only (atom {:ws #{_} :ajax #{_} :any #{_}}). Security options: @@ -400,14 +400,17 @@ Other common options: - :user-id-fn ; (fn [ring-req]) -> unique user-id for server>user push. - :handshake-data-fn ; (fn [ring-req]) -> arb user data to append to handshake evs. - :ws-kalive-ms ; Ping to keep a WebSocket conn alive if no activity - ; w/in given msecs. Should be different to client's :ws-kalive-ms. - :lp-timeout-ms ; Timeout (repoll) long-polling Ajax conns after given msecs. - :send-buf-ms-ajax ; [2] - :send-buf-ms-ws ; [2] - :packer ; :edn (default), or an IPacker implementation. + :user-id-fn ; (fn [ring-req]) -> unique user-id for server>user push. + :handshake-data-fn ; (fn [ring-req]) -> arb user data to append to handshake evs. + :ws-kalive-ms ; Ping to keep a WebSocket conn alive if no activity + ; w/in given msecs. Should be different to client's :ws-kalive-ms. + :lp-timeout-ms ; Timeout (repoll) long-polling Ajax conns after given msecs. + :send-buf-ms-ajax ; [2] + :send-buf-ms-ws ; [2] + :packer ; :edn (default), or an IPacker implementation. + + :ws-ping-timeout-ms ; When pinging to test WebSocket connections, msecs to + ; await reply before regarding the connection as broken ;; When a connection is closed, Sente waits a little for possible reconnection before ;; actually marking the connection as closed. This facilitates Ajax long-polling, @@ -426,7 +429,7 @@ after send call (larger values => larger batch windows)." [web-server-ch-adapter - & [{:keys [recv-buf-or-n ws-kalive-ms lp-timeout-ms + & [{:keys [recv-buf-or-n ws-kalive-ms lp-timeout-ms ws-ping-timeout-ms send-buf-ms-ajax send-buf-ms-ws user-id-fn bad-csrf-fn bad-origin-fn csrf-token-fn handshake-data-fn packer allowed-origins @@ -435,11 +438,26 @@ ms-allow-reconnect-before-close-ws ms-allow-reconnect-before-close-ajax] - :or {recv-buf-or-n (async/sliding-buffer 1000) - ws-kalive-ms (enc/ms :secs 25) ; < Heroku 55s timeout - lp-timeout-ms (enc/ms :secs 20) ; < Heroku 30s timeout - send-buf-ms-ajax 100 - send-buf-ms-ws 30 + :or {recv-buf-or-n (async/sliding-buffer 1000) + ws-kalive-ms (enc/ms :secs 25) ; < Heroku 55s timeout + lp-timeout-ms (enc/ms :secs 20) ; < Heroku 30s timeout + + ;; TODO Default initially disabled since it can take some time + ;; for clients to update in the wild. We want to ensure that all + ;; clients DO respond to pings before enabling the server to close + ;; unresponsive connections. + ;; + ;; So we're rolling this new feature out in 2 steps: + ;; 1. Update clients to respond to pings (with pongs) + ;; 2. Update servers to regard lack of pong as broken conn + ;; + ;; The feature can be enabled early by manually providing a + ;; `ws-ping-timeout-ms` val in opts. + ;; + ws-ping-timeout-ms nil #_(enc/ms :secs 5) ; TODO Enable default val + + send-buf-ms-ajax 100 + send-buf-ms-ws 30 user-id-fn (fn [ ring-req] (get-in ring-req [:session :uid])) bad-csrf-fn (fn [_ring-req] {:status 403 :body "Bad CSRF token"}) bad-origin-fn (fn [_ring-req] {:status 403 :body "Unauthorized origin"}) @@ -481,38 +499,11 @@ ;; of security implications. (or (user-id-fn (assoc ring-req :client-id client-id)) :sente/nil-uid)) - ;; :ws udts used for ws-kalive (to check for activity in window period) - ;; :ajax udts used for lp-timeout (as a way to check active conn identity) - conns_ (atom {:ws {} :ajax {}}) ; { { [ ]}} + conns_ (atom {:ws {} :ajax {}}) ; { { [ ]}} send-buffers_ (atom {:ws {} :ajax {}}) ; { [ <#{ev-uuids}>]} connected-uids_ (atom {:ws #{} :ajax #{} :any #{}}) ; Public - upd-conn! ; Update client entry in conns_ - (fn - ([conn-type uid client-id] ; Update only udt - (swap-in! conns_ [conn-type uid client-id] - (fn [?v] - (let [[?sch _?udt] ?v - new-udt (enc/now-udt)] - (enc/swapped - [?sch new-udt] - {:init? (nil? ?v) :udt new-udt :?sch ?sch}))))) - - ([conn-type uid client-id old-?sch new-?sch] ; Update sch & udt - (swap-in! conns_ [conn-type uid client-id] - (fn [?v] - (let [[?sch _?udt] ?v - new-udt (enc/now-udt) - new-?sch - (if (or (= old-?sch :any) (identical? old-?sch ?sch)) - new-?sch ; CAS successful, Ref. #417 - ?sch)] - - (enc/swapped - [new-?sch new-udt] - {:init? (nil? ?v) :udt new-udt :?sch new-?sch})))))) - - connect-uid! + connect-uid!? (fn [conn-type uid] {:pre [(have? uid)]} (let [newly-connected? (swap-in! connected-uids_ [] @@ -529,7 +520,7 @@ :newly-connected))))))] newly-connected?)) - upd-connected-uid! + maybe-disconnect-uid!? (fn [uid] {:pre [(have? uid)]} (let [newly-disconnected? (swap-in! connected-uids_ [] @@ -556,7 +547,7 @@ send-fn ; server>user (by uid) push (fn [user-id ev & [{:as opts :keys [flush?]}]] (let [uid (if (= user-id :sente/all-users-without-uid) :sente/nil-uid user-id) - _ (timbre/debugf "Server asked to send event to %s: %s" (lid uid) ev) + _ (timbre/tracef "Server asked to send event to %s: %s" (lid uid) ev) _ (assert uid (str "Support for sending to `nil` user-ids has been REMOVED. " "Please send to `:sente/all-users-without-uid` instead.")) @@ -688,10 +679,12 @@ :connected-uids connected-uids_ :send-buffers send-buffers_}] - {:ch-recv ch-recv - :send-fn send-fn - :connected-uids connected-uids_ - :send-buffers send-buffers_ + {:ch-recv ch-recv + :send-fn send-fn + :connected-uids_ connected-uids_ + :connected-uids connected-uids_ ; For back compatibility + :private {:conns_ conns_ + :send-buffers_ send-buffers_} ;; Does not participate in `conns_` (has specific req->resp) :ajax-post-fn @@ -746,265 +739,312 @@ (fn ring-handler ([ring-req] (ring-handler ring-req nil nil)) ([ring-req ?ring-async-resp-fn ?ring-async-raise-fn] - (let [;; sch-uuid (enc/uuid-str 6) + (let [;; ?ws-key (get-in ring-req [:headers "sec-websocket-key"]) + conn-id (enc/uuid-str 6) ; 1 per ws/ajax rreq, equiv to server-ch identity params (get ring-req :params) client-id (get params :client-id) uid (user-id-fn ring-req client-id) - ;; ?ws-key (get-in ring-req [:headers "sec-websocket-key"]) - - receive-event-msg! ; Partial - (fn self - ([event ] (self event nil)) - ([event ?reply-fn] - (put-server-event-msg>ch-recv! ch-recv - (merge ev-msg-const - {:client-id client-id - :ring-req ring-req - :event event - :?reply-fn ?reply-fn - :uid uid})))) - - send-handshake! - (fn [server-ch websocket?] - - (timbre/infof "Server will send %s handshake to %s" - (if websocket? :ws :ajax) - (lid uid client-id)) - - (let [?handshake-data (handshake-data-fn ring-req) - handshake-ev - (if (nil? ?handshake-data) ; Micro optimization - [:chsk/handshake [uid nil]] - [:chsk/handshake [uid nil ?handshake-data]])] - (interfaces/sch-send! server-ch websocket? - (pack packer handshake-ev))))] + lid* (lid uid client-id conn-id)] (enc/cond - (str/blank? client-id) (let [err-msg "Client's Ring request doesn't have a client id. Does your server have the necessary keyword Ring middleware (`wrap-params` & `wrap-keyword-params`)?"] (timbre/errorf (str err-msg ": %s") ring-req) ; Careful re: % in req (throw (ex-info err-msg {:ring-req ring-req}))) :if-let [resp (possible-rejection-resp ring-req)] resp - :else - (interfaces/ring-req->server-ch-resp web-server-ch-adapter ring-req - {:ring-async-resp-fn ?ring-async-resp-fn - :ring-async-raise-fn ?ring-async-raise-fn - - :on-open - (fn [server-ch websocket?] - (if websocket? - - ;; WebSocket handshake - (let [updated-conn (upd-conn! :ws uid client-id :any server-ch) - udt-open (:udt updated-conn) - send-handshake? true] - - (timbre/infof "[ws/on-open] New server WebSocket sch for %s: %s" - (lid uid client-id) - {:send-handshake? send-handshake?}) - - (when (connect-uid! :ws uid) - (receive-event-msg! [:chsk/uidport-open uid])) - - (when send-handshake? - (send-handshake! server-ch websocket?)) - - ;; Start server-side ws-kalive loop - ;; Also helps server detect broken conns earlier - (when-let [ms ws-kalive-ms] - (go-loop [udt-t0 udt-open] - (ch-recv! ch-recv + (merge ev-msg-const + {:client-id client-id + :ring-req ring-req + :event event + :?reply-fn ?reply-fn + :uid uid})))) + + send-handshake!? + (fn [server-ch websocket?] + (timbre/infof "Server will send %s handshake to %s" (if websocket? :ws :ajax) lid*) + (let [?handshake-data (handshake-data-fn ring-req) + handshake-ev + (if (nil? ?handshake-data) ; Micro optimization + [:chsk/handshake [uid nil]] + [:chsk/handshake [uid nil ?handshake-data]])] + ;; Returns true iff server-ch open during call + (interfaces/sch-send! server-ch websocket? + (pack packer handshake-ev)))) + + on-error + (fn [server-ch websocket? error] + (timbre/errorf "%s Server sch error for %s: %s" + (if websocket? "[ws/on-error]" "[ajax/on-error]") + lid* error)) + + on-msg + (fn [server-ch websocket? req-ppstr] + (assert websocket?) + (swap-in! conns_ [:ws uid client-id] + (fn [[?sch _udt conn-id]] + (when conn-id [?sch (enc/now-udt) conn-id]))) + + (let [[clj ?cb-uuid] (unpack packer req-ppstr)] + ;; clj should be ev + (cond + (= clj [:chsk/ws-pong]) (receive-event-msg! clj nil) + (= clj [:chsk/ws-ping]) + (do + ;; Auto reply to ping + (when-let [cb-uuid ?cb-uuid] + (timbre/debugf "[ws/on-msg] Server will auto-reply to ping from %s" lid*) + (interfaces/sch-send! server-ch websocket? + (pack packer "pong" cb-uuid))) + + (receive-event-msg! clj nil)) + + :else + (receive-event-msg! clj + (when ?cb-uuid + (fn reply-fn [resp-clj] ; Any clj form + (timbre/debugf "[ws/on-msg] Server will reply to message from %s: %s" lid* resp-clj) + + ;; true iff apparent success: + (interfaces/sch-send! server-ch websocket? + (pack packer resp-clj ?cb-uuid)))))))) + + on-close + (fn [server-ch websocket? _status] + ;; - We rely on `on-close` to trigger for *every* sch. + ;; - May be called *more* than once for a given sch. + ;; - `status` type varies with underlying web server. + (let [conn-type (if websocket? :ws :ajax) + log-prefix (if websocket? "[ws/on-close]" "[ajax/on-close]") + active-conn-closed? + (swap-in! conns_ [conn-type uid client-id] + (fn [[?sch _udt conn-id*]] + (if (= conn-id conn-id*) + (swapped [nil (enc/now-udt) conn-id] true) + (swapped :swap/abort false))))] + + ;; Inactive => a connection closed that's not currently in conns_ + + (timbre/debugf "%s %s server sch closed for %s" + log-prefix (if active-conn-closed? "Active" "Inactive") lid*) + + (when active-conn-closed? + ;; Allow some time for possible reconnects (repoll, + ;; sole window refresh, etc.) before regarding close + ;; as non-transient "disconnect" + (go + (let [ms-timeout + (if websocket? + ms-allow-reconnect-before-close-ws + ms-allow-reconnect-before-close-ajax)] + ( activity) in last kalive window + (interfaces/sch-close! server-ch) + {:recur? false}) + + :else + (if-let [;; If a conn has gone bad but is still marked as open, + ;; attempting to send a ping will usually trigger the + ;; conn's :on-close immediately, i.e. no need to wait + ;; for a missed pong. + ping-apparently-sent? + (interfaces/sch-send! server-ch websocket? + (pack packer :chsk/ws-ping))] + + (if ws-ping-timeout-ms + {:recur? true, :udt udt-t1, :ms-timeout ws-ping-timeout-ms, :expecting-pong? true} + {:recur? true, :udt udt-t1, :ms-timeout ws-kalive-ms, :expecting-pong? false}) + + {:recur? false, :force-close? true}))] + + (if recur? + (recur udt ms-timeout expecting-pong?) + (do + (timbre/debugf "[ws/on-open] Ending kalive loop for %s" lid*) + (when force-close? + ;; It's rare but possible for a conn's :on-close to fire + ;; *before* a handshake, leaving a closed sch in conns_ + (timbre/debugf "[ws/on-open] Force close connection for %s" lid*) + (on-close server-ch websocket? nil))))))) + + (when (connect-uid!? :ws uid) + (timbre/infof "[ws/on-open] uid port open for %s" lid*) + (receive-event-msg! [:chsk/uidport-open uid]))))) + + ;; Ajax handshake/poll + (let [send-handshake? + (or + (:handshake? params) + (nil? (get-in @conns_ [:ajax uid client-id])))] + + (timbre/logf (if send-handshake? :info :trace) + "[ajax/on-open] New server Ajax sch (poll/handshake) for %s: %s" + lid* {:send-handshake? send-handshake?}) + + (if send-handshake? + (do + (swap-in! conns_ [:ajax uid client-id] (fn [_] [nil (enc/now-udt) conn-id])) + (send-handshake!? server-ch websocket?) + ;; `server-ch` will close, and client will immediately repoll + ) + + (let [[_ udt-open] + (swap-in! conns_ [:ajax uid client-id] + (fn [_] [server-ch (enc/now-udt) conn-id]))] + + (when-let [ms lp-timeout-ms] + (go + (server-ch-resp web-server-ch-adapter ring-req + {:ring-async-resp-fn ?ring-async-resp-fn + :ring-async-raise-fn ?ring-async-raise-fn + :on-open on-open + :on-msg on-msg + :on-close on-close + :on-error on-error}))))))})) + +(def ^:dynamic *simulated-bad-conn-rate* + "Debugging tool. Proportion ∈ℝ[0,1] of connection activities to sabotage." + nil) - (receive-event-msg! clj nil)) +(defn- simulated-bad-conn? [] + (when-let [sbcr *simulated-bad-conn-rate*] + (enc/chance sbcr))) - (receive-event-msg! clj - (when ?cb-uuid - (fn reply-fn [resp-clj] ; Any clj form - (timbre/debugf "[ws/on-msg] Server will reply to message from %s: %s" - (lid uid client-id) - resp-clj) - - ;; true iff apparent success: - (interfaces/sch-send! server-ch websocket? - (pack packer resp-clj ?cb-uuid)))))))) - - :on-close ; We rely on `on-close` to trigger for _every_ conn! - (fn [server-ch websocket? _status] - ;; Note that `status` type varies with underlying web server - - (let [log-prefix (if websocket? "[ws/on-close]" "[ajax/on-close]") - _ - (timbre/debugf "%s Server sch closed for %s" - log-prefix - (lid uid client-id)) - - conn-type (if websocket? :ws :ajax) - updated-conn (upd-conn! conn-type uid client-id server-ch nil) - udt-close (:udt updated-conn)] - - ;; Allow some time for possible reconnects (repoll, - ;; sole window refresh, etc.): - (go - (let [ms-timeout - (if websocket? - ms-allow-reconnect-before-close-ws - ms-allow-reconnect-before-close-ajax)] - (clients! "Actually pushes buffered events (as packed-str) to all uid's conns. Allows some time for possible reconnects." [conn-type conns_ uid buffered-evs-pstr n-buffered-evs] (have? [:el #{:ajax :ws}] conn-type) - (let [websocket? (= conn-type :ws) - ms-backoffs [90 180 360 720 1440] ; Mean 2790s - ;; All connected/possibly-reconnecting client uuids: - client-ids-unsatisfied (keys (get-in @conns_ [conn-type uid]))] - - (when-not (empty? client-ids-unsatisfied) - (go-loop [n 0 client-ids-satisfied #{}] - (let [?pulled ; nil or { [ ]} - (swap-in! conns_ [conn-type uid] - (fn [m] ; { [ ]} - (let [ks-to-pull (remove client-ids-satisfied (keys m))] - ;; (timbre/tracef "ks-to-pull: %s" ks-to-pull) - (if (empty? ks-to-pull) - (swapped m nil) - (swapped - (reduce - (fn [m k] - (let [[?sch udt] (get m k) - new-entry - (if websocket? - [?sch (enc/now-udt)] - [nil udt])] - (assoc m k new-entry))) - m ks-to-pull) - (select-keys m ks-to-pull))))))] - - (have? [:or nil? map?] ?pulled) - - (let [?newly-satisfied - (when ?pulled - (reduce-kv - (fn [s client-id [?sch _udt]] - (let [sent? - (when-let [sch ?sch] - ;; Will noop + return false if sch already closed: - (interfaces/sch-send! sch websocket? - buffered-evs-pstr))] - - (if sent? (conj s client-id) s))) - #{} ?pulled)) - - now-satisfied (into client-ids-satisfied ?newly-satisfied)] - - ;; (timbre/tracef "now-satisfied: %s" now-satisfied) - - (if-let [ms-timeout - (when-let [ms-backoff (get ms-backoffs n)] - (when (enc/rsome (complement now-satisfied) client-ids-unsatisfied) - (+ ms-backoff (rand-int ms-backoff))))] - (do - ;; Allow some time for possible poller reconnects: - ( max 6 attempts + websocket? (= conn-type :ws) + udt-t0 (enc/now-udt)] + + (when-let [client-ids (keys (get-in @conns_ [conn-type uid]))] + (go-loop [pending (set client-ids), idx 0] + (let [pending + (reduce + (fn [pending client-id] + (if-let [sent? + (when-let [conn-id + (when-let [[?sch _udt conn-id] (get-in @conns_ [conn-type uid client-id])] + (when-let [sch ?sch] + (when-not (simulated-bad-conn?) + (when (interfaces/sch-send! sch websocket? buffered-evs-pstr) + conn-id))))] + + (swap-in! conns_ [conn-type uid client-id] + (fn [[?sch udt conn-id*]] + (if (= conn-id conn-id*) + (if websocket? + [?sch (enc/now-udt) conn-id] + [nil udt conn-id]) + :swap/abort))) + + true)] + + (disj pending client-id) + (do pending))) + pending + pending)] + + (if-let [done? (or (empty? pending) (> idx 4))] + (let [n-desired (count client-ids) + n-success (- n-desired (count pending))] + (timbre/debugf "Sent %s buffered evs to %s/%s %s clients for %s in %s attempt/s (%s msecs)" + n-buffered-evs n-success n-desired conn-type (lid uid) (inc idx) (- (enc/now-udt) udt-t0))) + + (let [ms-timeout + (let [ms-backoff (nth ms-backoffs idx)] + (+ ms-backoff (rand-int ms-backoff)))] + + ;; Allow some time for possible poller reconnects: + (