Skip to content

Commit

Permalink
Merge pull request #25 from SocketDev/add-missing-transform-methods
Browse files Browse the repository at this point in the history
Add missing transform methods
  • Loading branch information
jhiesey authored Jun 25, 2021
2 parents fc42e5b + d452516 commit 3329a58
Showing 1 changed file with 31 additions and 13 deletions.
44 changes: 31 additions & 13 deletions lib/transform-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,36 @@ class TransformStreamSource {
this.resolveDone = resolveDone
this.rejectDone = rejectDone
this.reader = readable.getReader()
this.progressMade = false // reset on each pull
this.wrappedController = null
}

async start (controller) {
this.wrappedController = {
enqueue: (value) => {
this.progressMade = true
controller.enqueue(value)
},
error: (reason) => {
this.progressMade = true
if (!(reason instanceof Error)) {
reason = new Error(`stream errored; reason: ${reason}`)
}
controller.error(reason)
this.reader.cancel(reason).catch(() => {})
this.rejectDone(reason)
},
terminate: () => {
this.progressMade = true
controller.close()
this.reader.cancel(new Error('stream terminated')).catch(() => {})
this.resolveDone()
}
}

if (this.transformer.start) {
try {
await this.transformer.start(controller)
await this.transformer.start(this.wrappedController)
} catch (err) {
this.rejectDone(err)
throw err
Expand All @@ -58,33 +82,27 @@ class TransformStreamSource {
}

async pull (controller) {
let enqueued = false
const wrappedController = {
enqueue (d) {
enqueued = true
controller.enqueue(d)
}
}

this.progressMade = false
// eslint-disable-next-line no-unmodified-loop-condition
while (!enqueued) {
while (!this.progressMade) {
try {
const data = await this.reader.read()
if (data.done) {
if (this.transformer.flush) {
await this.transformer.flush(controller)
await this.transformer.flush(this.wrappedController)
}
controller.close()
this.resolveDone()
return
}
if (this.transformer.transform) {
await this.transformer.transform(data.value, wrappedController)
await this.transformer.transform(data.value, this.wrappedController)
} else {
wrappedController.enqueue(data.value)
this.wrappedController.enqueue(data.value)
}
} catch (err) {
this.rejectDone(err)
this.reader.cancel(err).catch(() => {})
throw err
}
}
Expand Down

0 comments on commit 3329a58

Please sign in to comment.