Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

feat: message splitting #100

Merged
merged 1 commit into from
Nov 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions src/mplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class Mplex {
registry.delete(id)
this.onStreamEnd && this.onStreamEnd(stream)
}
const stream = createStream({ id, name, send, type, onEnd })
const stream = createStream({ id, name, send, type, onEnd, maxMsgSize: this._options.maxMsgSize })
registry.set(id, stream)
return stream
}
Expand Down Expand Up @@ -176,12 +176,7 @@ class Mplex {
for (const s of receivers.values()) s.abort(err)
}
const source = pushable({ onEnd, writev: true })
const encodedSource = pipe(
source,
restrictSize(this._options.maxMsgSize),
Coder.encode
)
return Object.assign(encodedSource, {
return Object.assign(Coder.encode(source), {
push: source.push,
end: source.end,
return: source.return
Expand Down
2 changes: 2 additions & 0 deletions src/restrict-size.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,5 @@ module.exports = max => {
})()
}
}

module.exports.MAX_MSG_SIZE = MAX_MSG_SIZE
19 changes: 15 additions & 4 deletions src/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ const abortable = require('abortable-iterator')
const AbortController = require('abort-controller')
const log = require('debug')('libp2p:mplex:stream')
const pushable = require('it-pushable')
const BufferList = require('bl/BufferList')
const { MAX_MSG_SIZE } = require('./restrict-size')
const { InitiatorMessageTypes, ReceiverMessageTypes } = require('./message-types')

/**
Expand All @@ -12,10 +14,11 @@ const { InitiatorMessageTypes, ReceiverMessageTypes } = require('./message-types
* @param {string} options.name
* @param {function(*)} options.send Called to send data through the stream
* @param {function(Error)} [options.onEnd] Called whenever the stream ends
* @param {string} options.type One of ['initiator','receiver']. Defaults to 'initiator'
* @param {string} [options.type] One of ['initiator','receiver']. Defaults to 'initiator'
* @param {number} [options.maxMsgSize] Max size of an mplex message in bytes. Writes > size are automatically split. Defaults to 1MB
* @returns {*} A muxed stream
*/
module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator' }) => {
module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator', maxMsgSize = MAX_MSG_SIZE }) => {
const abortController = new AbortController()
const resetController = new AbortController()
const Types = type === 'initiator' ? InitiatorMessageTypes : ReceiverMessageTypes
Expand Down Expand Up @@ -70,8 +73,16 @@ module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator' }) => {
}

