Skip to content

Commit

Permalink
Unflake "streaming responses cancel inner stream after disconnect" te…
Browse files Browse the repository at this point in the history
…st (#71163)

The
[flakiness](https://app.datadoghq.com/ci/test-runs?query=test_level%3Atest%20env%3Aci%20%40git.repository.id%3Agithub.com%2Fvercel%2Fnext.js%20%40test.service%3Anextjs%20%40test.status%3Afail%20%40test.suite%3A%22streaming%20responses%20cancel%20inner%20stream%20after%20disconnect%22&agg_m=count&agg_m_source=base&agg_t=count&currentTab=overview&eventStack=&fromUser=false&index=citest&start=1720903715972&end=1728679715972&paused=false)
of the test is underreported because due to masked errors, the test
sometimes yields false-positive results.

Due to a slight increase in compilation times for route handlers in
#70897, the test started to fail consistently in that PR.

This PR fixes the flakiness by handling the case where the request might
already be aborted before the response was sent. This leads to the
stream not being consumed, and subsequently its `finished` promise is
never resolved, which finally leads to test timeouts.
  • Loading branch information
unstubbable authored Oct 11, 2024
1 parent 7f94b8e commit 84ee9e6
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 34 deletions.
31 changes: 27 additions & 4 deletions test/e2e/cancel-request/app/edge-route/route.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,49 @@
import { NextRequest } from 'next/server'
import { Streamable } from '../../streamable'

export const runtime = 'edge'

let streamable: ReturnType<typeof Streamable> | undefined

export async function GET(req: Request): Promise<Response> {
export async function GET(req: NextRequest): Promise<Response> {
if (req.nextUrl.searchParams.has('compile')) {
// The request just wants to trigger compilation.
return new Response(null, { status: 204 })
}

// Consume the entire request body.
// This is so we don't confuse the request close with the connection close.
await req.text()

const write = new URL(req.url!, 'http://localhost/').searchParams.get('write')
const write = req.nextUrl.searchParams.get('write')

if (write) {
const s = (streamable = Streamable(+write!))
const s = (streamable = Streamable(+write))

// The request was aborted before the response was returned.
if (req.signal.aborted) {
s.abort()
return new Response(null, { status: 204 })
}

req.signal.onabort = () => {
s.abort()
}

return new Response(s.stream)
}

// The 2nd request should render the stats. We don't use a query param
// because edge rendering will create a different bundle for that.
const old = streamable!
const old = streamable

if (!old) {
return new Response(
'The streamable from the prime request is unexpectedly not available',
{ status: 500 }
)
}

streamable = undefined
const i = await old.finished
return new Response(`${i}`)
Expand Down
31 changes: 27 additions & 4 deletions test/e2e/cancel-request/app/node-route/route.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,51 @@
import { Streamable } from '../../streamable'
import { NextRequest } from 'next/server'

export const runtime = 'nodejs'
// Next thinks it can statically compile this route, which breaks the test.
export const dynamic = 'force-dynamic'

let streamable: ReturnType<typeof Streamable> | undefined

export async function GET(req: Request): Promise<Response> {
export async function GET(req: NextRequest): Promise<Response> {
if (req.nextUrl.searchParams.has('compile')) {
// The request just wants to trigger compilation.
return new Response(null, { status: 204 })
}

// Consume the entire request body.
// This is so we don't confuse the request close with the connection close.
await req.text()

const write = new URL(req.url!, 'http://localhost/').searchParams.get('write')
const write = req.nextUrl.searchParams.get('write')

if (write) {
const s = (streamable = Streamable(+write!))
const s = (streamable = Streamable(+write))

// The request was aborted before the response was returned.
if (req.signal.aborted) {
s.abort()
return new Response(null, { status: 204 })
}

req.signal.onabort = () => {
s.abort()
}

return new Response(s.stream)
}

// The 2nd request should render the stats. We don't use a query param
// because edge rendering will create a different bundle for that.
const old = streamable!
const old = streamable

if (!old) {
return new Response(
'The streamable from the prime request is unexpectedly not available',
{ status: 500 }
)
}

streamable = undefined
const i = await old.finished
return new Response(`${i}`)
Expand Down
30 changes: 26 additions & 4 deletions test/e2e/cancel-request/middleware.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { NextRequest } from 'next/server'
import { Streamable } from './streamable'

export const config = {
Expand All @@ -6,24 +7,45 @@ export const config = {

let streamable: ReturnType<typeof Streamable> | undefined

export default async function handler(req: Request): Promise<Response> {
export default async function handler(req: NextRequest): Promise<Response> {
if (req.nextUrl.searchParams.has('compile')) {
// The request just wants to trigger compilation.
return new Response(null, { status: 204 })
}

// Consume the entire request body.
// This is so we don't confuse the request close with the connection close.
await req.text()

const write = new URL(req.url!, 'http://localhost/').searchParams.get('write')
const write = req.nextUrl.searchParams.get('write')

if (write) {
const s = (streamable = Streamable(+write!))
const s = (streamable = Streamable(+write))

// The request was aborted before the response was returned.
if (req.signal.aborted) {
s.abort()
return new Response(null, { status: 204 })
}

req.signal.onabort = () => {
s.abort()
}

return new Response(s.stream)
}

// The 2nd request should render the stats. We don't use a query param
// because edge rendering will create a different bundle for that.
const old = streamable!
const old = streamable

if (!old) {
return new Response(
'The streamable from the prime request is unexpectedly not available',
{ status: 500 }
)
}

streamable = undefined
const i = await old.finished
return new Response(`${i}`)
Expand Down
31 changes: 27 additions & 4 deletions test/e2e/cancel-request/pages/api/edge-api.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { NextRequest } from 'next/server'
import { Streamable } from '../../streamable'

export const config = {
Expand All @@ -6,23 +7,45 @@ export const config = {

let streamable: ReturnType<typeof Streamable> | undefined

export default async function handler(req: Request): Promise<Response> {
export default async function handler(req: NextRequest): Promise<Response> {
if (req.nextUrl.searchParams.has('compile')) {
// The request just wants to trigger compilation.
return new Response(null, { status: 204 })
}

// Consume the entire request body.
// This is so we don't confuse the request close with the connection close.
await req.text()

const write = new URL(req.url!, 'http://localhost/').searchParams.get('write')
const write = req.nextUrl.searchParams.get('write')

if (write) {
const s = (streamable = Streamable(+write!))
const s = (streamable = Streamable(+write))

// The request was aborted before the response was returned.
if (req.signal.aborted) {
s.abort()
return new Response(null, { status: 204 })
}

req.signal.onabort = () => {
s.abort()
}

return new Response(s.stream)
}

// The 2nd request should render the stats. We don't use a query param
// because edge rendering will create a different bundle for that.
const old = streamable!
const old = streamable

if (!old) {
return new Response(
'The streamable from the prime request is unexpectedly not available',
{ status: 500 }
)
}

streamable = undefined
const i = await old.finished
return new Response(`${i}`)
Expand Down
36 changes: 27 additions & 9 deletions test/e2e/cancel-request/pages/api/node-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,26 @@ export const config = {

let readable: ReturnType<typeof Readable> | undefined

export default function handler(
export default async function handler(
req: IncomingMessage,
res: ServerResponse
): Promise<void> {
const url = new URL(req.url!, 'http://localhost/')

if (url.searchParams.has('compile')) {
// The request just wants to trigger compilation.
res.statusCode = 204
res.end()
return
}

// Pages API requests have already consumed the body.
// This is so we don't confuse the request close with the connection close.

const write = new URL(req.url!, 'http://localhost/').searchParams.get('write')
// The 2nd request should render the stats. We don't use a query param
// because edge rendering will create a different bundle for that.
const write = url.searchParams.get('write')

if (write) {
const r = (readable = Readable(+write!))
const r = (readable = Readable(+write))
res.on('close', () => {
r.abort()
})
Expand All @@ -31,9 +39,19 @@ export default function handler(
})
}

const old = readable!
// The 2nd request should render the stats. We don't use a query param
// because edge rendering will create a different bundle for that.
const old = readable

if (!old) {
res.statusCode = 500
res.end(
'The streamable from the prime request is unexpectedly not available'
)
return
}

readable = undefined
return old.finished.then((i) => {
res.end(`${i}`)
})
const i = await old.finished
res.end(`${i}`)
}
21 changes: 12 additions & 9 deletions test/e2e/cancel-request/stream-cancel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ describe('streaming responses cancel inner stream after disconnect', () => {
files: __dirname,
})

// For some reason, it's flakey. Try a few times.
jest.retryTimes(3)

function prime(url: string, noData?: boolean) {
return new Promise<void>((resolve, reject) => {
url = new URL(url, next.url).href
Expand Down Expand Up @@ -55,31 +52,37 @@ describe('streaming responses cancel inner stream after disconnect', () => {
['edge pages api', '/api/edge-api'],
['node pages api', '/api/node-api'],
])('%s', (_name, path) => {
beforeAll(async () => {
// Trigger compilation of the route so that compilation time does not
// factor into the actual test requests.
await next.fetch(path + '?compile')
})

it('cancels stream making progress', async () => {
// If the stream is making regular progress, then we'll eventually hit
// the break because `res.destroyed` is true.
await prime(path + '?write=25')
const res = await next.fetch(path)
const i = +(await res.text())
expect(i).toBeWithin(1, 5)
const i = await res.text()
expect(i).toBeOneOf(['1', '2', '3', '4', '5'])
}, 2500)

it('cancels stalled stream', async () => {
// If the stream is stalled, we'll never hit the `res.destroyed` break
// point, so this ensures we handle it with an out-of-band cancellation.
await prime(path + '?write=1')
const res = await next.fetch(path)
const i = +(await res.text())
expect(i).toBe(1)
const i = await res.text()
expect(i).toBe('1')
}, 2500)

it('cancels stream that never sent data', async () => {
// If the client has never sent any data (including headers), then we
// haven't even established the response object yet.
await prime(path + '?write=0', true)
const res = await next.fetch(path)
const i = +(await res.text())
expect(i).toBe(0)
const i = await res.text()
expect(i).toBe('0')
}, 2500)
})
})
7 changes: 7 additions & 0 deletions test/e2e/cancel-request/streamable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,22 @@ export function Streamable(write: number) {
const cleanedUp = new Deferred()
const aborted = new Deferred()
let i = 0
let startedConsuming = false

const streamable = {
finished: Promise.all([cleanedUp.promise, aborted.promise]).then(() => i),

abort() {
aborted.resolve()

if (!startedConsuming) {
cleanedUp.resolve()
}
},
stream: new ReadableStream({
async pull(controller) {
startedConsuming = true

if (i >= write) {
return
}
Expand Down

0 comments on commit 84ee9e6

Please sign in to comment.