Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

fix: handle node readable streams properly #3890

Merged
merged 3 commits into from
Sep 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion packages/ipfs-core-utils/src/files/normalise-content.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@ import {
/**
* @param {import('./normalise').ToContent} input
*/
export async function * normaliseContent (input) {
export async function normaliseContent (input) {
return toAsyncGenerator(input)
}

/**
* @param {import('./normalise').ToContent} input
*/
async function * toAsyncGenerator (input) {
// Bytes | String
if (isBytes(input)) {
yield toBytes(input)
Expand Down
12 changes: 9 additions & 3 deletions packages/ipfs-core-utils/src/files/normalise.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {

/**
* @param {ImportCandidate | ImportCandidateStream} input
* @param {(content:ToContent) => AsyncIterable<Uint8Array>} normaliseContent
* @param {(content:ToContent) => Promise<AsyncIterable<Uint8Array>>} normaliseContent
*/
// eslint-disable-next-line complexity
export async function * normalise (input, normaliseContent) {
Expand Down Expand Up @@ -72,6 +72,13 @@ export async function * normalise (input, normaliseContent) {
return
}

// fs.ReadStream<Bytes>
if (value._readableState) {
// @ts-ignore Node readable streams have a `.path` property so we need to pass it as the content
yield * map(peekable, (/** @type {ImportCandidate} */ value) => toFileObject({ content: value }, normaliseContent))
return
}

// (Async)Iterable<Blob>
// (Async)Iterable<String>
// (Async)Iterable<{ path, content }>
Expand Down Expand Up @@ -103,7 +110,7 @@ export async function * normalise (input, normaliseContent) {

/**
* @param {ImportCandidate} input
* @param {(content:ToContent) => AsyncIterable<Uint8Array>} normaliseContent
* @param {(content:ToContent) => Promise<AsyncIterable<Uint8Array>>} normaliseContent
*/
async function toFileObject (input, normaliseContent) {
// @ts-ignore - Those properties don't exist on most input types
Expand All @@ -117,7 +124,6 @@ async function toFileObject (input, normaliseContent) {
}

if (content) {
// @ts-ignore TODO vmx 2021-03-30 enable again
file.content = await normaliseContent(content)
} else if (!path) { // Not already a file object with path or content prop
// @ts-ignore - input still can be different ToContent
Expand Down
29 changes: 29 additions & 0 deletions packages/ipfs-core-utils/test/files/normalise-input.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import all from 'it-all'
import { File } from '@web-std/file'
import { normaliseInput } from '../../src/files/normalise-input.js'
import { isNode } from 'ipfs-utils/src/env.js'
import resolve from 'aegir/utils/resolve.js'

const { Blob, ReadableStream } = globalThis

Expand Down Expand Up @@ -208,4 +210,31 @@ describe('normalise-input', function () {
describe('TypedArray', () => {
testInputType(TYPEDARRAY, 'TypedArray', true)
})

if (isNode) {
/** @type {import('fs')} */
let fs

before(async () => {
fs = await import('fs')
})

describe('Node fs.ReadStream', () => {
const NODEFSREADSTREAM = () => {
const path = resolve('test/fixtures/file.txt', 'ipfs-core-utils')

return fs.createReadStream(path)
}

testInputType(NODEFSREADSTREAM, 'Node fs.ReadStream', false)

it('Iterable<Node fs.ReadStream>', async function () {
await testContent(iterableOf(NODEFSREADSTREAM()))
})

it('AsyncIterable<Node fs.ReadStream>', async function () {
await testContent(asyncIterableOf(NODEFSREADSTREAM()))
})
})
}
})
1 change: 1 addition & 0 deletions packages/ipfs-core-utils/test/fixtures/file.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
hello world
2 changes: 1 addition & 1 deletion packages/ipfs-grpc-client/src/core-api/files/write.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
* @param {*} content
*/
async function * stream (path, content) {
for await (const buf of normaliseContent(content)) {
for await (const buf of await normaliseContent(content)) {
yield { path, content: buf }
}
}
Expand Down