Skip to content

Commit

Permalink
Merge pull request #21 from SocketDev/transform-promise
Browse files Browse the repository at this point in the history
Return a promise from transformStream shim
  • Loading branch information
jhiesey authored Jun 16, 2021
2 parents 90d79f8 + 5ea470c commit fa846d7
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 26 deletions.
10 changes: 6 additions & 4 deletions lib/ece.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,12 @@ export function encryptStream (
const stream = transformStream(
input,
new SliceTransformer(rs - TAG_LENGTH - 1)
)
).readable

return transformStream(
stream,
new ECETransformer(MODE_ENCRYPT, secretKey, rs, salt)
)
).readable
}

/**
Expand All @@ -94,11 +95,12 @@ export function encryptStream (
* rs: int containing record size, optional
*/
export function decryptStream (input, secretKey, rs = RECORD_SIZE) {
const stream = transformStream(input, new SliceTransformer(HEADER_LENGTH, rs))
const stream = transformStream(input, new SliceTransformer(HEADER_LENGTH, rs)).readable

return transformStream(
stream,
new ECETransformer(MODE_DECRYPT, secretKey, rs)
)
).readable
}

function checkSecretKey (secretKey) {
Expand Down
83 changes: 61 additions & 22 deletions lib/transform-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,58 @@
/* global TransformStream */

/**
* Pipe a readable stream through a transformer. Return the readable end of the
* TransformStream.
* Pipe a readable stream through a transformer. Returns a result, where
* result.readable is the readable end of the TransformStream and
* result.done is a promise that fulfills or rejects once the stream is done.
* Includes a shim for environments where TransformStream is not available.
*/
export function transformStream (readable, transformer) {
// Chrome, Edge, Safari TP
export function transformStream (sourceReadable, transformer) {
let transformedReadable
let done

if (typeof TransformStream !== 'undefined') {
return readable.pipeThrough(new TransformStream(transformer))
// Chrome, Edge, Safari 14.1+
const transform = new TransformStream(transformer)

done = sourceReadable.pipeTo(transform.writable)
transformedReadable = transform.readable
} else {
// Firefox, Safari 14 and older
let resolveDone
let rejectDone
done = new Promise((resolve, reject) => {
resolveDone = resolve
rejectDone = reject
})
transformedReadable = new ReadableStream(new TransformStreamSource(sourceReadable, transformer, { resolveDone, rejectDone }))
}

// Firefox, Safari 14 and older
return new ReadableStream(new TransformStreamSource(readable, transformer))
// Ensure the caller doesn't need to catch errors
done.catch(() => {})

return {
readable: transformedReadable,
done
}
}

class TransformStreamSource {
constructor (readable, transformer) {
constructor (readable, transformer, { resolveDone, rejectDone }) {
this.readable = readable
this.transformer = transformer
this.resolveDone = resolveDone
this.rejectDone = rejectDone
this.reader = readable.getReader()
}

async start (controller) {
if (this.transformer.start) {
return await this.transformer.start(controller)
if (this.transformer?.start) {
try {
await this.transformer.start(controller)
} catch (err) {
this.rejectDone(err)
throw err
}
}
}

Expand All @@ -40,23 +68,34 @@ class TransformStreamSource {

// eslint-disable-next-line no-unmodified-loop-condition
while (!enqueued) {
const data = await this.reader.read()
if (data.done) {
if (this.transformer.flush) {
await this.transformer.flush(controller)
try {
const data = await this.reader.read()
if (data.done) {
if (this.transformer?.flush) {
await this.transformer.flush(controller)
}
controller.close()
this.resolveDone()
return
}
controller.close()
return
}
if (this.transformer.transform) {
await this.transformer.transform(data.value, wrappedController)
} else {
wrappedController.enqueue(data.value)
if (this.transformer?.transform) {
await this.transformer.transform(data.value, wrappedController)
} else {
wrappedController.enqueue(data.value)
}
} catch (err) {
this.rejectDone(err)
throw err
}
}
}

async cancel (reason) {
return await this.reader.cancel(reason)
await this.reader.cancel(reason)
if (reason instanceof Error) {
this.rejectDone(reason)
} else {
this.rejectDone(new Error(`stream cancelled; reason: ${reason}`))
}
}
}

0 comments on commit fa846d7

Please sign in to comment.