Skip to content

Commit

Permalink
[new] Allow WebSocket constructors to delay connection
Browse files Browse the repository at this point in the history
Previously:
  WebSocket constructors returned a connected WebSocket.

After this commit:
  WebSocket constructors may return:
  - nil     (=> no WebSocket support), or
  - a delay (=> deref to return a connected WebSocket)

In other words, we've decoupled construction and connection.

The major advantage of this change is that it's now possible
for the default `ChAutoSocket` (:auto type chsk) to distinguish
between two kinds of errors:

  1. The platform doesn't support WebSockets (=> fall back to Ajax)
  2. The platform does    support WebSockets, but current
     connection attempts are failing due to transient reasons like
     the internet being temporarily unavailable (=> keep retrying)

Note: the new capability is implemented in such a way that we won't
break pre-existing custom constructors. While constructors should
ideally now return an unconnected delay, they MAY still directly
return an already-connected socket.
  • Loading branch information
ptaoussanis committed Jul 13, 2023
1 parent 6021258 commit e330ef2
Showing 1 changed file with 111 additions and 108 deletions.
219 changes: 111 additions & 108 deletions src/taoensso/sente.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -1278,26 +1278,29 @@
#?(:clj
(defn- make-client-ws-java
[{:as opts :keys [uri-str headers on-error on-message on-close]}]
(let [uri (java.net.URI. uri-str)

;; headers
;; (ImmutableMap/of
;; "Origin" "http://localhost:3200"
;; "Referer" "http://localhost:3200"
;; "Sec-WebSocket-Extensions" "permessage-deflate; client_max_window_bits"
;; )

ws-client
(proxy [WebSocketClient] [^java.net.URI uri ^java.util.Map headers]
(onOpen [^org.java_websocket.handshake.ServerHandshake handshakedata] nil)
(onError [ex] (on-error ex))
(onMessage [^String message] (on-message message))
(onClose [code reason remote] (on-close code reason remote)))]

;; JS client attempts to connect right away at construction time.
;; Java client doesn't need to, but we'll do anyway for consistency.
(.connect ws-client)
(do ws-client))))
(when-let [ws-client
(try
(let [uri (java.net.URI. uri-str)
#_headers
#_
(ImmutableMap/of
"Origin" "http://localhost:3200"
"Referer" "http://localhost:3200"
"Sec-WebSocket-Extensions" "permessage-deflate; client_max_window_bits")]

(proxy [WebSocketClient] [^java.net.URI uri ^java.util.Map headers]
(onOpen [^org.java_websocket.handshake.ServerHandshake handshakedata] nil)
(onError [ex] (on-error ex))
(onMessage [^String message] (on-message message))
(onClose [code reason remote] (on-close code reason remote))))

(catch Throwable t
(timbre/errorf t "Error creating Java WebSocket client")
nil))]

(delay
(.connect ws-client)
(do ws-client)))))

#?(:cljs
(defn- make-client-ws-js
Expand All @@ -1308,19 +1311,21 @@
(enc/oget goog/global "MozWebSocket")
(enc/oget @?node-npm-websocket_ "w3cwebsocket"))]

(let [socket (WebSocket. uri-str)]
(doto socket
(aset "onerror" on-error)
(aset "onmessage" on-message) ; Nb receives both push & cb evs!
;; Fires repeatedly (on each connection attempt) while server is down:
(aset "onclose" on-close))
(delay
(let [socket (WebSocket. uri-str)]
(doto socket
(aset "onerror" on-error)
(aset "onmessage" on-message) ; Nb receives both push & cb evs!
;; Fires repeatedly (on each connection attempt) while server is down:
(aset "onclose" on-close))

(when-let [bt binary-type] ; "arraybuffer" or "blob" (js default)
(aset socket "binaryType" bt))

socket))))
(when-let [bt binary-type] ; "arraybuffer" or "blob" (js default)
(aset socket "binaryType" bt))
socket)))))

(defn- default-client-ws-constructor
"Returns nil if WebSocket client cannot be created, or a delay
that can be derefed to get a connected client."
[{:as opts :keys [on-error on-message on-close uri-str headers]}]
#?(:cljs (make-client-ws-js opts)
:clj (make-client-ws-java opts)))
Expand Down Expand Up @@ -1551,7 +1556,7 @@

