Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
chore: apply suggestions from code review
Browse files Browse the repository at this point in the history
Co-Authored-By: dirkmc <dirkmdev@gmail.com>
Co-Authored-By: Jacob Heun <jacobheun@gmail.com>
  • Loading branch information
3 people committed Nov 22, 2019
1 parent 89f85bf commit 44054b2
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 67 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
"abort-controller": "^3.0.0",
"async": "^2.6.2",
"base32.js": "~0.1.0",
"chai-checkmark": "^1.0.1",
"cids": "~0.7.1",
"debug": "^4.1.1",
"err-code": "^2.0.0",
Expand All @@ -60,18 +59,19 @@
"p-queue": "^6.2.1",
"p-timeout": "^3.2.0",
"p-times": "^2.1.0",
"paramap-it": "^0.1.1",
"peer-id": "~0.13.5",
"peer-info": "~0.17.0",
"promise-to-callback": "^1.0.0",
"protons": "^1.0.1",
"streaming-iterables": "^4.1.1",
"varint": "^5.0.0",
"xor-distance": "^2.0.0"
},
"devDependencies": {
"aegir": "^20.4.1",
"async-iterator-all": "^1.0.0",
"chai": "^4.2.0",
"chai-checkmark": "^1.0.1",
"datastore-level": "~0.12.1",
"delay": "^4.3.0",
"dirty-chai": "^2.0.1",
Expand Down
4 changes: 2 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class KadDHT extends EventEmitter {
randomWalk = {}
}) {
super()
assert(dialer, 'libp2p-kad-dht requires a instance of Dialer')
assert(dialer, 'libp2p-kad-dht requires an instance of Dialer')

/**
* Local reference to the libp2p dialer instance
Expand All @@ -82,7 +82,7 @@ class KadDHT extends EventEmitter {
this.peerInfo = peerInfo

/**
* Local peer info
* Local PeerStore
* @type {PeerStore}
*/
this.peerStore = peerStore
Expand Down
27 changes: 11 additions & 16 deletions src/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ class Network {
return
}

// TODO remove: add a way to check if switch has started or not
if (!this.dht.isStarted) {
throw errcode(new Error('Can not start network'), 'ERR_CANNOT_START_NETWORK')
}
Expand All @@ -52,7 +51,7 @@ class Network {

// register protocol with topology
const topology = new MulticodecTopology({
multicodecs: c.PROTOCOL_DHT,
multicodecs: [c.PROTOCOL_DHT],
handlers: {
onConnect: this._onPeerConnected,
onDisconnect: () => {}
Expand Down Expand Up @@ -98,15 +97,11 @@ class Network {
* Registrar notifies a connection successfully with dht protocol.
* @private
* @param {PeerInfo} peerInfo remote peer info
* @param {Connection} conn connection to the peer
* @returns {Promise<void>}
*/
async _onPeerConnected (peerInfo, conn) {
async _onPeerConnected (peerInfo) {
await this.dht._add(peerInfo)
this._log('added to the routing table: %s', peerInfo.id.toB58String())

// Open a stream with the connected peer
await conn.newStream(c.PROTOCOL_DHT)
}

/**
Expand Down Expand Up @@ -152,40 +147,40 @@ class Network {
* If no response is received after the specified timeout
* this will error out.
*
* @param {Connection} conn - the connection to use
* @param {DuplexIterable} stream - the stream to use
* @param {Buffer} msg - the message to send
* @returns {Promise<Message>}
* @private
*/
async _writeReadMessage (conn, msg) { // eslint-disable-line require-await
async _writeReadMessage (stream, msg) { // eslint-disable-line require-await
return pTimeout(
writeReadMessage(conn, msg),
writeReadMessage(stream, msg),
this.readMessageTimeout
)
}

/**
* Write a message to the given connection.
* Write a message to the given stream.
*
* @param {Connection} conn - the connection to use
* @param {DuplexIterable} stream - the stream to use
* @param {Buffer} msg - the message to send
* @returns {Promise<void>}
* @private
*/
_writeMessage (conn, msg) {
_writeMessage (stream, msg) {
return pipe(
[msg],
lp.encode(),
conn
stream
)
}
}

async function writeReadMessage (conn, msg) {
async function writeReadMessage (stream, msg) {
const res = await pipe(
[msg],
lp.encode(),
conn,
stream,
utils.itFilter(
(msg) => msg.length < c.maxMessageSize
),
Expand Down
39 changes: 19 additions & 20 deletions src/rpc/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

const pipe = require('it-pipe')
const lp = require('it-length-prefixed')
const paramap = require('paramap-it')
const PeerInfo = require('peer-info')

const Message = require('../message')
Expand All @@ -22,15 +21,14 @@ module.exports = (dht) => {
*
* @private
*/
async function handleMessage (peer, msg) { // eslint-disable-line
// get handler & exectue it
async function handleMessage (peer, msg) {
// get handler & execute it
const handler = getMessageHandler(msg.type)

try {
await dht._add(peer)
} catch (err) {
log.error('Failed to update the kbucket store')
log.error(err)
log.error('Failed to update the kbucket store', err)
}

if (!handler) {
Expand All @@ -44,14 +42,12 @@ module.exports = (dht) => {
/**
* Handle incoming streams on the dht protocol.
* @param {Object} props
* @param {string} props.protocol
* @param {DuplexStream} props.stream
* @param {Connection} props.connection connection
* @returns {Promise<void>}
*/
return async function onIncomingStream ({ protocol, stream, connection }) {
return async function onIncomingStream ({ stream, connection }) {
const peerInfo = await PeerInfo.create(connection.remotePeer)
peerInfo.protocols.add(protocol)

try {
await dht._add(peerInfo)
Expand All @@ -65,18 +61,21 @@ module.exports = (dht) => {
await pipe(
stream.source,
lp.decode(),
utils.itFilter(
(msg) => msg.length < c.maxMessageSize
),
source => paramap(source, rawMsg => {
const msg = Message.deserialize(rawMsg.slice())
return handleMessage(peerInfo, msg)
}),
// Not all handlers will return a response
utils.itFilter(Boolean),
source => paramap(source, response => {
return response.serialize()
}),
source => (async function * () {
for await (const msg of source) {
// Check message size
if (msg.length < c.maxMessageSize) {
// handle the message
const desMessage = Message.deserialize(msg.slice())
const res = await handleMessage(peerInfo, desMessage)

// Not all handlers will return a response
if (res) {
yield res.serialize()
}
}
}
})(),
lp.encode(),
stream.sink
)
Expand Down
5 changes: 1 addition & 4 deletions test/query/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,7 @@ describe('Query', () => {
before('create a dht', () => {
const peerStore = new PeerBook()
dht = new DHT({
dialer: {
_peerInfo: ourPeerInfo,
_peerBook: peerStore
},
dialer: {},
peerStore,
peerInfo: ourPeerInfo
})
Expand Down
29 changes: 8 additions & 21 deletions test/rpc/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ const expect = chai.expect
const pDefer = require('p-defer')
const pipe = require('it-pipe')
const lp = require('it-length-prefixed')
const { collect } = require('streaming-iterables')

const Message = require('../../src/message')
const rpc = require('../../src/rpc')

const createPeerInfo = require('../utils/create-peer-info')
const TestDHT = require('../utils/test-dht')
const toBuffer = require('../utils/to-buffer')

describe('rpc', () => {
let peerInfos
Expand All @@ -38,35 +40,20 @@ describe('rpc', () => {
defer.resolve()
}

const data = []
await pipe(
const source = await pipe(
[msg.serialize()],
lp.encode(),
async source => {
for await (const chunk of source) {
data.push(chunk.slice())
}
}
collect
)

const duplexStream = {
source: function * () {
const array = data

while (array.length) {
yield array.shift()
}
},
source,
sink: async (source) => {
const res = []
await pipe(
const res = await pipe(
source,
lp.decode(),
async source => {
for await (const chunk of source) {
res.push(chunk.slice())
}
}
toBuffer, // Ensure we have buffers here for validateMessage to consume
collect
)
validateMessage(res)
}
Expand Down
4 changes: 2 additions & 2 deletions test/utils/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const delay = require('delay')
const pRetry = require('p-retry')
const pTimeout = require('p-timeout')
const DuplexPair = require('it-pair/duplex')
const duplexPair = require('it-pair/duplex')

const { sortClosestPeers } = require('../../src/utils')

Expand Down Expand Up @@ -35,7 +35,7 @@ const createMockRegistrar = (registrarRecord) => ({
exports.createMockRegistrar = createMockRegistrar

const ConnectionPair = () => {
const [d0, d1] = DuplexPair()
const [d0, d1] = duplexPair()

return [
{
Expand Down
15 changes: 15 additions & 0 deletions test/utils/to-buffer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
'use strict'
/**
* Converts BufferList messages to Buffers
* @param {*} source
* @returns {AsyncGenerator}
*/
const toBuffer = (source) => {
return (async function * () {
for await (const chunk of source) {
yield Buffer.isBuffer(chunk) ? chunk : chunk.slice()
}
})()
}

module.exports = toBuffer

0 comments on commit 44054b2

Please sign in to comment.