try {
for await (const data of source) {
send({ id, type: Types.MESSAGE, data })
for await (let data of source) {
while (data.length) {
if (data.length <= maxMsgSize) {
send({ id, type: Types.MESSAGE, data })
break
}
data = BufferList.isBufferList(data) ? data : new BufferList(data)
send({ id, type: Types.MESSAGE, data: data.shallowSlice(0, maxMsgSize) })
data.consume(maxMsgSize)
}
}
} catch (err) {
// Send no more data if this stream was remotely reset
Expand Down
81 changes: 72 additions & 9 deletions test/stream.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ chai.use(dirtyChai)
const pipe = require('it-pipe')
const randomBytes = require('random-bytes')
const randomInt = require('random-int')
const { tap, take, collect, consume } = require('streaming-iterables')
const { tap, take, collect, consume, map } = require('streaming-iterables')
const defer = require('p-defer')

const createStream = require('../src/stream')
Expand All @@ -30,6 +30,17 @@ const infiniteRandom = {
}
}

// Serialize a mplex message object to a Buffer by JSON-encoding it.
const msgToBuffer = msg => {
  const json = JSON.stringify(msg)
  return Buffer.from(json)
}

// Parse a JSON-encoded mplex message from a Buffer, reviving any
// Buffer that was serialized in its `data` field.
const bufferToMessage = buf => {
  const msg = JSON.parse(buf)
  // JSON.stringify(Buffer) encodes as {"type":"Buffer","data":[1,2,3]},
  // so detect that shape and convert it back into a real Buffer
  const hasSerializedBuffer = msg.data?.type === 'Buffer'
  if (hasSerializedBuffer) {
    msg.data = Buffer.from(msg.data.data)
  }
  return msg
}

describe('stream', () => {
it('should initiate stream with NEW_STREAM message', async () => {
const msgs = []
Expand Down Expand Up @@ -197,11 +208,12 @@ describe('stream', () => {
// echo back (on the other side this will be { type: MESSAGE, data: msg })
pipe(
receiver,
tap(msg => {
map(msg => {
// when the initiator sends a CLOSE message, we call close
if (msg.type === MessageTypes.CLOSE_INITIATOR) {
receiver.close()
}
return msgToBuffer(msg)
}),
receiver
)
Expand All @@ -215,6 +227,9 @@ describe('stream', () => {
if (msg.type === MessageTypes.CLOSE_RECEIVER) {
initiator.close()
}
if (msg.data) {
msg.data = bufferToMessage(msg.data)
}
}),
collect
)
Expand Down Expand Up @@ -253,11 +268,12 @@ describe('stream', () => {
// echo back (on the other side this will be { type: MESSAGE, data: msg })
pipe(
receiver,
tap(msg => {
map(msg => {
// when the initiator sends a RESET message, we call reset
if (msg.type === MessageTypes.RESET_INITIATOR) {
receiver.reset()
}
return msgToBuffer(msg)
}),
receiver
)
Expand All @@ -274,9 +290,14 @@ describe('stream', () => {
await pipe(
input,
tap(msg => generatedMsgs.push(msg)),
tap(msg => { if (i++ >= maxMsgs) initiator.abort(error) }),
tap(() => { if (i++ >= maxMsgs) initiator.abort(error) }),
initiator,
tap(msg => msgs.push(msg)),
tap(msg => {
if (msg.data) {
msg.data = bufferToMessage(msg.data)
}
msgs.push(msg)
}),
consume
)
} catch (err) {
Expand Down Expand Up @@ -311,7 +332,10 @@ describe('stream', () => {
// echo back (on the other side this will be { type: MESSAGE, data: msg })
pipe(
receiver,
tap(msg => { if (i++ >= maxMsgs) receiver.abort(error) }),
map(msg => {
if (i++ >= maxMsgs) receiver.abort(error)
return msgToBuffer(msg)
}),
receiver
)

Expand All @@ -330,6 +354,9 @@ describe('stream', () => {
if (msg.type === MessageTypes.RESET_RECEIVER) {
initiator.reset()
}
if (msg.data) {
msg.data = bufferToMessage(msg.data)
}
}),
consume
)
Expand Down Expand Up @@ -365,11 +392,12 @@ describe('stream', () => {
// echo back (on the other side this will be { type: MESSAGE, data: msg })
pipe(
receiver,
tap(msg => {
map(msg => {
// when the initiator sends a RESET message, we call reset
if (msg.type === MessageTypes.RESET_INITIATOR) {
receiver.reset()
}
return msgToBuffer(msg)
}),
receiver
)
Expand All @@ -388,7 +416,12 @@ describe('stream', () => {
tap(msg => generatedMsgs.push(msg)),
tap(msg => { if (i++ >= maxMsgs) throw error }),
initiator,
tap(msg => msgs.push(msg)),
tap(msg => {
if (msg.data) {
msg.data = bufferToMessage(msg.data)
}
msgs.push(msg)
}),
consume
)
} catch (err) {
Expand Down Expand Up @@ -425,7 +458,10 @@ describe('stream', () => {
// echo back (on the other side this will be { type: MESSAGE, data: msg })
pipe(
receiver,
tap(msg => { if (i++ >= maxMsgs) throw error }),
map(msg => {
if (i++ >= maxMsgs) throw error
return msgToBuffer(msg)
}),
receiver
)

Expand All @@ -444,6 +480,9 @@ describe('stream', () => {
if (msg.type === MessageTypes.RESET_RECEIVER) {
initiator.reset()
}
if (msg.data) {
msg.data = bufferToMessage(msg.data)
}
}),
consume
)
Expand Down Expand Up @@ -503,4 +542,28 @@ describe('stream', () => {
}
throw new Error('did not call onEnd with error')
})

it('should split writes larger than max message size', async () => {
  const id = randomInt(1000)
  const name = id.toString()
  const maxMsgSize = 5

  // Simulate the remote end of the muxer: echo MESSAGE frames back to
  // the stream's source and end it when the initiator closes.
  const send = msg => {
    switch (msg.type) {
      case MessageTypes.CLOSE_INITIATOR:
        stream.source.end()
        break
      case MessageTypes.MESSAGE_INITIATOR:
        stream.source.push(msg)
        break
    }
  }

  const stream = createStream({ id, name, send, maxMsgSize })

  // 12 bytes at a 5 byte max should be split into chunks of 5, 5 and 2
  const bigMessage = await randomBytes(12)
  const dataMessages = await pipe([bigMessage], stream, collect)

  expect(dataMessages.length).to.equal(3)
  expect(dataMessages[0].data.length).to.equal(maxMsgSize)
  expect(dataMessages[1].data.length).to.equal(maxMsgSize)
  expect(dataMessages[2].data.length).to.equal(2)
  expect(Buffer.concat(dataMessages.map(m => m.data.slice()))).to.deep.equal(bigMessage)
})
})