(retry-fn))))

?new-socket
?new-socket_
(try
(ws-constructor
(merge ws-opts
Expand All @@ -1567,51 +1572,54 @@
(:csrf-token @state_))}))}))

(catch #?(:clj Throwable :cljs :default) t
(timbre/errorf t "Client WebSocket constructor error")
(timbre/errorf t "Error creating WebSocket client")
nil))]

(if-let [new-socket ?new-socket]
(do
(when-let [[old-s _old-sid]
(reset-in! socket_
[new-socket this-socket-id])]
;; Close old socket if one exists
(timbre/tracef "Old client WebSocket will be closed")
#?(:clj (.close ^WebSocketClient old-s 1000 "CLOSE_NORMAL")
:cljs (.close old-s 1000 "CLOSE_NORMAL")))
new-socket)

;; Couldn't create a socket
(retry-fn)))))]
(when-let [new-socket_ ?new-socket_]
(if-let [new-socket
(try
(force new-socket_)
(catch #?(:clj Throwable :cljs :default) t
(timbre/errorf t "Error realizing WebSocket client")
nil))]
(do
(when-let [[old-s _old-sid] (reset-in! socket_ [new-socket this-socket-id])]
;; Close old socket if one exists
(timbre/tracef "Old client WebSocket will be closed")
#?(:clj (.close ^WebSocketClient old-s 1000 "CLOSE_NORMAL")
:cljs (.close old-s 1000 "CLOSE_NORMAL")))
new-socket)
(retry-fn))))))]

(reset! retry-count_ 0)
(connect-fn)

;; Client-side loop to detect broken conns, Ref. #259
(when-let [ms ws-kalive-ms]
(go-loop []
(let [udt-t0 @udt-last-comms_]
(<! (async/timeout ms))
(when (own-conn?)
(let [udt-t1 @udt-last-comms_]
(when-let [;; No conn send/recv activity w/in kalive window?
no-activity? (= udt-t0 udt-t1)]

(timbre/debugf "Client will send ws-ping to server: %s"
{:ms-since-last-activity (- (enc/now-udt) udt-t1)
:timeout-ms ws-ping-timeout-ms})

(-chsk-send! chsk [:chsk/ws-ping]
{:flush? true
:timeout-ms ws-ping-timeout-ms
:cb ; Server will auto reply
(fn [reply]
(when (and (own-conn?) (not= reply "pong") #_(= reply :chsk/timeout))
(timbre/debugf "Client ws-ping to server timed-out, will cycle WebSocket now")
(-chsk-reconnect! chsk :ws-ping-timeout)))})))
(recur)))))

chsk)))
(when (connect-fn)

;; Client-side loop to detect broken conns, Ref. #259
(when-let [ms ws-kalive-ms]
(go-loop []
(let [udt-t0 @udt-last-comms_]
(<! (async/timeout ms))
(when (own-conn?)
(let [udt-t1 @udt-last-comms_]
(when-let [;; No conn send/recv activity w/in kalive window?
no-activity? (= udt-t0 udt-t1)]

(timbre/debugf "Client will send ws-ping to server: %s"
{:ms-since-last-activity (- (enc/now-udt) udt-t1)
:timeout-ms ws-ping-timeout-ms})

(-chsk-send! chsk [:chsk/ws-ping]
{:flush? true
:timeout-ms ws-ping-timeout-ms
:cb ; Server will auto reply
(fn [reply]
(when (and (own-conn?) (not= reply "pong") #_(= reply :chsk/timeout))
(timbre/debugf "Client ws-ping to server timed-out, will cycle WebSocket now")
(-chsk-reconnect! chsk :ws-ping-timeout)))})))
(recur)))))

chsk))))

