Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
feat: message splitting (#100)
Browse files Browse the repository at this point in the history
  • Loading branch information
alanshaw authored and vasco-santos committed Nov 28, 2019
1 parent ce6a236 commit fba56a5
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 20 deletions.
9 changes: 2 additions & 7 deletions src/mplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class Mplex {
registry.delete(id)
this.onStreamEnd && this.onStreamEnd(stream)
}
const stream = createStream({ id, name, send, type, onEnd })
const stream = createStream({ id, name, send, type, onEnd, maxMsgSize: this._options.maxMsgSize })
registry.set(id, stream)
return stream
}
Expand Down Expand Up @@ -176,12 +176,7 @@ class Mplex {
for (const s of receivers.values()) s.abort(err)
}
const source = pushable({ onEnd, writev: true })
const encodedSource = pipe(
source,
restrictSize(this._options.maxMsgSize),
Coder.encode
)
return Object.assign(encodedSource, {
return Object.assign(Coder.encode(source), {
push: source.push,
end: source.end,
return: source.return
Expand Down
2 changes: 2 additions & 0 deletions src/restrict-size.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,5 @@ module.exports = max => {
})()
}
}

module.exports.MAX_MSG_SIZE = MAX_MSG_SIZE
19 changes: 15 additions & 4 deletions src/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ const abortable = require('abortable-iterator')
const AbortController = require('abort-controller')
const log = require('debug')('libp2p:mplex:stream')
const pushable = require('it-pushable')
const BufferList = require('bl/BufferList')
const { MAX_MSG_SIZE } = require('./restrict-size')
const { InitiatorMessageTypes, ReceiverMessageTypes } = require('./message-types')

/**
Expand All @@ -12,10 +14,11 @@ const { InitiatorMessageTypes, ReceiverMessageTypes } = require('./message-types
* @param {string} options.name
* @param {function(*)} options.send Called to send data through the stream
* @param {function(Error)} [options.onEnd] Called whenever the stream ends
* @param {string} options.type One of ['initiator','receiver']. Defaults to 'initiator'
* @param {string} [options.type] One of ['initiator','receiver']. Defaults to 'initiator'
* @param {number} [options.maxMsgSize] Max size of an mplex message in bytes. Writes > size are automatically split. Defaults to 1MB
* @returns {*} A muxed stream
*/
module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator' }) => {
module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator', maxMsgSize = MAX_MSG_SIZE }) => {
const abortController = new AbortController()
const resetController = new AbortController()
const Types = type === 'initiator' ? InitiatorMessageTypes : ReceiverMessageTypes
Expand Down Expand Up @@ -70,8 +73,16 @@ module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator' }) => {
}

