Skip to content

Commit

Permalink
feat(WebSocketManager): use /ws package internally
Browse files Browse the repository at this point in the history
  • Loading branch information
Qjuh committed Jan 30, 2023
1 parent 8b70f49 commit 78cc7cf
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 892 deletions.
1 change: 1 addition & 0 deletions packages/discord.js/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
"@discordjs/formatters": "workspace:^",
"@discordjs/rest": "workspace:^",
"@discordjs/util": "workspace:^",
"@discordjs/ws": "workspace:^",
"@sapphire/snowflake": "^3.4.0",
"@types/ws": "^8.5.4",
"discord-api-types": "^0.37.28",
Expand Down
279 changes: 105 additions & 174 deletions packages/discord.js/src/client/websocket/WebSocketManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

const EventEmitter = require('node:events');
const { setImmediate } = require('node:timers');
const { setTimeout: sleep } = require('node:timers/promises');
const { Collection } = require('@discordjs/collection');
const { GatewayCloseCodes, GatewayDispatchEvents, Routes } = require('discord-api-types/v10');
const {
WebSocketManager: WSWebSocketManager,
WebSocketShardEvents: WSWebSocketShardEvents,
CloseCodes,
} = require('@discordjs/ws');
const { GatewayCloseCodes, GatewayDispatchEvents } = require('discord-api-types/v10');
const WebSocketShard = require('./WebSocketShard');
const PacketHandlers = require('./handlers');
const { DiscordjsError, ErrorCodes } = require('../../errors');
Expand All @@ -22,16 +26,6 @@ const BeforeReadyWhitelist = [
GatewayDispatchEvents.GuildMemberRemove,
];

const unrecoverableErrorCodeMap = {
[GatewayCloseCodes.AuthenticationFailed]: ErrorCodes.TokenInvalid,
[GatewayCloseCodes.InvalidShard]: ErrorCodes.ShardingInvalid,
[GatewayCloseCodes.ShardingRequired]: ErrorCodes.ShardingRequired,
[GatewayCloseCodes.InvalidIntents]: ErrorCodes.InvalidIntents,
[GatewayCloseCodes.DisallowedIntents]: ErrorCodes.DisallowedIntents,
};

const UNRESUMABLE_CLOSE_CODES = [1000, GatewayCloseCodes.AlreadyAuthenticated, GatewayCloseCodes.InvalidSeq];

/**
* The WebSocket manager for this client.
* <info>This class forwards raw dispatch events,
Expand All @@ -56,27 +50,12 @@ class WebSocketManager extends EventEmitter {
*/
this.gateway = null;

/**
* The amount of shards this manager handles
* @private
* @type {number}
*/
this.totalShards = this.client.options.shards.length;

/**
* A collection of all shards this manager handles
* @type {Collection<number, WebSocketShard>}
*/
this.shards = new Collection();

/**
* An array of shards to be connected or that need to reconnect
* @type {Set<WebSocketShard>}
* @private
* @name WebSocketManager#shardQueue
*/
Object.defineProperty(this, 'shardQueue', { value: new Set(), writable: true });

/**
* An array of queued events before this WebSocketManager became ready
* @type {Object[]}
Expand All @@ -98,12 +77,7 @@ class WebSocketManager extends EventEmitter {
*/
this.destroyed = false;

/**
* If this manager is currently reconnecting one or multiple shards
* @type {boolean}
* @private
*/
this.reconnecting = false;
this._ws = null;
}

