diff --git a/packages/middleware-websocket/src/websocket-fetch-handler.ts b/packages/middleware-websocket/src/websocket-fetch-handler.ts index df89df13133f..129052d8b901 100644 --- a/packages/middleware-websocket/src/websocket-fetch-handler.ts +++ b/packages/middleware-websocket/src/websocket-fetch-handler.ts @@ -87,7 +87,7 @@ export class WebSocketFetchHandler { * Removes all closing/closed sockets from the socket pool for URL. */ private removeNotUsableSockets(url: string): void { - this.sockets[url] = this.sockets[url].filter( + this.sockets[url] = (this.sockets[url] ?? []).filter( (socket) => ![WebSocket.CLOSING, WebSocket.CLOSED].includes(socket.readyState) ); } @@ -115,39 +115,46 @@ export class WebSocketFetchHandler { // is returned while data keeps streaming. let streamError: Error | undefined = undefined; + // To notify onclose event that error has occurred. + let socketErrorOccurred = false; + + // initialize as no-op. + let reject: (err?: unknown) => void = () => {}; + let resolve: ({ done, value }: { done: boolean; value: Uint8Array }) => void = () => {}; + + socket.onmessage = (event) => { + resolve({ + done: false, + value: new Uint8Array(event.data), + }); + }; + + socket.onerror = (error) => { + socketErrorOccurred = true; + socket.close(); + reject(error); + }; + + socket.onclose = () => { + this.removeNotUsableSockets(socket.url); + if (socketErrorOccurred) return; + + if (streamError) { + reject(streamError); + } else { + resolve({ + done: true, + value: undefined as any, // unchecked because done=true. + }); + } + }; + const outputStream: AsyncIterable = { [Symbol.asyncIterator]: () => ({ next: () => { - return new Promise((resolve, reject) => { - // To notify onclose event that error has occurred - let socketErrorOccurred = false; - - socket.onerror = (error) => { - socketErrorOccurred = true; - socket.close(); - reject(error); - }; - - socket.onclose = () => { - this.removeNotUsableSockets(socket.url); - if (socketErrorOccurred) return; - - if (streamError) { - reject(streamError); - } else { - resolve({ - done: true, - value: undefined, - }); - } - }; - - socket.onmessage = (event) => { - resolve({ - done: false, - value: new Uint8Array(event.data), - }); - }; + return new Promise((_resolve, _reject) => { + resolve = _resolve; + reject = _reject; }); }, }),