Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AJS Retry improvements for 500 and 429, normal and batch #1084

Merged
merged 17 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ jest.mock('unfetch', () => {
return fetch
})

import { createError, createSuccess } from '../../../test-helpers/factories'
import batch from '../batched-dispatcher'

const fatEvent = {
Expand Down Expand Up @@ -52,6 +53,7 @@ describe('Batching', () => {
jest.useFakeTimers({
now: new Date('9 Jun 1993 00:00:00Z').getTime(),
})
fetch.mockReturnValue(createSuccess({}))
})

afterEach(() => {
Expand Down
128 changes: 128 additions & 0 deletions packages/browser/src/plugins/segmentio/__tests__/retries.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
const fetch = jest.fn()
jest.mock('unfetch', () => {
return fetch
})

import { segmentio, SegmentioSettings } from '..'
import { Analytics } from '../../../core/analytics'
// @ts-ignore isOffline mocked dependency is accused as unused
Expand All @@ -8,11 +13,134 @@ import { scheduleFlush } from '../schedule-flush'
import * as PPQ from '../../../lib/priority-queue/persisted'
import * as PQ from '../../../lib/priority-queue'
import { Context } from '../../../core/context'
import { createError, createSuccess } from '../../../test-helpers/factories'

jest.mock('../schedule-flush')

type QueueType = 'priority' | 'persisted'

describe('Segment.io retries 500s and 429', () => {
let options: SegmentioSettings
let analytics: Analytics
let segment: Plugin
beforeEach(async () => {
jest.resetAllMocks()
jest.restoreAllMocks()

options = { apiKey: 'foo' }
analytics = new Analytics(
{ writeKey: options.apiKey },
{
retryQueue: true,
}
)
segment = await segmentio(analytics, options, {})
await analytics.register(segment, envEnrichment)
MichaelGHSeg marked this conversation as resolved.
Show resolved Hide resolved
})

test('retries on 500', async () => {
fetch.mockImplementation(() => createError({ status: 500 }))

const ctx = await analytics.track('event')

expect(ctx.attempts).toBe(1)
expect(analytics.queue.queue.getAttempts(ctx)).toBe(1)
expect(fetch).toHaveBeenCalledTimes(1)
expect(scheduleFlush).toHaveBeenCalled()
})

test('delays retry on 429', async () => {
const headers = new Headers()
const resetTime = 1
headers.set('x-ratelimit-reset', resetTime.toString())
fetch
.mockReturnValueOnce(
createError({
status: 429,
statusText: 'Too Many Requests',
headers: headers,
})
)
.mockReturnValue(createSuccess({}))

const ctx = await analytics.track('event')
expect(ctx.attempts).toBe(1)
expect(fetch).toHaveBeenCalledTimes(1)
expect(scheduleFlush).toHaveBeenCalled()
})
})

describe('Batches retry 500s and 429', () => {
let options: SegmentioSettings
let analytics: Analytics
let segment: Plugin
beforeEach(async () => {
jest.resetAllMocks()
jest.restoreAllMocks()

options = {
apiKey: 'foo',
deliveryStrategy: {
strategy: 'batching',
// timeout is set very low to get consistent behavior out of scheduleflush
config: { size: 3, timeout: 1, retryattempts: 3 },
},
}
analytics = new Analytics(
{ writeKey: options.apiKey },
{
retryQueue: true,
}
)
segment = await segmentio(analytics, options, {})
await analytics.register(segment, envEnrichment)
})

test('retries on 500', async () => {
fetch
.mockReturnValueOnce(() => createError({ status: 500 }))
.mockReturnValue(createSuccess({}))

const ctx1 = await analytics.track('event1')
const ctx2 = await analytics.track('event2')
// wait a bit for retries - timeout is only 1 ms
await new Promise((resolve) => setTimeout(resolve, 100))

expect(ctx1.attempts).toBe(1)
expect(analytics.queue.queue.getAttempts(ctx1)).toBe(1)
expect(fetch).toHaveBeenCalledTimes(2)
})

test('delays retry on 429', async () => {
const headers = new Headers()
const resetTime = 1
headers.set('x-ratelimit-reset', resetTime.toString())
fetch.mockReturnValue(
createError({
status: 429,
statusText: 'Too Many Requests',
headers: headers,
})
)

const ctx1 = await analytics.track('event1')
const ctx2 = await analytics.track('event2')

await new Promise((resolve) => setTimeout(resolve, 100))

expect(ctx1.attempts).toBe(1)
expect(fetch).toHaveBeenCalledTimes(1)

expect(fetch).toHaveBeenCalledTimes(1)
await new Promise((resolve) => setTimeout(resolve, 1000))
expect(fetch).toHaveBeenCalledTimes(2)
await new Promise((resolve) => setTimeout(resolve, 1000))
expect(fetch).toHaveBeenCalledTimes(3)
await new Promise((resolve) => setTimeout(resolve, 1000))
expect(fetch).toHaveBeenCalledTimes(3) // capped at 3 retries
MichaelGHSeg marked this conversation as resolved.
Show resolved Hide resolved
})
})

describe('Segment.io retries', () => {
let options: SegmentioSettings
let analytics: Analytics
Expand Down
48 changes: 40 additions & 8 deletions packages/browser/src/plugins/segmentio/batched-dispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { SegmentEvent } from '../../core/events'
import { fetch } from '../../lib/fetch'
import { onPageChange } from '../../lib/on-page-change'
import { RateLimitError } from './ratelimit-error'

export type BatchingDispatchConfig = {
size?: number
timeout?: number
retryattempts?: number
}

const MAX_PAYLOAD_SIZE = 500
Expand Down Expand Up @@ -52,6 +54,7 @@ export default function batch(

const limit = config?.size ?? 10
const timeout = config?.timeout ?? 5000
let ratelimittimeout = 0

function sendBatch(batch: object[]) {
if (batch.length === 0) {
Expand All @@ -65,7 +68,7 @@ export default function batch(
const { sentAt, ...newEvent } = event as SegmentEvent
return newEvent
})

console.log('sending batch', updatedBatch)
MichaelGHSeg marked this conversation as resolved.
Show resolved Hide resolved
return fetch(`https://${apiHost}/b`, {
keepalive: pageUnloaded,
headers: {
Expand All @@ -77,28 +80,56 @@ export default function batch(
batch: updatedBatch,
sentAt: new Date().toISOString(),
}),
}).then((res) => {
if (res.status >= 500) {
throw new Error(`Bad response from server: ${res.status}`)
}
if (res.status == 429) {
const retryTimeoutStringSecs = res.headers?.get('x-ratelimit-reset')
const retryTimeoutMS =
retryTimeoutStringSecs != null
? parseInt(retryTimeoutStringSecs) * 1000
MichaelGHSeg marked this conversation as resolved.
Show resolved Hide resolved
: timeout
throw new RateLimitError(
`Rate limit exceeded: ${res.status}`,
retryTimeoutMS
)
}
})
}

async function flush(): Promise<unknown> {
async function flush(attempt = 1): Promise<unknown> {
if (buffer.length) {
const batch = buffer
buffer = []
return sendBatch(batch)
return sendBatch(batch)?.catch((error) => {
console.error('Error sending batch', error)
if (attempt < (config?.retryattempts ?? 10)) {
if (error.name == 'RateLimitError') {
ratelimittimeout = error.retryTimeout
MichaelGHSeg marked this conversation as resolved.
Show resolved Hide resolved
}
buffer.push(batch)
scheduleFlush(attempt + 1)
}
})
}
}

let schedule: NodeJS.Timeout | undefined

function scheduleFlush(): void {
function scheduleFlush(attempt = 1): void {
if (schedule) {
return
}

schedule = setTimeout(() => {
schedule = undefined
flush().catch(console.error)
}, timeout)
schedule = setTimeout(
() => {
schedule = undefined
flush(attempt).catch(console.error)
},
ratelimittimeout ? ratelimittimeout : timeout
)
ratelimittimeout = 0
}

onPageChange((unloaded) => {
Expand All @@ -107,6 +138,7 @@ export default function batch(
if (pageUnloaded && buffer.length) {
const reqs = chunks(buffer).map(sendBatch)
Promise.all(reqs).catch(console.error)
// can we handle a retry here?
MichaelGHSeg marked this conversation as resolved.
Show resolved Hide resolved
}
})

Expand Down
16 changes: 16 additions & 0 deletions packages/browser/src/plugins/segmentio/fetch-dispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { fetch } from '../../lib/fetch'
import { RateLimitError } from './ratelimit-error'

export type Dispatcher = (url: string, body: object) => Promise<unknown>

Expand All @@ -10,11 +11,26 @@ export default function (config?: StandardDispatcherConfig): {
dispatch: Dispatcher
} {
/**
 * POSTs a single event payload to the given URL.
 *
 * Resolves on a 2xx/4xx (non-429) response. Throws a plain Error for 5xx so
 * the caller re-queues with standard backoff, and a RateLimitError for 429
 * carrying the retry delay (ms) taken from the `x-ratelimit-reset` header
 * (seconds), defaulting to 5000ms when the header is absent.
 */
function dispatch(url: string, body: object): Promise<unknown> {
  return fetch(url, {
    keepalive: config?.keepalive,
    headers: { 'Content-Type': 'text/plain' },
    method: 'post',
    body: JSON.stringify(body),
  }).then((res) => {
    if (res.status >= 500) {
      throw new Error(`Bad response from server: ${res.status}`)
    }
    if (res.status === 429) {
      const retryTimeoutStringSecs = res.headers?.get('x-ratelimit-reset')
      const retryTimeoutMS = retryTimeoutStringSecs
        ? parseInt(retryTimeoutStringSecs, 10) * 1000
        : 5000
      throw new RateLimitError(
        `Rate limit exceeded: ${res.status}`,
        retryTimeoutMS
      )
    }
  })
}

Expand Down
12 changes: 9 additions & 3 deletions packages/browser/src/plugins/segmentio/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,17 @@ export function segmentio(
return client
.dispatch(
`${remote}/${path}`,
normalize(analytics, json, settings, integrations)
normalize(analytics, json, settings, integrations, ctx)
)
.then(() => ctx)
.catch(() => {
buffer.pushWithBackoff(ctx)
.catch((error) => {
console.error('Error sending event', error)
if (error.name == 'RateLimitError') {
const timeout = error.retryTimeout
buffer.pushWithBackoff(ctx, timeout)
} else {
buffer.pushWithBackoff(ctx)
}
// eslint-disable-next-line @typescript-eslint/no-use-before-define
scheduleFlush(flushing, buffer, segmentio, scheduleFlush)
return ctx
Expand Down
14 changes: 13 additions & 1 deletion packages/browser/src/plugins/segmentio/normalize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ import { Analytics } from '../../core/analytics'
import { CDNSettings } from '../../browser'
import { SegmentFacade } from '../../lib/to-facade'
import { SegmentioSettings } from './index'
import { Context } from '../../core/context'

export function normalize(
analytics: Analytics,
json: ReturnType<SegmentFacade['json']>,
settings?: SegmentioSettings,
integrations?: CDNSettings['integrations']
integrations?: CDNSettings['integrations'],
ctx?: Context
): object {
const user = analytics.user()

Expand All @@ -25,6 +27,16 @@ export function normalize(
json._metadata = { failedInitializations: failed }
}

if (ctx != null) {
const retryCount = analytics.queue.queue.getAttempts(ctx)
if (retryCount > 1) {
json._metadata = {
...json._metadata,
MichaelGHSeg marked this conversation as resolved.
Show resolved Hide resolved
retryCount,
}
}
}

const bundled: string[] = []
const unbundled: string[] = []

Expand Down
9 changes: 9 additions & 0 deletions packages/browser/src/plugins/segmentio/ratelimit-error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/**
 * Error thrown when the server responds with HTTP 429 (Too Many Requests).
 * Carries the delay, in milliseconds, that the caller should wait before
 * retrying the request.
 */
export class RateLimitError extends Error {
  constructor(message: string, public retryTimeout: number) {
    super(message)
    this.name = 'RateLimitError'
  }
}
7 changes: 5 additions & 2 deletions packages/core/src/priority-queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export class PriorityQueue<Item extends QueueItem = QueueItem> extends Emitter {
return accepted
}

pushWithBackoff(item: Item): boolean {
pushWithBackoff(item: Item, minTimeout = 0): boolean {
if (this.getAttempts(item) === 0) {
return this.push(item)[0]
}
Expand All @@ -57,7 +57,10 @@ export class PriorityQueue<Item extends QueueItem = QueueItem> extends Emitter {
return false
}

const timeout = backoff({ attempt: attempt - 1 })
let timeout = backoff({ attempt: attempt - 1 })
if (timeout < minTimeout) {
timeout = minTimeout
}

setTimeout(() => {
this.queue.push(item)
Expand Down
Loading