Skip to content

Commit

Permalink
feat(streams/merge): earlyZipReadableStreams (#2264)
Browse files Browse the repository at this point in the history
  • Loading branch information
crowlKats committed May 26, 2022
1 parent 852968f commit 8c994de
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 2 deletions.
34 changes: 33 additions & 1 deletion streams/merge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export function mergeReadableStreams<T>(
/**
* Merge multiple streams into a single one, taking order into account, and each stream
* will wait for a chunk to enqueue before the next stream can append another chunk.
* If a stream ends before other ones, the other will continue adding data in order,
* If a stream ends before other ones, the others will continue adding data in order,
* and the finished one will not add any more data.
*/
export function zipReadableStreams<T>(
Expand Down Expand Up @@ -64,3 +64,35 @@ export function zipReadableStreams<T>(
},
});
}

/**
* Merge multiple streams into a single one, taking order into account, and each stream
* will wait for a chunk to enqueue before the next stream can append another chunk.
* If a stream ends before other ones, the others will be cancelled.
*/
export function earlyZipReadableStreams<T>(
...streams: ReadableStream<T>[]
): ReadableStream<T> {
const readers = streams.map((s) => s.getReader());
return new ReadableStream<T>({
async start(controller) {
try {
loop:
while (true) {
for (const reader of readers) {
const { value, done } = await reader.read();
if (!done) {
controller.enqueue(value!);
} else {
await Promise.all(readers.map((reader) => reader.cancel()));
break loop;
}
}
}
controller.close();
} catch (e) {
controller.error(e);
}
},
});
}
79 changes: 78 additions & 1 deletion streams/merge_test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.

import { mergeReadableStreams, zipReadableStreams } from "./merge.ts";
import {
earlyZipReadableStreams,
mergeReadableStreams,
zipReadableStreams,
} from "./merge.ts";
import { assertEquals } from "../testing/asserts.ts";

Deno.test("[streams] mergeReadableStreams", async () => {
Expand Down Expand Up @@ -70,3 +74,76 @@ Deno.test("[streams] zipReadableStreams", async () => {
"qwertzuiopasq123d",
]);
});

Deno.test("[streams] earlyZipReadableStreams short first", async () => {
const textStream = new ReadableStream<string>({
start(controller) {
controller.enqueue("1");
controller.enqueue("2");
controller.enqueue("3");
controller.close();
},
});

const textStream2 = new ReadableStream<string>({
start(controller) {
controller.enqueue("a");
controller.enqueue("b");
controller.enqueue("c");
controller.enqueue("d");
controller.enqueue("e");
controller.close();
},
});

const buf = [];
for await (const s of earlyZipReadableStreams(textStream, textStream2)) {
buf.push(s);
}

assertEquals(buf, [
"1",
"a",
"2",
"b",
"3",
"c",
]);
});

Deno.test("[streams] earlyZipReadableStreams long first", async () => {
const textStream = new ReadableStream<string>({
start(controller) {
controller.enqueue("a");
controller.enqueue("b");
controller.enqueue("c");
controller.enqueue("d");
controller.enqueue("e");
controller.close();
},
});

const textStream2 = new ReadableStream<string>({
start(controller) {
controller.enqueue("1");
controller.enqueue("2");
controller.enqueue("3");
controller.close();
},
});

const buf = [];
for await (const s of earlyZipReadableStreams(textStream, textStream2)) {
buf.push(s);
}

assertEquals(buf, [
"a",
"1",
"b",
"2",
"c",
"3",
"d",
]);
});

0 comments on commit 8c994de

Please sign in to comment.