Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

fix: chunk data in mock muxer #218

Merged
merged 2 commits into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/libp2p-interface-compliance-tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@
"p-limit": "^4.0.0",
"p-wait-for": "^4.1.0",
"sinon": "^14.0.0",
"uint8arraylist": "^1.5.1",
"uint8arrays": "^3.0.0",
"util": "^0.12.4"
}
Expand Down
53 changes: 42 additions & 11 deletions packages/libp2p-interface-compliance-tests/src/mocks/muxer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import type { Source } from 'it-stream-types'
import { pipe } from 'it-pipe'
import map from 'it-map'
import type { Components } from '@libp2p/interfaces/components'
import { Uint8ArrayList } from 'uint8arraylist'

let muxers = 0
let streams = 0
const MAX_MESSAGE_SIZE = 1024 * 1024

interface DataMessage {
id: string
Expand Down Expand Up @@ -53,6 +55,7 @@ class MuxedStream {
private sourceEnded: boolean
private readonly abortController: AbortController
private readonly resetController: AbortController
private readonly closeController: AbortController
private readonly log: Logger

constructor (init: { id: string, type: 'initiator' | 'recipient', push: Pushable<StreamMessage>, onEnd: (err?: Error) => void }) {
Expand All @@ -64,6 +67,7 @@ class MuxedStream {
this.type = type
this.abortController = new AbortController()
this.resetController = new AbortController()
this.closeController = new AbortController()

this.sourceEnded = false
this.sinkEnded = false
Expand Down Expand Up @@ -121,9 +125,14 @@ class MuxedStream {
this.stream = {
id,
sink: async (source) => {
if (this.sinkEnded) {
throw errCode(new Error('stream closed for writing'), 'ERR_SINK_ENDED')
}

source = abortableSource(source, anySignal([
this.abortController.signal,
this.resetController.signal
this.resetController.signal,
this.closeController.signal
]))

try {
Expand All @@ -137,18 +146,31 @@ class MuxedStream {
push.push(createMsg)
}

const list = new Uint8ArrayList()

for await (const chunk of source) {
const dataMsg: DataMessage = {
id,
type: 'data',
chunk: uint8ArrayToString(chunk, 'base64'),
direction: this.type
list.append(chunk)

while (list.length > 0) {
const available = Math.min(list.length, MAX_MESSAGE_SIZE)
const subList = list.subarray(0, available)
const dataMsg: DataMessage = {
id,
type: 'data',
chunk: uint8ArrayToString(subList.slice(), 'base64'),
direction: this.type
}

push.push(dataMsg)
list.consume(available)
}

push.push(dataMsg)
}
} catch (err: any) {
if (err.type === 'aborted' && err.message === 'The operation was aborted') {
if (this.closeController.signal.aborted) {
return
}

if (this.resetController.signal.aborted) {
err.message = 'stream reset'
err.code = 'ERR_STREAM_RESET'
Expand Down Expand Up @@ -192,19 +214,28 @@ class MuxedStream {

// Close for reading
close: () => {
this.input.end()
this.stream.closeRead()
this.stream.closeWrite()
},

closeRead: () => {
this.input.end()
},

closeWrite: () => {
this.input.end()
this.closeController.abort()

const closeMsg: CloseMessage = {
id,
type: 'close',
direction: this.type
}
push.push(closeMsg)
onSinkEnd()
},

// Close for reading and writing (local error)
abort: (err?: Error) => {
abort: (err: Error) => {
// End the source with the passed error
this.input.end(err)
this.abortController.abort()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export default async (createMuxer: (init?: StreamMuxerInit) => Promise<StreamMux
void pipe(
stream,
drain
).then(async () => await pipe([], stream))
)
}
})
const dialer = await createMuxer()
Expand Down
2 changes: 1 addition & 1 deletion packages/libp2p-multistream-select/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@
"it-reader": "^5.0.0",
"it-stream-types": "^1.0.4",
"p-defer": "^4.0.0",
"uint8arraylist": "^1.4.0",
"uint8arraylist": "^1.5.1",
"uint8arrays": "^3.0.0"
},
"devDependencies": {
Expand Down