Skip to content

Commit

Permalink
fix(WebSocketShard): send ratelimit handling (#8887)
Browse files Browse the repository at this point in the history
* fix(WebSocketShard): send ratelimit handling

* chore: remove unnecessary else

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
  • Loading branch information
didinele and kodiakhq[bot] authored Dec 1, 2022
1 parent 322cb99 commit 40b504a
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 21 deletions.
8 changes: 8 additions & 0 deletions packages/ws/src/utils/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Collection } from '@discordjs/collection';
import { lazy } from '@discordjs/util';
import { APIVersion, GatewayOpcodes } from 'discord-api-types/v10';
import type { OptionalWebSocketManagerOptions, SessionInfo } from '../ws/WebSocketManager.js';
import type { SendRateLimitState } from '../ws/WebSocketShard.js';

/**
* Valid encoding types
Expand Down Expand Up @@ -60,3 +61,10 @@ export const ImportantGatewayOpcodes = new Set([
GatewayOpcodes.Identify,
GatewayOpcodes.Resume,
]);

export function getInitialSendRateLimitState(): SendRateLimitState {
return {
remaining: 120,
resetAt: Date.now() + 60_000,
};
}
76 changes: 55 additions & 21 deletions packages/ws/src/ws/WebSocketShard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ import {
import { WebSocket, type RawData } from 'ws';
import type { Inflate } from 'zlib-sync';
import type { IContextFetchingStrategy } from '../strategies/context/IContextFetchingStrategy';
import { ImportantGatewayOpcodes } from '../utils/constants.js';
import { getInitialSendRateLimitState, ImportantGatewayOpcodes } from '../utils/constants.js';
import type { SessionInfo } from './WebSocketManager.js';

// eslint-disable-next-line promise/prefer-await-to-then
const getZlibSync = lazy(async () => import('zlib-sync').then((mod) => mod.default).catch(() => null));

export enum WebSocketShardEvents {
Closed = 'closed',
Debug = 'debug',
Dispatch = 'dispatch',
Hello = 'hello',
Expand All @@ -51,6 +52,7 @@ export enum WebSocketShardDestroyRecovery {

// eslint-disable-next-line @typescript-eslint/consistent-type-definitions
export type WebSocketShardEventsMap = {
[WebSocketShardEvents.Closed]: [{ code: number }];
[WebSocketShardEvents.Debug]: [payload: { message: string }];
[WebSocketShardEvents.Hello]: [];
[WebSocketShardEvents.Ready]: [payload: { data: GatewayReadyDispatchData }];
Expand All @@ -69,6 +71,11 @@ export enum CloseCodes {
Resuming = 4_200,
}

export interface SendRateLimitState {
remaining: number;
resetAt: number;
}

export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
private connection: WebSocket | null = null;

Expand All @@ -86,10 +93,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {

private isAck = true;

private sendRateLimitState = {
remaining: 120,
resetAt: Date.now(),
};
private sendRateLimitState: SendRateLimitState = getInitialSendRateLimitState();

private heartbeatInterval: NodeJS.Timer | null = null;

Expand Down Expand Up @@ -146,6 +150,8 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {

this.status = WebSocketShardStatus.Connecting;

this.sendRateLimitState = getInitialSendRateLimitState();

await this.waitForEvent(WebSocketShardEvents.Hello, this.strategy.options.helloTimeout);

if (session?.shardCount === this.strategy.options.shardCount) {
Expand Down Expand Up @@ -187,22 +193,32 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
await this.strategy.updateSessionInfo(this.id, null);
}

if (
this.connection &&
(this.connection.readyState === WebSocket.OPEN || this.connection.readyState === WebSocket.CONNECTING)
) {
if (this.connection) {
// No longer need to listen to messages
this.connection.removeAllListeners('message');
// Prevent a reconnection loop by unbinding the main close event
this.connection.removeAllListeners('close');
this.connection.close(options.code, options.reason);

// Actually wait for the connection to close
await once(this.connection, 'close');
const shouldClose =
this.connection.readyState === WebSocket.OPEN || this.connection.readyState === WebSocket.CONNECTING;

this.debug([
'Connection status during destroy',
`Needs closing: ${shouldClose}`,
`Ready state: ${this.connection.readyState}`,
]);

if (shouldClose) {
this.connection.close(options.code, options.reason);
await once(this.connection, 'close');
this.emit(WebSocketShardEvents.Closed, { code: options.code });
}

// Lastly, remove the error event.
// Doing this earlier would cause a hard crash in case an error event fired on our `close` call
this.connection.removeAllListeners('error');
} else {
this.debug(['Destroying a shard that has no connection; please open an issue on GitHub']);
}

this.status = WebSocketShardStatus.Idle;
Expand All @@ -227,26 +243,44 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
}
}

public async send(payload: GatewaySendPayload) {
public async send(payload: GatewaySendPayload): Promise<void> {
if (!this.connection) {
throw new Error("WebSocketShard wasn't connected");
}

if (this.status !== WebSocketShardStatus.Ready && !ImportantGatewayOpcodes.has(payload.op)) {
this.debug(['Tried to send a non-crucial payload before the shard was ready, waiting']);
await once(this, WebSocketShardEvents.Ready);
}

await this.sendQueue.wait();

if (--this.sendRateLimitState.remaining <= 0) {
if (this.sendRateLimitState.resetAt < Date.now()) {
await sleep(Date.now() - this.sendRateLimitState.resetAt);
const now = Date.now();

if (this.sendRateLimitState.resetAt > now) {
const sleepFor = this.sendRateLimitState.resetAt - now;

this.debug([`Was about to hit the send rate limit, sleeping for ${sleepFor}ms`]);
const controller = new AbortController();

// Sleep for the remaining time, but if the connection closes in the meantime, we shouldn't wait the remainder to avoid blocking the new conn
const interrupted = await Promise.race([
sleep(sleepFor).then(() => false),
once(this, WebSocketShardEvents.Closed, { signal: controller.signal }).then(() => true),
]);

if (interrupted) {
this.debug(['Connection closed while waiting for the send rate limit to reset, re-queueing payload']);
this.sendQueue.shift();
return this.send(payload);
}

// This is so the listener from the `once` call is removed
controller.abort();
}

this.sendRateLimitState = {
remaining: 119,
resetAt: Date.now() + 60_000,
};
this.sendRateLimitState = getInitialSendRateLimitState();
}

this.sendQueue.shift();
Expand Down Expand Up @@ -476,9 +510,10 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
}

private async onClose(code: number) {
this.emit(WebSocketShardEvents.Closed, { code });

switch (code) {
case CloseCodes.Normal: {
this.debug([`Disconnected normally from code ${code}`]);
return this.destroy({
code,
reason: 'Got disconnected by Discord',
Expand All @@ -487,7 +522,6 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
}

case CloseCodes.Resuming: {
this.debug([`Disconnected normally from code ${code}`]);
break;
}

Expand Down

0 comments on commit 40b504a

Please sign in to comment.