Skip to content

Commit

Permalink
[new] [#420] More reliable WebSocket server->client broadcasts
Browse files Browse the repository at this point in the history
BEFORE THIS COMMIT

  - `send-buffered-server-evs>ajax-clients!` has logic to allow time for possible reconnects during broadcast.
  - `send-buffered-server-evs>ws-clients!` does not.

  I.e. Ajax broadcasts have a reliability mechanism that WS broadcasts do not.

AFTER THIS COMMIT

  The same reliability mechanism is now applied to both Ajax and WS broadcasts.
  The feature is far less critical for WS broadcasts, but still nice to have.
  • Loading branch information
ptaoussanis committed Mar 7, 2023
1 parent 5f945db commit 7dba037
Showing 1 changed file with 20 additions and 30 deletions.
50 changes: 20 additions & 30 deletions src/taoensso/sente.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,7 @@
(def ^:private next-idx! (enc/counter))

(declare
^:private send-buffered-server-evs>ws-clients!
^:private send-buffered-server-evs>ajax-clients!
^:private send-buffered-server-evs>clients!
^:private default-client-side-ajax-timeout-ms)

(defn allow-origin?
Expand Down Expand Up @@ -572,11 +571,8 @@

(let [buffered-evs-ppstr (pack packer buffered-evs)]
(tracef "buffered-evs-ppstr: %s" buffered-evs-ppstr)
(case conn-type
:ws (send-buffered-server-evs>ws-clients! conns_
uid buffered-evs-ppstr upd-conn!)
:ajax (send-buffered-server-evs>ajax-clients! conns_
uid buffered-evs-ppstr))))))]
(send-buffered-server-evs>clients! conn-type
conns_ uid buffered-evs-ppstr)))))]

(if (= ev [:chsk/close]) ; Currently undocumented
(do
Expand Down Expand Up @@ -884,29 +880,23 @@
(errorf "ring-req->server-ch-resp error: %s (%s)"
error uid sch-uuid))}))))}))

(defn- send-buffered-server-evs>ws-clients!
"Actually pushes buffered events (as packed-str) to all uid's WebSocket conns."
[conns_ uid buffered-evs-pstr upd-conn!]
(tracef "send-buffered-server-evs>ws-clients!: %s" buffered-evs-pstr)
(doseq [[client-id [?sch _udt]] (get-in @conns_ [:ws uid])]
(when-let [sch ?sch]
(upd-conn! :ws uid client-id)
(interfaces/sch-send! sch :websocket buffered-evs-pstr))))

(defn- send-buffered-server-evs>ajax-clients!
"Actually pushes buffered events (as packed-str) to all uid's Ajax conns.
Allows some time for possible Ajax poller reconnects."
[conns_ uid buffered-evs-pstr]
(tracef "send-buffered-server-evs>ajax-clients!: %s" buffered-evs-pstr)
(defn- send-buffered-server-evs>clients!
"Actually pushes buffered events (as packed-str) to all uid's conns.
Allows some time for possible reconnects."
[conn-type conns_ uid buffered-evs-pstr]
(tracef "send-buffered-server-evs>clients!: %s %s" conn-type buffered-evs-pstr)
(have? [:el #{:ajax :ws}] conn-type)

(let [ms-backoffs [90 180 360 720 1440] ; Mean 2790s
;; All connected/possibly-reconnecting client uuids:
client-ids-unsatisfied (keys (get-in @conns_ [:ajax uid]))]
client-ids-unsatisfied (keys (get-in @conns_ [conn-type uid]))
websocket? (= conn-type :ws)]

(when-not (empty? client-ids-unsatisfied)
;; (tracef "client-ids-unsatisfied: %s" client-ids-unsatisfied)
(go-loop [n 0 client-ids-satisfied #{}]
(let [?pulled ; nil or {<client-id> [<?sch> <udt>]}
(swap-in! conns_ [:ajax uid]
(swap-in! conns_ [conn-type uid]
(fn [m] ; {<client-id> [<?sch> <udt>]}
(let [ks-to-pull (remove client-ids-satisfied (keys m))]
;; (tracef "ks-to-pull: %s" ks-to-pull)
Expand All @@ -915,12 +905,12 @@
(swapped
(reduce
(fn [m k]
(let [[?sch udt] (get m k)]
;; Nb don't change udt; for Ajax conns_ we only
;; want udt updated on poll or close, not on
;; activity (as with ws conns_)
(assoc m k [nil udt #_(enc/now-udt)])))

(let [[?sch udt] (get m k)
new-entry
(if websocket?
[?sch (enc/now-udt)]
[nil udt])]
(assoc m k new-entry)))
m ks-to-pull)
(select-keys m ks-to-pull))))))]

Expand All @@ -933,7 +923,7 @@
(let [sent?
(when-let [sch ?sch]
;; Will noop + return false if sch already closed:
(interfaces/sch-send! ?sch (not :websocket)
(interfaces/sch-send! sch websocket?
buffered-evs-pstr))]

(if sent? (conj s client-id) s)))
Expand Down

0 comments on commit 7dba037

Please sign in to comment.