try {
for await (const data of source) {
send({ id, type: Types.MESSAGE, data })
for await (let data of source) {
while (data.length) {
if (data.length <= maxMsgSize) {
send({ id, type: Types.MESSAGE, data })
break
}
data = BufferList.isBufferList(data) ? data : new BufferList(data)
send({ id, type: Types.MESSAGE, data: data.shallowSlice(0, maxMsgSize) })
data.consume(maxMsgSize)
}
}
} catch (err) {
// Send no more data if this stream was remotely reset
Expand Down
81 changes: 72 additions & 9 deletions test/stream.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ chai.use(dirtyChai)
const pipe = require('it-pipe')
const randomBytes = require('random-bytes')
const randomInt = require('random-int')
const { tap, take, collect, consume } = require('streaming-iterables')
const { tap, take, collect, consume, map } = require('streaming-iterables')
const defer = require('p-defer')

const createStream = require('../src/stream')
Expand All @@ -30,6 +30,17 @@ const infiniteRandom = {
}
}

const msgToBuffer = msg => Buffer.from(JSON.stringify(msg))

const bufferToMessage = buf => {
const msg = JSON.parse(buf)
// JSON.stringify(Buffer) encodes as {"type":"Buffer","data":[1,2,3]}
if (msg.data && msg.data.type === 'Buffer') {
msg.data = Buffer.from(msg.data.data)
}
return msg
}

describe('stream', () => {
it('should initiate stream with NEW_STREAM message', async () => {
const msgs = []
Expand Down Expand Up @@ -197,11 +208,12 @@ describe('stream', () => {
// echo back (on the other side this will be { type: MESSAGE, data: msg })
pipe(
receiver,
tap(msg => {
map(msg => {
// when the initiator sends a CLOSE message, we call close
if (msg.type === MessageTypes.CLOSE_INITIATOR) {
receiver.close()
}
return msgToBuffer(msg)
}),
receiver
)
Expand All @@ -215,6 +227,9 @@ describe('stream', () => {
if (msg.type === MessageTypes.CLOSE_RECEIVER) {
initiator.close()
}
if (msg.data) {
msg.data = bufferToMessage(msg.data)
}
}),
collect
)
Expand Down Expand Up @@ -253,11 +268,12 @@ describe('stream', () => {
// echo back (on the other side this will be { type: MESSAGE, data: msg })
pipe(
receiver,
tap(msg => {
map(msg => {
// when the initiator sends a RESET message, we call reset
if (msg.type === MessageTypes.RESET_INITIATOR) {
receiver.reset()
}
return msgToBuffer(msg)
}),
receiver
)
Expand All @@ -274,9 +290,14 @@ describe('stream', () => {
await pipe(
input,
tap(msg => generatedMsgs.push(msg)),
tap(msg => { if (i++ >= maxMsgs) initiator.abort(error) }),
tap(() => { if (i++ >= maxMsgs) initiator.abort(error) }),
initiator,
tap(msg => msgs.push(msg)),
tap(msg => {
if (msg.data) {
msg.data = bufferToMessage(msg.data)
}
msgs.push(msg)
}),
consume
)
} catch (err) {
Expand Down Expand Up @@ -311,7 +332,10 @@ describe('stream', () => {
// echo back (on the other side this will be { type: MESSAGE, data: msg })
pipe(
receiver,
tap(msg => { if (i++ >= maxMsgs) receiver.abort(error) }),
map(msg => {
if (i++ >= maxMsgs) receiver.abort(error)
return msgToBuffer(msg)
}),
receiver
)

Expand All @@ -330,6 +354,9 @@ describe('stream', () => {
if (msg.type === MessageTypes.RESET_RECEIVER) {
initiator.reset()
}
if (msg.data) {
msg.data = bufferToMessage(msg.data)
}
}),
consume
)
Expand Down Expand Up @@ -365,11 +392,12 @@ describe('stream', () => {
// echo back (on the other side this will be { type: MESSAGE, data: msg })
pipe(
receiver,
tap(msg => {
map(msg => {
// when the initiator sends a RESET message, we call reset
if (msg.type === MessageTypes.RESET_INITIATOR) {
receiver.reset()
}
return msgToBuffer(msg)
}),
receiver
)
Expand All @@ -388,7 +416,12 @@ describe('stream', () => {
tap(msg => generatedMsgs.push(msg)),
tap(msg => { if (i++ >= maxMsgs) throw error }),
initiator,
tap(msg => msgs.push(msg)),
tap(msg => {
if (msg.data) {
msg.data = bufferToMessage(msg.data)
}
msgs.push(msg)
}),
consume
)
} catch (err) {
Expand Down Expand Up @@ -425,7 +458,10 @@ describe('stream', () => {
// echo back (on the other side this will be { type: MESSAGE, data: msg })
pipe(
receiver,
tap(msg => { if (i++ >= maxMsgs) throw error }),
map(msg => {
if (i++ >= maxMsgs) throw error
return msgToBuffer(msg)
}),
receiver
)

Expand All @@ -444,6 +480,9 @@ describe('stream', () => {
if (msg.type === MessageTypes.RESET_RECEIVER) {
initiator.reset()
}
if (msg.data) {
msg.data = bufferToMessage(msg.data)
}
}),
consume
)
Expand Down Expand Up @@ -503,4 +542,28 @@ describe('stream', () => {
}
throw new Error('did not call onEnd with error')
})

it('should split writes larger than max message size', async () => {
const send = msg => {
if (msg.type === MessageTypes.CLOSE_INITIATOR) {
stream.source.end()
} else if (msg.type === MessageTypes.MESSAGE_INITIATOR) {
stream.source.push(msg)
}
}

const id = randomInt(1000)
const name = id.toString()
const maxMsgSize = 5
const stream = createStream({ id, name, send, maxMsgSize })

const bigMessage = await randomBytes(12)
const dataMessages = await pipe([bigMessage], stream, collect)

expect(dataMessages.length).to.equal(3)
expect(dataMessages[0].data.length).to.equal(maxMsgSize)
expect(dataMessages[1].data.length).to.equal(maxMsgSize)
expect(dataMessages[2].data.length).to.equal(2)
expect(Buffer.concat(dataMessages.map(m => m.data.slice()))).to.deep.equal(bigMessage)
})
})

0 comments on commit fba56a5

Please sign in to comment.