/**
Expand All @@ -119,11 +93,11 @@ class WebSocketManager extends EventEmitter {
/**
* Emits a debug message.
* @param {string} message The debug message
* @param {?WebSocketShard} [shard] The shard that emitted this message, if any
* @param {?number} [shardId] The id of the shard that emitted this message, if any
* @private
*/
debug(message, shard) {
this.client.emit(Events.Debug, `[WS => ${shard ? `Shard ${shard.id}` : 'Manager'}] ${message}`);
debug(message, shardId) {
this.client.emit(Events.Debug, `[WS => ${shardId ? `Shard ${shardId}` : 'Manager'}] ${message}`);
}

/**
Expand All @@ -132,11 +106,30 @@ class WebSocketManager extends EventEmitter {
*/
async connect() {
const invalidToken = new DiscordjsError(ErrorCodes.TokenInvalid);
const { shards, shardCount, intents, ws } = this.client.options;
if (this._ws && this._ws.options.token !== this.client.token) {
await this._ws.destroy({ code: 1000, reason: 'Login with differing token requested' });
this._ws = null;
}
if (!this._ws) {
this._ws = new WSWebSocketManager({
intents: intents.bitfield,
rest: this.client.rest,
token: this.client.token,
largeThreshold: ws.large_threshold,
version: ws.version,
shardIds: shards === 'auto' ? null : shards,
shardCount: shards === 'auto' ? null : shardCount,
initialPresence: ws.presence,
});
this.attachEvents();
}

const {
url: gatewayURL,
shards: recommendedShards,
session_start_limit: sessionStartLimit,
} = await this.client.rest.get(Routes.gatewayBot()).catch(error => {
} = await this._ws.fetchGatewayInformation().catch(error => {
throw error.status === 401 ? invalidToken : error;
});

Expand All @@ -152,156 +145,95 @@ class WebSocketManager extends EventEmitter {

this.gateway = `${gatewayURL}/`;

let { shards } = this.client.options;

if (shards === 'auto') {
this.debug(`Using the recommended shard count provided by Discord: ${recommendedShards}`);
this.totalShards = this.client.options.shardCount = recommendedShards;
shards = this.client.options.shards = Array.from({ length: recommendedShards }, (_, i) => i);
}

this.totalShards = shards.length;
this.debug(`Spawning shards: ${shards.join(', ')}`);
this.shardQueue = new Set(shards.map(id => new WebSocketShard(this, id)));

return this.createShards();
}

/**
* Handles the creation of a shard.
* @returns {Promise<boolean>}
* @private
*/
async createShards() {
// If we don't have any shards to handle, return
if (!this.shardQueue.size) return false;

const [shard] = this.shardQueue;

this.shardQueue.delete(shard);
await this._ws.connect();

if (!shard.eventsAttached) {
shard.on(WebSocketShardEvents.AllReady, unavailableGuilds => {
/**
* Emitted when a shard turns ready.
* @event Client#shardReady
* @param {number} id The shard id that turned ready
* @param {?Set<Snowflake>} unavailableGuilds Set of unavailable guild ids, if any
*/
this.client.emit(Events.ShardReady, shard.id, unavailableGuilds);

if (!this.shardQueue.size) this.reconnecting = false;
this.checkShardsReady();
});
this.totalShards = this.client.options.shardCount = await this._ws.getShardCount();
this.client.options.shards = await this._ws.getShardIds();
for (const id of this.client.options.shards) {
if (!this.shards.has(id)) {
const shard = new WebSocketShard(this, id);
this.shards.set(id, shard);

shard.on(WebSocketShardEvents.Close, event => {
if (event.code === 1_000 ? this.destroyed : event.code in unrecoverableErrorCodeMap) {
shard.on(WebSocketShardEvents.AllReady, unavailableGuilds => {
/**
* Emitted when a shard's WebSocket disconnects and will no longer reconnect.
* @event Client#shardDisconnect
* @param {CloseEvent} event The WebSocket close event
* @param {number} id The shard id that disconnected
* Emitted when a shard turns ready.
* @event Client#shardReady
* @param {number} id The shard id that turned ready
* @param {?Set<Snowflake>} unavailableGuilds Set of unavailable guild ids, if any
*/
this.client.emit(Events.ShardDisconnect, event, shard.id);
this.debug(GatewayCloseCodes[event.code], shard);
return;
}
this.client.emit(Events.ShardReady, shard.id, unavailableGuilds);

if (UNRESUMABLE_CLOSE_CODES.includes(event.code)) {
// These event codes cannot be resumed
shard.sessionId = null;
}

/**
* Emitted when a shard is attempting to reconnect or re-identify.
* @event Client#shardReconnecting
* @param {number} id The shard id that is attempting to reconnect
*/
this.client.emit(Events.ShardReconnecting, shard.id);

this.shardQueue.add(shard);

if (shard.sessionId) this.debug(`Session id is present, attempting an immediate reconnect...`, shard);
this.reconnect();
});

shard.on(WebSocketShardEvents.InvalidSession, () => {
this.client.emit(Events.ShardReconnecting, shard.id);
});

shard.on(WebSocketShardEvents.Destroyed, () => {
this.debug('Shard was destroyed but no WebSocket connection was present! Reconnecting...', shard);

this.client.emit(Events.ShardReconnecting, shard.id);

this.shardQueue.add(shard);
this.reconnect();
});

shard.eventsAttached = true;
}

this.shards.set(shard.id, shard);

try {
await shard.connect();
} catch (error) {
if (error?.code && error.code in unrecoverableErrorCodeMap) {
throw new DiscordjsError(unrecoverableErrorCodeMap[error.code]);
// Undefined if session is invalid, error event for regular closes
} else if (!error || error.code) {
this.debug('Failed to connect to the gateway, requeueing...', shard);
this.shardQueue.add(shard);
} else {
throw error;
this.checkShardsReady();
});
}
}
// If we have more shards, add a 5s delay
if (this.shardQueue.size) {
this.debug(`Shard Queue Size: ${this.shardQueue.size}; continuing in 5 seconds...`);
await sleep(5_000);
return this.createShards();
}

return true;
}

/**
* Handles reconnects for this manager.
* Attaches event handlers to the internal WebSocketShardManager from `@discordjs/ws`.
* @private
* @returns {Promise<boolean>}
*/
async reconnect() {
if (this.reconnecting || this.status !== Status.Ready) return false;
this.reconnecting = true;
try {
await this.createShards();
} catch (error) {
this.debug(`Couldn't reconnect or fetch information about the gateway. ${error}`);
if (error.httpStatus !== 401) {
this.debug(`Possible network error occurred. Retrying in 5s...`);
await sleep(5_000);
this.reconnecting = false;
return this.reconnect();
attachEvents() {
this._ws.on(WSWebSocketShardEvents.Debug, ({ message, shardId }) => this.debug(message, shardId));
this._ws.on(WSWebSocketShardEvents.Dispatch, ({ packet, shardId }) => {
this.client.emit(Events.Raw, packet, shardId);
const shard = this.shards.get(shardId);
this.handlePacket(packet, shard);
if (shard.status === Status.WaitingForGuilds && packet.t === GatewayDispatchEvents.GuildCreate) {
shard.onGuildPacket(packet);
}
// If we get an error at this point, it means we cannot reconnect anymore
if (this.client.listenerCount(Events.Invalidated)) {
});

this._ws.on(WSWebSocketShardEvents.Ready, ({ data, shardId }) => {
this.shards.get(shardId).onReadyPacket(data);
});

this._ws.on(WSWebSocketShardEvents.Closed, ({ code, reason = '', shardId }) => {
this.shards.get(shardId).status = code === CloseCodes.Resuming ? Status.Resuming : Status.Disconnected;
if (code === CloseCodes.Normal && this.destroyed) {
/**
* Emitted when the client's session becomes invalidated.
* You are expected to handle closing the process gracefully and preventing a boot loop
* if you are listening to this event.
* @event Client#invalidated
* Emitted when a shard's WebSocket disconnects and will no longer reconnect.
* @event Client#shardDisconnect
* @param {CloseEvent} event The WebSocket close event
* @param {number} id The shard id that disconnected
*/
this.client.emit(Events.Invalidated);
// Destroy just the shards. This means you have to handle the cleanup yourself
this.destroy();
} else {
this.client.destroy();
this.client.emit(Events.ShardDisconnect, { code, reason, wasClean: true }, shardId);
this.debug(GatewayCloseCodes[code], shardId);
return;
}
} finally {
this.reconnecting = false;
}
return true;

/**
* Emitted when a shard is attempting to reconnect or re-identify.
* @event Client#shardReconnecting
* @param {number} id The shard id that is attempting to reconnect
*/
this.client.emit(Events.ShardReconnecting, shardId);
});

this._ws.on(WSWebSocketShardEvents.Resumed, ({ shardId }) => {
/**
* Emitted when the shard resumes successfully
* @event WebSocketShard#resumed
*/
this.shards.get(shardId).emit(WebSocketShardEvents.Resumed);
});

this._ws.on(WSWebSocketShardEvents.HeartbeatComplete, ({ heartbeatAt, latency, shardId }) => {
const shard = this.shards.get(shardId);
shard.lastPingTimestamp = heartbeatAt;
shard.ping = latency;
});

// TODO: refactor once error event gets exposed publicly
this._ws.on('error', ({ err, shardId }) => {
/**
* Emitted whenever a shard's WebSocket encounters a connection error.
* @event Client#shardError
* @param {Error} error The encountered error
* @param {number} shardId The shard that encountered this error
*/
this.client.emit(Events.ShardError, err, shardId);
});
}

/**
Expand All @@ -310,7 +242,7 @@ class WebSocketManager extends EventEmitter {
* @private
*/
broadcast(packet) {
for (const shard of this.shards.values()) shard.send(packet);
for (const shardId of this.shards.keys()) this._ws.send(shardId, packet);
}

/**
Expand All @@ -322,8 +254,7 @@ class WebSocketManager extends EventEmitter {
// TODO: Make a util for getting a stack
this.debug(`Manager was destroyed. Called by:\n${new Error().stack}`);
this.destroyed = true;
this.shardQueue.clear();
for (const shard of this.shards.values()) shard.destroy({ closeCode: 1_000, reset: true, emit: false, log: false });
this._ws.destroy({ code: 1_000 });
}

/**
Expand Down
Loading

0 comments on commit 78cc7cf

Please sign in to comment.