Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(handlers): create burst handler for interaction callbacks #8996

Merged
merged 5 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions packages/rest/__tests__/BurstHandler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/* eslint-disable id-length */
/* eslint-disable promise/prefer-await-to-then */
import { performance } from 'node:perf_hooks';
import { MockAgent, setGlobalDispatcher } from 'undici';
import type { Interceptable, MockInterceptor } from 'undici/types/mock-interceptor';
import { beforeEach, afterEach, test, expect, vitest } from 'vitest';
import { DiscordAPIError, HTTPError, RateLimitError, REST, BurstHandlerMajorIdKey } from '../src/index.js';
import { BurstHandler } from '../src/lib/handlers/BurstHandler.js';
import { genPath } from './util.js';

const callbackKey = `Global(POST:/interactions/:id/:token/callback):${BurstHandlerMajorIdKey}`;
const callbackPath = new RegExp(genPath('/interactions/[0-9]{17,19}/.+/callback'));

const api = new REST();

let mockAgent: MockAgent;
let mockPool: Interceptable;

beforeEach(() => {
mockAgent = new MockAgent();
mockAgent.disableNetConnect();
setGlobalDispatcher(mockAgent);

mockPool = mockAgent.get('https://discord.com');
api.setAgent(mockAgent);
});

afterEach(async () => {
await mockAgent.close();
});

// @discordjs/rest uses the `content-type` header to detect whether to parse
// the response as JSON or as an ArrayBuffer.
const responseOptions: MockInterceptor.MockResponseOptions = {
headers: {
'content-type': 'application/json',
},
};

test('Interaction callback creates burst handler', async () => {
mockPool.intercept({ path: callbackPath, method: 'POST' }).reply(200);

expect(api.requestManager.handlers.get(callbackKey)).toBe(undefined);
expect(
await api.post('/interactions/1234567890123456789/totallyarealtoken/callback', {
auth: false,
body: { type: 4, data: { content: 'Reply' } },
}),
).toBeInstanceOf(Uint8Array);
expect(api.requestManager.handlers.get(callbackKey)).toBeInstanceOf(BurstHandler);
});

test('Requests are handled in bursts', async () => {
mockPool.intercept({ path: callbackPath, method: 'POST' }).reply(200).delay(100).times(3);

// Return the current time on these results as their response does not indicate anything
const [a, b, c] = await Promise.all([
api
.post('/interactions/1234567890123456789/totallyarealtoken/callback', {
auth: false,
body: { type: 4, data: { content: 'Reply1' } },
})
.then(() => performance.now()),
api
.post('/interactions/2345678901234567890/anotherveryrealtoken/callback', {
auth: false,
body: { type: 4, data: { content: 'Reply2' } },
})
.then(() => performance.now()),
api
.post('/interactions/3456789012345678901/nowaytheresanotherone/callback', {
auth: false,
body: { type: 4, data: { content: 'Reply3' } },
})
.then(() => performance.now()),
]);

expect(b - a).toBeLessThan(10);
expect(c - a).toBeLessThan(10);
});

test('Handle 404', async () => {
mockPool
.intercept({ path: callbackPath, method: 'POST' })
.reply(404, { message: 'Unknown interaction', code: 10_062 }, responseOptions);

const promise = api.post('/interactions/1234567890123456788/definitelynotarealinteraction/callback', {
auth: false,
body: { type: 4, data: { content: 'Malicious' } },
});
await expect(promise).rejects.toThrowError('Unknown interaction');
await expect(promise).rejects.toBeInstanceOf(DiscordAPIError);
});