(defn- new-ChWebSocket [opts csrf-token]
(map->ChWebSocket
Expand Down Expand Up @@ -1808,19 +1816,11 @@
]

IChSocket
(-chsk-disconnect! [chsk reason]
(when-let [impl @impl_]
(-chsk-disconnect! impl reason)))

;; Possibly reset impl type:
(-chsk-reconnect! [chsk reason]
(when-let [impl @impl_]
(-chsk-disconnect! impl reason)
(-chsk-connect! chsk)))

(-chsk-break-connection! [chsk opts]
(when-let [impl @impl_]
(-chsk-break-connection! impl opts)))
(-chsk-break-connection! [chsk opts] (when-let [impl @impl_] (-chsk-break-connection! impl opts)))
(-chsk-disconnect! [chsk reason] (when-let [impl @impl_] (-chsk-disconnect! impl reason)))
(-chsk-reconnect! [chsk reason]
(-chsk-disconnect! chsk reason)
(-chsk-connect! chsk))

(-chsk-send! [chsk ev opts]
(if-let [impl @impl_]
Expand All @@ -1830,35 +1830,38 @@
(chsk-send->closed! ?cb-fn))))

(-chsk-connect! [chsk]
;; Starting with a simple downgrade-only strategy here as a proof of concept
;; TODO Later consider smarter downgrade or downgrade+upgrade strategies?
;; Currently using a simplistic downgrade-only strategy.
;; TODO Consider smarter strategy that can also upgrade?
(let [ajax-chsk-opts (assoc ajax-chsk-opts :state_ state_)
ws-chsk-opts (assoc ws-chsk-opts :state_ state_)
ws-chsk-opts (assoc ws-chsk-opts :state_ state_)

ajax-conn!
ajax-chsk!
(fn []
;; Remove :auto->:ajax downgrade watch
(remove-watch state_ :chsk/auto-ajax-downgrade)
(-chsk-connect! (new-ChAjaxSocket ajax-chsk-opts (:csrf-token @state_))))
(let [ajax-chsk (new-ChAjaxSocket ajax-chsk-opts (:csrf-token @state_))]
(remove-watch state_ :chsk/auto-ajax-downgrade)
(-chsk-connect! ajax-chsk)))

ws-conn!
ws-chsk!
(fn []
;; Configure :auto->:ajax downgrade watch
(let [downgraded?_ (atom false)]
(let [ws-chsk (new-ChWebSocket ws-chsk-opts (:csrf-token @state_))
downgraded?_ (atom false)]

(add-watch state_ :chsk/auto-ajax-downgrade
(fn [_ _ old-state new-state]
(when-let [impl @impl_]
(when-let [ever-opened?_ (:ever-opened?_ impl)]
(when-not @ever-opened?_
(when (:last-ws-error new-state)
(when (compare-and-set! downgraded?_ false true)
(timbre/warnf "Client permanently downgrading chsk mode: :auto -> :ajax")
(-chsk-disconnect! impl :downgrading-ws-to-ajax)
(reset! impl_ (ajax-conn!))))))))))

(-chsk-connect! (new-ChWebSocket ws-chsk-opts (:csrf-token @state_))))]

(reset! impl_ (or (ws-conn!) (ajax-conn!)))
(enc/when-let [state-changed? (not= old-state new-state)
impl @impl_
ever-opened?_ (:ever-opened?_ impl)
never-opened? (not @ever-opened?_)
ws-error (:last-ws-error new-state)]

(when (compare-and-set! downgraded?_ false true)
(timbre/warnf "Client permanently downgrading chsk mode: :auto -> :ajax")
(-chsk-disconnect! impl :downgrading-ws-to-ajax)
(reset! impl_ (ajax-chsk!))))))

(-chsk-connect! ws-chsk)))]

(reset! impl_ (or (ws-chsk!) (ajax-chsk!)))
chsk))))

#?(:cljs
Expand Down Expand Up @@ -1911,8 +1914,8 @@
; await reply before regarding the connection as broken
:ws-constructor ; Advanced, (fn [{:keys [uri-str headers on-message on-error on-close]}]
; => connected WebSocket, see `default-client-ws-constructor` code for
; details."
; => nil, or delay that can be dereffed to get a connected WebSocket.
; See `default-client-ws-constructor` code for details."

[path ?csrf-token-or-fn &
[{:as opts
Expand Down

0 comments on commit e330ef2

Please sign in to comment.