diff --git a/packages/discord.js/package.json b/packages/discord.js/package.json index e9f87d209fec1..d5dcf1b5a95b1 100644 --- a/packages/discord.js/package.json +++ b/packages/discord.js/package.json @@ -54,6 +54,7 @@ "@discordjs/formatters": "workspace:^", "@discordjs/rest": "workspace:^", "@discordjs/util": "workspace:^", + "@discordjs/ws": "workspace:^", "@sapphire/snowflake": "^3.4.0", "@types/ws": "^8.5.4", "discord-api-types": "^0.37.28", diff --git a/packages/discord.js/src/client/websocket/WebSocketManager.js b/packages/discord.js/src/client/websocket/WebSocketManager.js index f96cdf12e5aff..81de47fccb716 100644 --- a/packages/discord.js/src/client/websocket/WebSocketManager.js +++ b/packages/discord.js/src/client/websocket/WebSocketManager.js @@ -2,9 +2,13 @@ const EventEmitter = require('node:events'); const { setImmediate } = require('node:timers'); -const { setTimeout: sleep } = require('node:timers/promises'); const { Collection } = require('@discordjs/collection'); -const { GatewayCloseCodes, GatewayDispatchEvents, Routes } = require('discord-api-types/v10'); +const { + WebSocketManager: WSWebSocketManager, + WebSocketShardEvents: WSWebSocketShardEvents, + CloseCodes, +} = require('@discordjs/ws'); +const { GatewayCloseCodes, GatewayDispatchEvents } = require('discord-api-types/v10'); const WebSocketShard = require('./WebSocketShard'); const PacketHandlers = require('./handlers'); const { DiscordjsError, ErrorCodes } = require('../../errors'); @@ -22,16 +26,6 @@ const BeforeReadyWhitelist = [ GatewayDispatchEvents.GuildMemberRemove, ]; -const unrecoverableErrorCodeMap = { - [GatewayCloseCodes.AuthenticationFailed]: ErrorCodes.TokenInvalid, - [GatewayCloseCodes.InvalidShard]: ErrorCodes.ShardingInvalid, - [GatewayCloseCodes.ShardingRequired]: ErrorCodes.ShardingRequired, - [GatewayCloseCodes.InvalidIntents]: ErrorCodes.InvalidIntents, - [GatewayCloseCodes.DisallowedIntents]: ErrorCodes.DisallowedIntents, -}; - -const UNRESUMABLE_CLOSE_CODES = [1000, GatewayCloseCodes.AlreadyAuthenticated, GatewayCloseCodes.InvalidSeq]; - /** * The WebSocket manager for this client. * This class forwards raw dispatch events, @@ -56,27 +50,12 @@ class WebSocketManager extends EventEmitter { */ this.gateway = null; - /** - * The amount of shards this manager handles - * @private - * @type {number} - */ - this.totalShards = this.client.options.shards.length; - /** * A collection of all shards this manager handles * @type {Collection} */ this.shards = new Collection(); - /** - * An array of shards to be connected or that need to reconnect - * @type {Set} - * @private - * @name WebSocketManager#shardQueue - */ - Object.defineProperty(this, 'shardQueue', { value: new Set(), writable: true }); - /** * An array of queued events before this WebSocketManager became ready * @type {Object[]} @@ -98,12 +77,7 @@ class WebSocketManager extends EventEmitter { */ this.destroyed = false; - /** - * If this manager is currently reconnecting one or multiple shards - * @type {boolean} - * @private - */ - this.reconnecting = false; + this._ws = null; } /** @@ -119,11 +93,11 @@ class WebSocketManager extends EventEmitter { /** * Emits a debug message. * @param {string} message The debug message - * @param {?WebSocketShard} [shard] The shard that emitted this message, if any + * @param {?number} [shardId] The id of the shard that emitted this message, if any * @private */ - debug(message, shard) { - this.client.emit(Events.Debug, `[WS => ${shard ? `Shard ${shard.id}` : 'Manager'}] ${message}`); + debug(message, shardId) { + this.client.emit(Events.Debug, `[WS => ${shardId ? `Shard ${shardId}` : 'Manager'}] ${message}`); } /** @@ -132,11 +106,30 @@ class WebSocketManager extends EventEmitter { */ async connect() { const invalidToken = new DiscordjsError(ErrorCodes.TokenInvalid); + const { shards, shardCount, intents, ws } = this.client.options; + if (this._ws && this._ws.options.token !== this.client.token) { + await this._ws.destroy({ code: 1000, reason: 'Login with differing token requested' }); + this._ws = null; + } + if (!this._ws) { + this._ws = new WSWebSocketManager({ + intents: intents.bitfield, + rest: this.client.rest, + token: this.client.token, + largeThreshold: ws.large_threshold, + version: ws.version, + shardIds: shards === 'auto' ? null : shards, + shardCount: shards === 'auto' ? null : shardCount, + initialPresence: ws.presence, + }); + this.attachEvents(); + } + const { url: gatewayURL, shards: recommendedShards, session_start_limit: sessionStartLimit, - } = await this.client.rest.get(Routes.gatewayBot()).catch(error => { + } = await this._ws.fetchGatewayInformation().catch(error => { throw error.status === 401 ? invalidToken : error; }); @@ -152,156 +145,95 @@ class WebSocketManager extends EventEmitter { this.gateway = `${gatewayURL}/`; - let { shards } = this.client.options; - - if (shards === 'auto') { - this.debug(`Using the recommended shard count provided by Discord: ${recommendedShards}`); - this.totalShards = this.client.options.shardCount = recommendedShards; - shards = this.client.options.shards = Array.from({ length: recommendedShards }, (_, i) => i); - } - - this.totalShards = shards.length; - this.debug(`Spawning shards: ${shards.join(', ')}`); - this.shardQueue = new Set(shards.map(id => new WebSocketShard(this, id))); - - return this.createShards(); - } - - /** - * Handles the creation of a shard. - * @returns {Promise} - * @private - */ - async createShards() { - // If we don't have any shards to handle, return - if (!this.shardQueue.size) return false; - - const [shard] = this.shardQueue; - - this.shardQueue.delete(shard); + await this._ws.connect(); - if (!shard.eventsAttached) { - shard.on(WebSocketShardEvents.AllReady, unavailableGuilds => { - /** - * Emitted when a shard turns ready. - * @event Client#shardReady - * @param {number} id The shard id that turned ready - * @param {?Set} unavailableGuilds Set of unavailable guild ids, if any - */ - this.client.emit(Events.ShardReady, shard.id, unavailableGuilds); - - if (!this.shardQueue.size) this.reconnecting = false; - this.checkShardsReady(); - }); + this.totalShards = this.client.options.shardCount = await this._ws.getShardCount(); + this.client.options.shards = await this._ws.getShardIds(); + for (const id of this.client.options.shards) { + if (!this.shards.has(id)) { + const shard = new WebSocketShard(this, id); + this.shards.set(id, shard); - shard.on(WebSocketShardEvents.Close, event => { - if (event.code === 1_000 ? this.destroyed : event.code in unrecoverableErrorCodeMap) { + shard.on(WebSocketShardEvents.AllReady, unavailableGuilds => { /** - * Emitted when a shard's WebSocket disconnects and will no longer reconnect. - * @event Client#shardDisconnect - * @param {CloseEvent} event The WebSocket close event - * @param {number} id The shard id that disconnected + * Emitted when a shard turns ready. + * @event Client#shardReady + * @param {number} id The shard id that turned ready + * @param {?Set} unavailableGuilds Set of unavailable guild ids, if any */ - this.client.emit(Events.ShardDisconnect, event, shard.id); - this.debug(GatewayCloseCodes[event.code], shard); - return; - } + this.client.emit(Events.ShardReady, shard.id, unavailableGuilds); - if (UNRESUMABLE_CLOSE_CODES.includes(event.code)) { - // These event codes cannot be resumed - shard.sessionId = null; - } - - /** - * Emitted when a shard is attempting to reconnect or re-identify. - * @event Client#shardReconnecting - * @param {number} id The shard id that is attempting to reconnect - */ - this.client.emit(Events.ShardReconnecting, shard.id); - - this.shardQueue.add(shard); - - if (shard.sessionId) this.debug(`Session id is present, attempting an immediate reconnect...`, shard); - this.reconnect(); - }); - - shard.on(WebSocketShardEvents.InvalidSession, () => { - this.client.emit(Events.ShardReconnecting, shard.id); - }); - - shard.on(WebSocketShardEvents.Destroyed, () => { - this.debug('Shard was destroyed but no WebSocket connection was present! Reconnecting...', shard); - - this.client.emit(Events.ShardReconnecting, shard.id); - - this.shardQueue.add(shard); - this.reconnect(); - }); - - shard.eventsAttached = true; - } - - this.shards.set(shard.id, shard); - - try { - await shard.connect(); - } catch (error) { - if (error?.code && error.code in unrecoverableErrorCodeMap) { - throw new DiscordjsError(unrecoverableErrorCodeMap[error.code]); - // Undefined if session is invalid, error event for regular closes - } else if (!error || error.code) { - this.debug('Failed to connect to the gateway, requeueing...', shard); - this.shardQueue.add(shard); - } else { - throw error; + this.checkShardsReady(); + }); } } - // If we have more shards, add a 5s delay - if (this.shardQueue.size) { - this.debug(`Shard Queue Size: ${this.shardQueue.size}; continuing in 5 seconds...`); - await sleep(5_000); - return this.createShards(); - } - - return true; } /** - * Handles reconnects for this manager. + * Attaches event handlers to the internal WebSocketShardManager from `@discordjs/ws`. * @private - * @returns {Promise} */ - async reconnect() { - if (this.reconnecting || this.status !== Status.Ready) return false; - this.reconnecting = true; - try { - await this.createShards(); - } catch (error) { - this.debug(`Couldn't reconnect or fetch information about the gateway. ${error}`); - if (error.httpStatus !== 401) { - this.debug(`Possible network error occurred. Retrying in 5s...`); - await sleep(5_000); - this.reconnecting = false; - return this.reconnect(); + attachEvents() { + this._ws.on(WSWebSocketShardEvents.Debug, ({ message, shardId }) => this.debug(message, shardId)); + this._ws.on(WSWebSocketShardEvents.Dispatch, ({ packet, shardId }) => { + this.client.emit(Events.Raw, packet, shardId); + const shard = this.shards.get(shardId); + this.handlePacket(packet, shard); + if (shard.status === Status.WaitingForGuilds && packet.t === GatewayDispatchEvents.GuildCreate) { + shard.onGuildPacket(packet); } - // If we get an error at this point, it means we cannot reconnect anymore - if (this.client.listenerCount(Events.Invalidated)) { + }); + + this._ws.on(WSWebSocketShardEvents.Ready, ({ data, shardId }) => { + this.shards.get(shardId).onReadyPacket(data); + }); + + this._ws.on(WSWebSocketShardEvents.Closed, ({ code, reason = '', shardId }) => { + this.shards.get(shardId).status = code === CloseCodes.Resuming ? Status.Resuming : Status.Disconnected; + if (code === CloseCodes.Normal && this.destroyed) { /** - * Emitted when the client's session becomes invalidated. - * You are expected to handle closing the process gracefully and preventing a boot loop - * if you are listening to this event. - * @event Client#invalidated + * Emitted when a shard's WebSocket disconnects and will no longer reconnect. + * @event Client#shardDisconnect + * @param {CloseEvent} event The WebSocket close event + * @param {number} id The shard id that disconnected */ - this.client.emit(Events.Invalidated); - // Destroy just the shards. This means you have to handle the cleanup yourself - this.destroy(); - } else { - this.client.destroy(); + this.client.emit(Events.ShardDisconnect, { code, reason, wasClean: true }, shardId); + this.debug(GatewayCloseCodes[code], shardId); + return; } - } finally { - this.reconnecting = false; - } - return true; + + /** + * Emitted when a shard is attempting to reconnect or re-identify. + * @event Client#shardReconnecting + * @param {number} id The shard id that is attempting to reconnect + */ + this.client.emit(Events.ShardReconnecting, shardId); + }); + + this._ws.on(WSWebSocketShardEvents.Resumed, ({ shardId }) => { + /** + * Emitted when the shard resumes successfully + * @event WebSocketShard#resumed + */ + this.shards.get(shardId).emit(WebSocketShardEvents.Resumed); + }); + + this._ws.on(WSWebSocketShardEvents.HeartbeatComplete, ({ heartbeatAt, latency, shardId }) => { + const shard = this.shards.get(shardId); + shard.lastPingTimestamp = heartbeatAt; + shard.ping = latency; + }); + + // TODO: refactor once error event gets exposed publicly + this._ws.on('error', ({ err, shardId }) => { + /** + * Emitted whenever a shard's WebSocket encounters a connection error. + * @event Client#shardError + * @param {Error} error The encountered error + * @param {number} shardId The shard that encountered this error + */ + this.client.emit(Events.ShardError, err, shardId); + }); } /** @@ -310,7 +242,7 @@ class WebSocketManager extends EventEmitter { * @private */ broadcast(packet) { - for (const shard of this.shards.values()) shard.send(packet); + for (const shardId of this.shards.keys()) this._ws.send(shardId, packet); } /** @@ -322,8 +254,7 @@ class WebSocketManager extends EventEmitter { // TODO: Make a util for getting a stack this.debug(`Manager was destroyed. Called by:\n${new Error().stack}`); this.destroyed = true; - this.shardQueue.clear(); - for (const shard of this.shards.values()) shard.destroy({ closeCode: 1_000, reset: true, emit: false, log: false }); + this._ws.destroy({ code: 1_000 }); } /** diff --git a/packages/discord.js/src/client/websocket/WebSocketShard.js b/packages/discord.js/src/client/websocket/WebSocketShard.js index c41b656360a7c..7ffee867a431e 100644 --- a/packages/discord.js/src/client/websocket/WebSocketShard.js +++ b/packages/discord.js/src/client/websocket/WebSocketShard.js @@ -1,22 +1,12 @@ 'use strict'; const EventEmitter = require('node:events'); -const { setTimeout, setInterval, clearTimeout, clearInterval } = require('node:timers'); -const { GatewayDispatchEvents, GatewayIntentBits, GatewayOpcodes } = require('discord-api-types/v10'); -const WebSocket = require('../../WebSocket'); -const Events = require('../../util/Events'); +const process = require('node:process'); +const { setTimeout, clearTimeout } = require('node:timers'); +const { GatewayIntentBits } = require('discord-api-types/v10'); const Status = require('../../util/Status'); const WebSocketShardEvents = require('../../util/WebSocketShardEvents'); -const STATUS_KEYS = Object.keys(Status); -const CONNECTION_STATE = Object.keys(WebSocket.WebSocket); - -let zlib; - -try { - zlib = require('zlib-sync'); -} catch {} // eslint-disable-line no-empty - /** * Represents a Shard's WebSocket connection * @extends {EventEmitter} @@ -43,34 +33,6 @@ class WebSocketShard extends EventEmitter { */ this.status = Status.Idle; - /** - * The current sequence of the shard - * @type {number} - * @private - */ - this.sequence = -1; - - /** - * The sequence of the shard after close - * @type {number} - * @private - */ - this.closeSequence = 0; - - /** - * The current session id of the shard - * @type {?string} - * @private - */ - this.sessionId = null; - - /** - * The resume url for this shard - * @type {?string} - * @private - */ - this.resumeURL = null; - /** * The previous heartbeat ping of the shard * @type {number} @@ -83,81 +45,6 @@ class WebSocketShard extends EventEmitter { */ this.lastPingTimestamp = -1; - /** - * If we received a heartbeat ack back. Used to identify zombie connections - * @type {boolean} - * @private - */ - this.lastHeartbeatAcked = true; - - /** - * Used to prevent calling {@link WebSocketShard#event:close} twice while closing or terminating the WebSocket. - * @type {boolean} - * @private - */ - this.closeEmitted = false; - - /** - * Contains the rate limit queue and metadata - * @name WebSocketShard#ratelimit - * @type {Object} - * @private - */ - Object.defineProperty(this, 'ratelimit', { - value: { - queue: [], - total: 120, - remaining: 120, - time: 60e3, - timer: null, - }, - }); - - /** - * The WebSocket connection for the current shard - * @name WebSocketShard#connection - * @type {?WebSocket} - * @private - */ - Object.defineProperty(this, 'connection', { value: null, writable: true }); - - /** - * @external Inflate - * @see {@link https://www.npmjs.com/package/zlib-sync} - */ - - /** - * The compression to use - * @name WebSocketShard#inflate - * @type {?Inflate} - * @private - */ - Object.defineProperty(this, 'inflate', { value: null, writable: true }); - - /** - * The HELLO timeout - * @name WebSocketShard#helloTimeout - * @type {?NodeJS.Timeout} - * @private - */ - Object.defineProperty(this, 'helloTimeout', { value: null, writable: true }); - - /** - * The WebSocket timeout. - * @name WebSocketShard#wsCloseTimeout - * @type {?NodeJS.Timeout} - * @private - */ - Object.defineProperty(this, 'wsCloseTimeout', { value: null, writable: true }); - - /** - * If the manager attached its event handlers on the shard - * @name WebSocketShard#eventsAttached - * @type {boolean} - * @private - */ - Object.defineProperty(this, 'eventsAttached', { value: false, writable: true }); - /** * A set of guild ids this shard expects to receive * @name WebSocketShard#expectedGuilds @@ -173,14 +60,6 @@ class WebSocketShard extends EventEmitter { * @private */ Object.defineProperty(this, 'readyTimeout', { value: null, writable: true }); - - /** - * Time when the WebSocket connection was opened - * @name WebSocketShard#connectedAt - * @type {number} - * @private - */ - Object.defineProperty(this, 'connectedAt', { value: 0, writable: true }); } /** @@ -192,160 +71,6 @@ class WebSocketShard extends EventEmitter { this.manager.debug(message, this); } - /** - * Connects the shard to the gateway. - * @private - * @returns {Promise} A promise that will resolve if the shard turns ready successfully, - * or reject if we couldn't connect - */ - connect() { - const { client } = this.manager; - - if (this.connection?.readyState === WebSocket.OPEN && this.status === Status.Ready) { - return Promise.resolve(); - } - - const gateway = this.resumeURL ?? this.manager.gateway; - - return new Promise((resolve, reject) => { - const cleanup = () => { - this.removeListener(WebSocketShardEvents.Close, onClose); - this.removeListener(WebSocketShardEvents.Ready, onReady); - this.removeListener(WebSocketShardEvents.Resumed, onResumed); - this.removeListener(WebSocketShardEvents.InvalidSession, onInvalidOrDestroyed); - this.removeListener(WebSocketShardEvents.Destroyed, onInvalidOrDestroyed); - }; - - const onReady = () => { - cleanup(); - resolve(); - }; - - const onResumed = () => { - cleanup(); - resolve(); - }; - - const onClose = event => { - cleanup(); - reject(event); - }; - - const onInvalidOrDestroyed = () => { - cleanup(); - // eslint-disable-next-line prefer-promise-reject-errors - reject(); - }; - - this.once(WebSocketShardEvents.Ready, onReady); - this.once(WebSocketShardEvents.Resumed, onResumed); - this.once(WebSocketShardEvents.Close, onClose); - this.once(WebSocketShardEvents.InvalidSession, onInvalidOrDestroyed); - this.once(WebSocketShardEvents.Destroyed, onInvalidOrDestroyed); - - if (this.connection?.readyState === WebSocket.OPEN) { - this.debug('An open connection was found, attempting an immediate identify.'); - this.identify(); - return; - } - - if (this.connection) { - this.debug(`A connection object was found. Cleaning up before continuing. - State: ${CONNECTION_STATE[this.connection.readyState]}`); - this.destroy({ emit: false }); - } - - const wsQuery = { v: client.options.ws.version }; - - if (zlib) { - this.inflate = new zlib.Inflate({ - chunkSize: 65535, - flush: zlib.Z_SYNC_FLUSH, - to: WebSocket.encoding === 'json' ? 'string' : '', - }); - wsQuery.compress = 'zlib-stream'; - } - - this.debug( - `[CONNECT] - Gateway : ${gateway} - Version : ${client.options.ws.version} - Encoding : ${WebSocket.encoding} - Compression: ${zlib ? 'zlib-stream' : 'none'}`, - ); - - this.status = this.status === Status.Disconnected ? Status.Reconnecting : Status.Connecting; - this.setHelloTimeout(); - - this.connectedAt = Date.now(); - - // Adding a handshake timeout to just make sure no zombie connection appears. - const ws = (this.connection = WebSocket.create(gateway, wsQuery, { handshakeTimeout: 30_000 })); - ws.onopen = this.onOpen.bind(this); - ws.onmessage = this.onMessage.bind(this); - ws.onerror = this.onError.bind(this); - ws.onclose = this.onClose.bind(this); - }); - } - - /** - * Called whenever a connection is opened to the gateway. - * @private - */ - onOpen() { - this.debug(`[CONNECTED] Took ${Date.now() - this.connectedAt}ms`); - this.status = Status.Nearly; - } - - /** - * Called whenever a message is received. - * @param {MessageEvent} event Event received - * @private - */ - onMessage({ data }) { - let raw; - if (data instanceof ArrayBuffer) data = new Uint8Array(data); - if (zlib) { - const l = data.length; - const flush = - l >= 4 && data[l - 4] === 0x00 && data[l - 3] === 0x00 && data[l - 2] === 0xff && data[l - 1] === 0xff; - - this.inflate.push(data, flush && zlib.Z_SYNC_FLUSH); - if (!flush) return; - raw = this.inflate.result; - } else { - raw = data; - } - let packet; - try { - packet = WebSocket.unpack(raw); - } catch (err) { - this.manager.client.emit(Events.ShardError, err, this.id); - return; - } - this.manager.client.emit(Events.Raw, packet, this.id); - if (packet.op === GatewayOpcodes.Dispatch) this.manager.emit(packet.t, packet.d, this.id); - this.onPacket(packet); - } - - /** - * Called whenever an error occurs with the WebSocket. - * @param {ErrorEvent} event The error that occurred - * @private - */ - onError(event) { - const error = event?.error ?? event; - if (!error) return; - - /** - * Emitted whenever a shard's WebSocket encounters a connection error. - * @event Client#shardError - * @param {Error} error The encountered error - * @param {number} shardId The shard that encountered this error - */ - this.manager.client.emit(Events.ShardError, error, this.id); - } - /** * @external CloseEvent * @see {@link https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent} @@ -361,33 +86,11 @@ class WebSocketShard extends EventEmitter { * @see {@link https://developer.mozilla.org/en-US/docs/Web/API/MessageEvent} */ - /** - * Called whenever a connection to the gateway is closed. - * @param {CloseEvent} event Close event that was received - * @private - */ - onClose(event) { - this.closeEmitted = true; - if (this.sequence !== -1) this.closeSequence = this.sequence; - this.sequence = -1; - this.setHeartbeatTimer(-1); - this.setHelloTimeout(-1); - // Clearing the WebSocket close timeout as close was emitted. - this.setWsCloseTimeout(-1); - // If we still have a connection object, clean up its listeners - if (this.connection) { - this._cleanupConnection(); - // Having this after _cleanupConnection to just clean up the connection and not listen to ws.onclose - this.destroy({ reset: !this.sessionId, emit: false, log: false }); - } - this.status = Status.Disconnected; - this.emitClose(event); - } - /** * This method is responsible to emit close event for this shard. * This method helps the shard reconnect. * @param {CloseEvent} [event] Close event that was received + * @deprecated */ emitClose( event = { @@ -410,93 +113,35 @@ class WebSocketShard extends EventEmitter { } /** - * Called whenever a packet is received. + * Called when the shard receives the READY payload. * @param {Object} packet The received packet * @private */ - onPacket(packet) { + onReadyPacket(packet) { if (!packet) { this.debug(`Received broken packet: '${packet}'.`); return; } - switch (packet.t) { - case GatewayDispatchEvents.Ready: - /** - * Emitted when the shard receives the READY payload and is now waiting for guilds - * @event WebSocketShard#ready - */ - this.emit(WebSocketShardEvents.Ready); - - this.sessionId = packet.d.session_id; - this.resumeURL = packet.d.resume_gateway_url; - this.expectedGuilds = new Set(packet.d.guilds.map(d => d.id)); - this.status = Status.WaitingForGuilds; - this.debug(`[READY] Session ${this.sessionId} | Resume url ${this.resumeURL}.`); - this.lastHeartbeatAcked = true; - this.sendHeartbeat('ReadyHeartbeat'); - break; - case GatewayDispatchEvents.Resumed: { - /** - * Emitted when the shard resumes successfully - * @event WebSocketShard#resumed - */ - this.emit(WebSocketShardEvents.Resumed); - - this.status = Status.Ready; - const replayed = packet.s - this.closeSequence; - this.debug(`[RESUMED] Session ${this.sessionId} | Replayed ${replayed} events.`); - this.lastHeartbeatAcked = true; - this.sendHeartbeat('ResumeHeartbeat'); - break; - } - } + /** + * Emitted when the shard receives the READY payload and is now waiting for guilds + * @event WebSocketShard#ready + */ + this.emit(WebSocketShardEvents.Ready); - if (packet.s > this.sequence) this.sequence = packet.s; + this.expectedGuilds = new Set(packet.d.guilds.map(d => d.id)); + this.status = Status.WaitingForGuilds; + } - switch (packet.op) { - case GatewayOpcodes.Hello: - this.setHelloTimeout(-1); - this.setHeartbeatTimer(packet.d.heartbeat_interval); - this.identify(); - break; - case GatewayOpcodes.Reconnect: - this.debug('[RECONNECT] Discord asked us to reconnect'); - this.destroy({ closeCode: 4_000 }); - break; - case GatewayOpcodes.InvalidSession: - this.debug(`[INVALID SESSION] Resumable: ${packet.d}.`); - // If we can resume the session, do so immediately - if (packet.d) { - this.identifyResume(); - return; - } - // Reset the sequence - this.sequence = -1; - // Reset the session id as it's invalid - this.sessionId = null; - // Set the status to reconnecting - this.status = Status.Reconnecting; - // Finally, emit the INVALID_SESSION event - /** - * Emitted when the session has been invalidated. - * @event WebSocketShard#invalidSession - */ - this.emit(WebSocketShardEvents.InvalidSession); - break; - case GatewayOpcodes.HeartbeatAck: - this.ackHeartbeat(); - break; - case GatewayOpcodes.Heartbeat: - this.sendHeartbeat('HeartbeatRequest', true); - break; - default: - this.manager.handlePacket(packet, this); - if (this.status === Status.WaitingForGuilds && packet.t === GatewayDispatchEvents.GuildCreate) { - this.expectedGuilds.delete(packet.d.id); - this.checkReady(); - } - } + /** + * Called when a GuildCreate for this shard was sent after READY payload was received, + * but before we emitted the READY event. + * @param {Snowflake} guildId the id of the Guild sent in the payload + * @private + */ + gotGuild(guildId) { + this.expectedGuilds.delete(guildId); + this.checkReady(); } /** @@ -552,190 +197,6 @@ class WebSocketShard extends EventEmitter { ).unref(); } - /** - * Sets the HELLO packet timeout. - * @param {number} [time] If set to -1, it will clear the hello timeout - * @private - */ - setHelloTimeout(time) { - if (time === -1) { - if (this.helloTimeout) { - this.debug('Clearing the HELLO timeout.'); - clearTimeout(this.helloTimeout); - this.helloTimeout = null; - } - return; - } - this.debug('Setting a HELLO timeout for 20s.'); - this.helloTimeout = setTimeout(() => { - this.debug('Did not receive HELLO in time. Destroying and connecting again.'); - this.destroy({ reset: true, closeCode: 4009 }); - }, 20_000).unref(); - } - - /** - * Sets the WebSocket Close timeout. - * This method is responsible for detecting any zombie connections if the WebSocket fails to close properly. - * @param {number} [time] If set to -1, it will clear the timeout - * @private - */ - setWsCloseTimeout(time) { - if (this.wsCloseTimeout) { - this.debug('[WebSocket] Clearing the close timeout.'); - clearTimeout(this.wsCloseTimeout); - } - if (time === -1) { - this.wsCloseTimeout = null; - return; - } - this.wsCloseTimeout = setTimeout(() => { - this.setWsCloseTimeout(-1); - this.debug(`[WebSocket] Close Emitted: ${this.closeEmitted}`); - // Check if close event was emitted. - if (this.closeEmitted) { - this.debug( - `[WebSocket] was closed. | WS State: ${CONNECTION_STATE[this.connection?.readyState ?? WebSocket.CLOSED]}`, - ); - // Setting the variable false to check for zombie connections. - this.closeEmitted = false; - return; - } - - this.debug( - `[WebSocket] did not close properly, assuming a zombie connection.\nEmitting close and reconnecting again.`, - ); - - // Cleanup connection listeners - if (this.connection) this._cleanupConnection(); - - this.emitClose({ - code: 4009, - reason: 'Session time out.', - wasClean: false, - }); - }, time); - } - - /** - * Sets the heartbeat timer for this shard. - * @param {number} time If -1, clears the interval, any other number sets an interval - * @private - */ - setHeartbeatTimer(time) { - if (time === -1) { - if (this.heartbeatInterval) { - this.debug('Clearing the heartbeat interval.'); - clearInterval(this.heartbeatInterval); - this.heartbeatInterval = null; - } - return; - } - this.debug(`Setting a heartbeat interval for ${time}ms.`); - // Sanity checks - if (this.heartbeatInterval) clearInterval(this.heartbeatInterval); - this.heartbeatInterval = setInterval(() => this.sendHeartbeat(), time).unref(); - } - - /** - * Sends a heartbeat to the WebSocket. - * If this shard didn't receive a heartbeat last time, it will destroy it and reconnect - * @param {string} [tag='HeartbeatTimer'] What caused this heartbeat to be sent - * @param {boolean} [ignoreHeartbeatAck] If we should send the heartbeat forcefully. - * @private - */ - sendHeartbeat( - tag = 'HeartbeatTimer', - ignoreHeartbeatAck = [Status.WaitingForGuilds, Status.Identifying, Status.Resuming].includes(this.status), - ) { - if (ignoreHeartbeatAck && !this.lastHeartbeatAcked) { - this.debug(`[${tag}] Didn't process heartbeat ack yet but we are still connected. Sending one now.`); - } else if (!this.lastHeartbeatAcked) { - this.debug( - `[${tag}] Didn't receive a heartbeat ack last time, assuming zombie connection. Destroying and reconnecting. - Status : ${STATUS_KEYS[this.status]} - Sequence : ${this.sequence} - Connection State: ${this.connection ? CONNECTION_STATE[this.connection.readyState] : 'No Connection??'}`, - ); - - this.destroy({ reset: true, closeCode: 4009 }); - return; - } - - this.debug(`[${tag}] Sending a heartbeat.`); - this.lastHeartbeatAcked = false; - this.lastPingTimestamp = Date.now(); - this.send({ op: GatewayOpcodes.Heartbeat, d: this.sequence }, true); - } - - /** - * Acknowledges a heartbeat. - * @private - */ - ackHeartbeat() { - this.lastHeartbeatAcked = true; - const latency = Date.now() - this.lastPingTimestamp; - this.debug(`Heartbeat acknowledged, latency of ${latency}ms.`); - this.ping = latency; - } - - /** - * Identifies the client on the connection. - * @private - * @returns {void} - */ - identify() { - return this.sessionId ? this.identifyResume() : this.identifyNew(); - } - - /** - * Identifies as a new connection on the gateway. - * @private - */ - identifyNew() { - const { client } = this.manager; - if (!client.token) { - this.debug('[IDENTIFY] No token available to identify a new session.'); - return; - } - - this.status = Status.Identifying; - - // Clone the identify payload and assign the token and shard info - const d = { - ...client.options.ws, - intents: client.options.intents.bitfield, - token: client.token, - shard: [this.id, Number(client.options.shardCount)], - }; - - this.debug(`[IDENTIFY] Shard ${this.id}/${client.options.shardCount} with intents: ${d.intents}`); - this.send({ op: GatewayOpcodes.Identify, d }, true); - } - - /** - * Resumes a session on the gateway. - * @private - */ - identifyResume() { - if (!this.sessionId) { - this.debug('[RESUME] No session id was present; identifying as a new session.'); - this.identifyNew(); - return; - } - - this.status = Status.Resuming; - - this.debug(`[RESUME] Session ${this.sessionId}, sequence ${this.closeSequence}`); - - const d = { - token: this.manager.client.token, - session_id: this.sessionId, - seq: this.closeSequence, - }; - - this.send({ op: GatewayOpcodes.Resume, d }, true); - } - /** * Adds a packet to the queue to be sent to the gateway. * If you use this method, make sure you understand that you need to provide @@ -743,161 +204,17 @@ class WebSocketShard extends EventEmitter { * Do not use this method if you don't know what you're doing. * @param {Object} data The full packet to send * @param {boolean} [important=false] If this packet should be added first in queue + * This parameter is **deprecated**. Important payloads are determined by their opcode instead. */ send(data, important = false) { - this.ratelimit.queue[important ? 'unshift' : 'push'](data); - this.processQueue(); - } - - /** - * Sends data, bypassing the queue. - * @param {Object} data Packet to send - * @returns {void} - * @private - */ - _send(data) { - if (this.connection?.readyState !== WebSocket.OPEN) { - this.debug( - `Tried to send packet '${JSON.stringify(data).replaceAll( - this.manager.client.token, - this.manager.client._censoredToken, - )}' but no WebSocket is available!`, + if (important) { + process.emitWarning( + 'Sending important payloads explicitly is deprecated. They are determined by their opcode implicitly now.', + 'DeprecationWarning', ); - this.destroy({ closeCode: 4_000 }); - return; - } - - this.connection.send(WebSocket.pack(data), err => { - if (err) this.manager.client.emit(Events.ShardError, err, this.id); - }); - } - - /** - * Processes the current WebSocket queue. - * @returns {void} - * @private - */ - processQueue() { - if (this.ratelimit.remaining === 0) return; - if (this.ratelimit.queue.length === 0) return; - if (this.ratelimit.remaining === this.ratelimit.total) { - this.ratelimit.timer = setTimeout(() => { - this.ratelimit.remaining = this.ratelimit.total; - this.processQueue(); - }, this.ratelimit.time).unref(); - } - while (this.ratelimit.remaining > 0) { - const item = this.ratelimit.queue.shift(); - if (!item) return; - this._send(item); - this.ratelimit.remaining--; - } - } - - /** - * Destroys this shard and closes its WebSocket connection. - * @param {Object} [options={ closeCode: 1000, reset: false, emit: true, log: true }] Options for destroying the shard - * @private - */ - destroy({ closeCode = 1_000, reset = false, emit = true, log = true } = {}) { - if (log) { - this.debug(`[DESTROY] - Close Code : ${closeCode} - Reset : ${reset} - Emit DESTROYED: ${emit}`); - } - - // Step 0: Remove all timers - this.setHeartbeatTimer(-1); - this.setHelloTimeout(-1); - - this.debug( - `[WebSocket] Destroy: Attempting to close the WebSocket. | WS State: ${ - CONNECTION_STATE[this.connection?.readyState ?? WebSocket.CLOSED] - }`, - ); - // Step 1: Close the WebSocket connection, if any, otherwise, emit DESTROYED - if (this.connection) { - // If the connection is currently opened, we will (hopefully) receive close - if (this.connection.readyState === WebSocket.OPEN) { - this.connection.close(closeCode); - this.debug(`[WebSocket] Close: Tried closing. | WS State: ${CONNECTION_STATE[this.connection.readyState]}`); - } else { - // Connection is not OPEN - this.debug(`WS State: ${CONNECTION_STATE[this.connection.readyState]}`); - // Remove listeners from the connection - this._cleanupConnection(); - // Attempt to close the connection just in case - try { - this.connection.close(closeCode); - } catch (err) { - this.debug( - `[WebSocket] Close: Something went wrong while closing the WebSocket: ${ - err.message || err - }. Forcefully terminating the connection | WS State: ${CONNECTION_STATE[this.connection.readyState]}`, - ); - this.connection.terminate(); - } - - // Emit the destroyed event if needed - if (emit) this._emitDestroyed(); - } - } else if (emit) { - // We requested a destroy, but we had no connection. Emit destroyed - this._emitDestroyed(); - } - - this.debug( - `[WebSocket] Adding a WebSocket close timeout to ensure a correct WS reconnect. - Timeout: ${this.manager.client.options.closeTimeout}ms`, - ); - this.setWsCloseTimeout(this.manager.client.options.closeTimeout); - - // Step 2: Null the connection object - this.connection = null; - - // Step 3: Set the shard status to disconnected - this.status = Status.Disconnected; - - // Step 4: Cache the old sequence (use to attempt a resume) - if (this.sequence !== -1) this.closeSequence = this.sequence; - - // Step 5: Reset the sequence, resume url and session id if requested - if (reset) { - this.sequence = -1; - this.sessionId = null; - this.resumeURL = null; - } - - // Step 6: reset the rate limit data - this.ratelimit.remaining = this.ratelimit.total; - this.ratelimit.queue.length = 0; - if (this.ratelimit.timer) { - clearTimeout(this.ratelimit.timer); - this.ratelimit.timer = null; } - } - - /** - * Cleans up the WebSocket connection listeners. - * @private - */ - _cleanupConnection() { - this.connection.onopen = this.connection.onclose = this.connection.onmessage = null; - this.connection.onerror = () => null; - } - - /** - * Emits the DESTROYED event on the shard - * @private - */ - _emitDestroyed() { - /** - * Emitted when a shard is destroyed, but no WebSocket connection was present. - * @private - * @event WebSocketShard#destroyed - */ - this.emit(WebSocketShardEvents.Destroyed); + this.manager._ws.send(this.id, data); + this.processQueue(); } } diff --git a/packages/discord.js/typings/index.d.ts b/packages/discord.js/typings/index.d.ts index 128d20839db8f..2bb9b6280a8df 100644 --- a/packages/discord.js/typings/index.d.ts +++ b/packages/discord.js/typings/index.d.ts @@ -3269,11 +3269,8 @@ export class WebhookClient extends WebhookMixin(BaseClient) { export class WebSocketManager extends EventEmitter { private constructor(client: Client); - private totalShards: number | string; - private shardQueue: Set; private readonly packetQueue: unknown[]; private destroyed: boolean; - private reconnecting: boolean; public readonly client: Client; public gateway: string | null; @@ -3286,8 +3283,6 @@ export class WebSocketManager extends EventEmitter { private debug(message: string, shard?: WebSocketShard): void; private connect(): Promise; - private createShards(): Promise; - private reconnect(): Promise; private broadcast(packet: unknown): void; private destroy(): void; private handlePacket(packet?: unknown, shard?: WebSocketShard): boolean; diff --git a/yarn.lock b/yarn.lock index 8b8a2208d346f..e172a46096166 100644 --- a/yarn.lock +++ b/yarn.lock @@ -8453,6 +8453,7 @@ __metadata: "@discordjs/formatters": "workspace:^" "@discordjs/rest": "workspace:^" "@discordjs/util": "workspace:^" + "@discordjs/ws": "workspace:^" "@favware/cliff-jumper": ^1.10.0 "@sapphire/snowflake": ^3.4.0 "@types/node": 16.18.11