let unexpected429 = true;
test('Handle unexpected 429', async () => {
mockPool
.intercept({
path: callbackPath,
method: 'POST',
})
.reply(() => {
if (unexpected429) {
unexpected429 = false;
return {
statusCode: 429,
data: '',
responseOptions: {
headers: {
'retry-after': '1',
via: '1.1 google',
},
},
};
}

return {
statusCode: 200,
data: { test: true },
responseOptions,
};
})
.times(2);

const previous = performance.now();
let firstResolvedTime: number;
const unexpectedLimit = api
.post('/interactions/1234567890123456789/totallyarealtoken/callback', {
auth: false,
body: { type: 4, data: { content: 'Reply' } },
})
.then((res) => {
firstResolvedTime = performance.now();
return res;
});

expect(await unexpectedLimit).toStrictEqual({ test: true });
expect(performance.now()).toBeGreaterThanOrEqual(previous + 1_000);
});
6 changes: 4 additions & 2 deletions packages/rest/__tests__/REST.test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import { Buffer } from 'node:buffer';
import { Buffer, File as NativeFile } from 'node:buffer';
import { URLSearchParams } from 'node:url';
import { DiscordSnowflake } from '@sapphire/snowflake';
import type { Snowflake } from 'discord-api-types/v10';
import { Routes } from 'discord-api-types/v10';
import type { FormData } from 'undici';
import { File, MockAgent, setGlobalDispatcher } from 'undici';
import { File as UndiciFile, MockAgent, setGlobalDispatcher } from 'undici';
import type { Interceptable, MockInterceptor } from 'undici/types/mock-interceptor';
import { beforeEach, afterEach, test, expect } from 'vitest';
import { REST } from '../src/index.js';
import { genPath } from './util.js';

const File = NativeFile ?? UndiciFile;

const newSnowflake: Snowflake = DiscordSnowflake.generate().toString();

const api = new REST().setToken('A-Very-Fake-Token');
Expand Down
22 changes: 20 additions & 2 deletions packages/rest/src/lib/RequestManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,16 @@ import { lazy } from '@discordjs/util';
import { DiscordSnowflake } from '@sapphire/snowflake';
import { FormData, type RequestInit, type BodyInit, type Dispatcher, type Agent } from 'undici';
import type { RESTOptions, RestEvents, RequestOptions } from './REST.js';
import { BurstHandler } from './handlers/BurstHandler.js';
import type { IHandler } from './handlers/IHandler.js';
import { SequentialHandler } from './handlers/SequentialHandler.js';
import { DefaultRestOptions, DefaultUserAgent, OverwrittenMimeTypes, RESTEvents } from './utils/constants.js';
import {
BurstHandlerMajorIdKey,
DefaultRestOptions,
DefaultUserAgent,
OverwrittenMimeTypes,
RESTEvents,
} from './utils/constants.js';
import { resolveBody } from './utils/utils.js';

