From 674d4a347694350eb9452476fb63a77293c23635 Mon Sep 17 00:00:00 2001 From: nyapat <73502164+nyapat@users.noreply.github.com> Date: Thu, 22 Aug 2024 21:23:00 +1000 Subject: [PATCH 1/2] fix: mark stream as ended refactor: prefer destroying the stream --- packages/voice/src/receive/AudioReceiveStream.ts | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/packages/voice/src/receive/AudioReceiveStream.ts b/packages/voice/src/receive/AudioReceiveStream.ts index 119903844f1b..65c215f7f3c2 100644 --- a/packages/voice/src/receive/AudioReceiveStream.ts +++ b/packages/voice/src/receive/AudioReceiveStream.ts @@ -74,7 +74,14 @@ export class AudioReceiveStream extends Readable { this.renewEndTimeout(this.end); } - return super.push(buffer); + const result = super.push(buffer); + + if (buffer === null) { + // null marks EOF for stream + this.destroy(); + } + + return result; } private renewEndTimeout(end: EndBehavior & { duration: number }) { From d6a380e6993e6743955d7ae6ba40f62be17a906a Mon Sep 17 00:00:00 2001 From: nyapat <73502164+nyapat@users.noreply.github.com> Date: Sun, 25 Aug 2024 19:34:54 +1000 Subject: [PATCH 2/2] refactor: callback for nextTick test: wait duration ms to check end chore: eslint test: end before timeout --- .../__tests__/AudioReceiveStream.test.ts | 50 ++++++++++++------- .../voice/src/receive/AudioReceiveStream.ts | 7 ++- 2 files changed, 36 insertions(+), 21 deletions(-) diff --git a/packages/voice/__tests__/AudioReceiveStream.test.ts b/packages/voice/__tests__/AudioReceiveStream.test.ts index 415ec1bf2b69..abeb04243699 100644 --- a/packages/voice/__tests__/AudioReceiveStream.test.ts +++ b/packages/voice/__tests__/AudioReceiveStream.test.ts @@ -23,29 +23,27 @@ describe('AudioReceiveStream', () => { await wait(200); stream.push(DUMMY_BUFFER); expect(stream.readable).toEqual(true); + stream.push(null); + await wait(200); + expect(stream.readable).toEqual(false); }); - // TODO: Fix this test - // test('AfterSilence end behavior', async () => { - // const duration = 100; - // const increment = 20; - - // const stream = new AudioReceiveStream({ end: { behavior: EndBehaviorType.AfterSilence, duration: 100 } }); - // stream.resume(); + test('AfterSilence end behavior', async () => { + const duration = 100; + const increment = 20; - // for (let i = increment; i < duration / 2; i += increment) { - // await stepSilence(stream, increment); - // } + const stream = new AudioReceiveStream({ end: { behavior: EndBehaviorType.AfterSilence, duration } }); + stream.resume(); - // stream.push(DUMMY_BUFFER); + for (let step = increment; step < duration / 2; step += increment) { + await stepSilence(stream, increment); + } - // for (let i = increment; i < duration; i += increment) { - // await stepSilence(stream, increment); - // } + stream.push(DUMMY_BUFFER); - // await wait(increment); - // expect(stream.readableEnded).toEqual(true); - // }); + await wait(duration); + expect(stream.readableEnded).toEqual(true); + }); test('AfterInactivity end behavior', async () => { const duration = 100; @@ -71,4 +69,22 @@ describe('AudioReceiveStream', () => { expect(stream.readableEnded).toEqual(true); }); + + test('Stream ends after pushing null', async () => { + const stream = new AudioReceiveStream({ end: { behavior: EndBehaviorType.AfterInactivity, duration: 100 } }); + stream.resume(); + + stream.push(DUMMY_BUFFER); + + expect(stream.readable).toEqual(true); + expect(stream.readableEnded).toEqual(false); + expect(stream.destroyed).toEqual(false); + + stream.push(null); + await wait(50); + + expect(stream.readable).toEqual(false); + expect(stream.readableEnded).toEqual(true); + expect(stream.destroyed).toEqual(true); + }); }); diff --git a/packages/voice/src/receive/AudioReceiveStream.ts b/packages/voice/src/receive/AudioReceiveStream.ts index 65c215f7f3c2..45b0ef3e8133 100644 --- a/packages/voice/src/receive/AudioReceiveStream.ts +++ b/packages/voice/src/receive/AudioReceiveStream.ts @@ -1,4 +1,5 @@ import type { Buffer } from 'node:buffer'; +import { nextTick } from 'node:process'; import { Readable, type ReadableOptions } from 'node:stream'; import { SILENCE_FRAME } from '../audio/AudioPlayer'; @@ -74,14 +75,12 @@ export class AudioReceiveStream extends Readable { this.renewEndTimeout(this.end); } - const result = super.push(buffer); - if (buffer === null) { // null marks EOF for stream - this.destroy(); + nextTick(() => this.destroy()); } - return result; + return super.push(buffer); } private renewEndTimeout(end: EndBehavior & { duration: number }) {