// Make this a lazy dynamic import as file-type is a pure ESM package
Expand Down Expand Up @@ -351,7 +358,10 @@ export class RequestManager extends EventEmitter {
*/
private createHandler(hash: string, majorParameter: string) {
// Create the async request queue to handle requests
const queue = new SequentialHandler(this, hash, majorParameter);
const queue =
majorParameter === BurstHandlerMajorIdKey
? new BurstHandler(this, hash, majorParameter)
: new SequentialHandler(this, hash, majorParameter);
// Save the queue based on its id
this.handlers.set(queue.id, queue);

Expand Down Expand Up @@ -499,6 +509,14 @@ export class RequestManager extends EventEmitter {
* @internal
*/
private static generateRouteData(endpoint: RouteLike, method: RequestMethod): RouteData {
if (endpoint.startsWith('/interactions/') && endpoint.endsWith('/callback')) {
return {
majorParameter: BurstHandlerMajorIdKey,
bucketRoute: '/interactions/:id/:token/callback',
original: endpoint,
};
}

const majorIdMatch = /^\/(?:channels|guilds|webhooks)\/(\d{17,19})/.exec(endpoint);

// Get the major id for this route - global otherwise
Expand Down
146 changes: 146 additions & 0 deletions packages/rest/src/lib/handlers/BurstHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import { setTimeout as sleep } from 'node:timers/promises';
import type { Dispatcher } from 'undici';
import type { RequestOptions } from '../REST.js';
import type { HandlerRequestData, RequestManager, RouteData } from '../RequestManager.js';
import { RESTEvents } from '../utils/constants.js';
import { onRateLimit, parseHeader } from '../utils/utils.js';
import type { IHandler } from './IHandler.js';
import { handleErrors, incrementInvalidCount, makeNetworkRequest } from './Shared.js';

/**
* The structure used to handle burst requests for a given bucket.
* Burst requests have no ratelimit handling but allow for pre- and post-processing
* of data in the same manner as sequentially queued requests.
*
* @remarks
* This queue may still emit a rate limit error if an unexpected 429 is hit
*/
export class BurstHandler implements IHandler {
/**
* {@inheritdoc IHandler.id}
*/
public readonly id: string;

/**
* {@inheritDoc IHandler.inactive}
*/
public inactive = false;

/**
* @param manager - The request manager
* @param hash - The hash that this RequestHandler handles
* @param majorParameter - The major parameter for this handler
*/
public constructor(
private readonly manager: RequestManager,
private readonly hash: string,
private readonly majorParameter: string,
) {
this.id = `${hash}:${majorParameter}`;
}

/**
* Emits a debug message
*
* @param message - The message to debug
*/
private debug(message: string) {
this.manager.emit(RESTEvents.Debug, `[REST ${this.id}] ${message}`);
}

/**
* {@inheritDoc IHandler.queueRequest}
*/
public async queueRequest(
routeId: RouteData,
url: string,
options: RequestOptions,
requestData: HandlerRequestData,
): Promise<Dispatcher.ResponseData> {
return this.runRequest(routeId, url, options, requestData);
}

/**
* The method that actually makes the request to the API, and updates info about the bucket accordingly
*
* @param routeId - The generalized API route with literal ids for major parameters
* @param url - The fully resolved URL to make the request to
* @param options - The fetch options needed to make the request
* @param requestData - Extra data from the user's request needed for errors and additional processing
* @param retries - The number of retries this request has already attempted (recursion)
*/
private async runRequest(
routeId: RouteData,
url: string,
options: RequestOptions,
requestData: HandlerRequestData,
retries = 0,
): Promise<Dispatcher.ResponseData> {
const method = options.method ?? 'get';

const res = await makeNetworkRequest(this.manager, routeId, url, options, requestData, retries);

// Retry requested
if (res === null) {
// eslint-disable-next-line no-param-reassign
return this.runRequest(routeId, url, options, requestData, ++retries);
}

const status = res.statusCode;
let retryAfter = 0;
const retry = parseHeader(res.headers['retry-after']);

// Amount of time in milliseconds until we should retry if rate limited (globally or otherwise)
if (retry) retryAfter = Number(retry) * 1_000 + this.manager.options.offset;

// Count the invalid requests
if (status === 401 || status === 403 || status === 429) {
incrementInvalidCount(this.manager);
}

if (status >= 200 && status < 300) {
return res;
} else if (status === 429) {
// Unexpected ratelimit
const isGlobal = res.headers['x-ratelimit-global'] !== undefined;
await onRateLimit(this.manager, {
timeToReset: retryAfter,
limit: Number.POSITIVE_INFINITY,
method,
hash: this.hash,
url,
route: routeId.bucketRoute,
majorParameter: this.majorParameter,
global: isGlobal,
});
this.debug(
[
'Encountered unexpected 429 rate limit',
` Global : ${isGlobal}`,
` Method : ${method}`,
` URL : ${url}`,
` Bucket : ${routeId.bucketRoute}`,
` Major parameter: ${routeId.majorParameter}`,
` Hash : ${this.hash}`,
` Limit : ${Number.POSITIVE_INFINITY}`,
` Retry After : ${retryAfter}ms`,
` Sublimit : None`,
].join('\n'),
);

// We are bypassing all other limits, but an encountered limit should be respected (it's probably a non-punished rate limit anyways)
await sleep(retryAfter);

// Since this is not a server side issue, the next request should pass, so we don't bump the retries counter
return this.runRequest(routeId, url, options, requestData, retries);
} else {
const handled = await handleErrors(this.manager, res, method, url, requestData, retries);
if (handled === null) {
// eslint-disable-next-line no-param-reassign
return this.runRequest(routeId, url, options, requestData, ++retries);
}

return handled;
}
}
}
6 changes: 6 additions & 0 deletions packages/rest/src/lib/handlers/IHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,9 @@ export interface IHandler {
requestData: HandlerRequestData,
): Promise<Dispatcher.ResponseData>;
}

export interface PolyFillAbortSignal {
readonly aborted: boolean;
addEventListener(type: 'abort', listener: () => void): void;
removeEventListener(type: 'abort', listener: () => void): void;
}
Loading