Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Peer and Content Routing (DHT) #142

Merged
merged 4 commits into from
Aug 25, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: Peer and Content Routing Support (DHT) - first round
  • Loading branch information
dignifiedquire authored and daviddias committed Aug 22, 2017
commit 1dbf264ef89af8e175812222009d4ad35afa930a
53 changes: 30 additions & 23 deletions src/components/network/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
const debug = require('debug')
const lp = require('pull-length-prefixed')
const pull = require('pull-stream')
const setImmediate = require('async/setImmediate')
const waterfall = require('async/waterfall')
const each = require('async/each')

const Message = require('../../types/message')
const CONSTANTS = require('../../constants')
Expand Down Expand Up @@ -89,34 +90,46 @@ class Network {
if (!this._running) {
return
}

this.bitswap._onPeerConnected(peerInfo.id)
}

_onPeerDisconnect (peerInfo) {
if (!this._running) {
return
}

this.bitswap._onPeerDisconnected(peerInfo.id)
}

// Connect to the given peer
connectTo (peerId, callback) {
const done = (err) => setImmediate(() => callback(err))

if (!this._running) {
return done(new Error('No running network'))
return callback(new Error('No running network'))
}

// NOTE: For now, all this does is ensure that we are
// connected. Once we have Peer Routing, we will be able
// to find the Peer
if (this.libp2p.swarm.muxedConns[peerId.toB58String()]) {
done()
} else {
done(new Error('Could not connect to peer with peerId:', peerId.toB58String()))
}
this.libp2p.dial(peerId, callback)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dialing without a protocol doesn't create a conn because it never opens a multiplex conn, it is what is called 'warming up a conn'.

This can be just: this.libp2p.dial(peerId, callback)


findProviders (cid, maxProviders, callback) {
// TODO
// consider if we want to trickleDown maxProviders, currently this is
// not an exposed option:
// https://github.com/libp2p/js-libp2p-kad-dht/blob/master/src/index.js#L416
this.libp2p.contentRouting.findProviders(cid, CONSTANTS.providerRequestTimeout, callback)
}

findAndConnect (cid, maxProviders, callback) {
waterfall([
(cb) => this.findProviders(cid, maxProviders, cb),
(provs, cb) => each(provs, (p, cb) => this.connectTo(p, cb))
], callback)
}

provide (cid, callback) {
this.libp2p.contentRouting.provide(cid, callback)
}

// Connect to the given peer
// Send the given msg (instance of Message) to the given peer
sendMessage (peerId, msg, callback) {
if (!this._running) {
Expand All @@ -125,14 +138,8 @@ class Network {

const stringId = peerId.toB58String()
log('sendMessage to %s', stringId, msg)
let peerInfo
try {
peerInfo = this.peerBook.get(stringId)
} catch (err) {
return callback(err)
}

this._dialPeer(peerInfo, (err, conn, protocol) => {
this._dialPeer(peerId, (err, conn, protocol) => {
if (err) {
return callback(err)
}
Expand All @@ -157,14 +164,14 @@ class Network {
})
}

_dialPeer (peerInfo, callback) {
_dialPeer (peer, callback) {
// dialByPeerInfo throws if no network is there
try {
// Attempt Bitswap 1.1.0
this.libp2p.dial(peerInfo, BITSWAP110, (err, conn) => {
this.libp2p.dial(peer, BITSWAP110, (err, conn) => {
if (err) {
// Attempt Bitswap 1.0.0
this.libp2p.dial(peerInfo, BITSWAP100, (err, conn) => {
this.libp2p.dial(peer, BITSWAP100, (err, conn) => {
if (err) {
return callback(err)
}
Expand Down
1 change: 1 addition & 0 deletions src/components/want-manager/msg-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ module.exports = class MsgQueue {
log.error('cant connect to peer %s: %s', this.peerId.toB58String(), err.message)
return
}

log('sending message')
this.network.sendMessage(this.peerId, msg, (err) => {
if (err) {
Expand Down
133 changes: 110 additions & 23 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ const reject = require('async/reject')
const each = require('async/each')
const EventEmitter = require('events').EventEmitter
const debug = require('debug')
const series = require('async/series')
const map = require('async/map')
const once = require('once')

const CONSTANTS = require('./constants')
const WantManager = require('./components/want-manager')
Expand Down Expand Up @@ -126,6 +129,12 @@ class Bitswap {
`block:${block.cid.buffer.toString()}`,
block
)
this.network.provide(block.cid, (err) => {
if (err) {
log.error('Failed to provide: %s', err.message)
}
})

this.engine.receivedBlocks([block.cid])
callback()
})
Expand All @@ -150,26 +159,64 @@ class Bitswap {
* @returns {void}
*/
get (cid, callback) {
this.getMany([cid], (err, blocks) => {
if (err) {
return callback(err)
}

if (blocks && blocks.length > 0) {
callback(null, blocks[0])
} else {
// when a unwant happens
callback()
}
})
}

/**
* Fetch a a list of blocks by cid. If the blocks are in the local
* blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us.
*
* @param {Array<CID>} cids
* @param {function(Error, Blocks)} callback
* @returns {void}
*/
getMany (cids, callback) {
callback = once(callback)
const unwantListeners = {}
const blockListeners = {}
const cidStr = cid.buffer.toString()
const unwantEvent = `unwant:${cidStr}`
const blockEvent = `block:${cidStr}`

log('get: %s', cidStr)
const cleanupListener = () => {
const unwantEvent = (c) => `unwant:${c}`
const blockEvent = (c) => `block:${c}`
const retrieved = []
const locals = []
const missing = []

log('getMany', cids.length)
const cleanupListener = (cidStr) => {
if (unwantListeners[cidStr]) {
this.notifications.removeListener(unwantEvent, unwantListeners[cidStr])
this.notifications.removeListener(
unwantEvent(cidStr),
unwantListeners[cidStr]
)
delete unwantListeners[cidStr]
}

if (blockListeners[cidStr]) {
this.notifications.removeListener(blockEvent, blockListeners[cidStr])
this.notifications.removeListener(
blockEvent(cidStr),
blockListeners[cidStr]
)
delete blockListeners[cidStr]
}
}

const addListener = () => {
const addListeners = (cids) => {
cids.forEach((c) => addListener(c))
}

const addListener = (cid) => {
const cidStr = cid.buffer.toString()

unwantListeners[cidStr] = () => {
log(`manual unwant: ${cidStr}`)
cleanupListener()
Expand All @@ -180,26 +227,61 @@ class Bitswap {
blockListeners[cidStr] = (block) => {
this.wm.cancelWants([cid])
cleanupListener(cid)
callback(null, block)
retrieved.push(block)

if (retrieved.length === missing.length) {
finish(callback)
}
}

this.notifications.once(unwantEvent, unwantListeners[cidStr])
this.notifications.once(blockEvent, blockListeners[cidStr])
this.notifications.once(
unwantEvent(cidStr),
unwantListeners[cidStr]
)
this.notifications.once(
blockEvent(cidStr),
blockListeners[cidStr]
)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Q: Can we make this notification/tracking gets system be an external and tested module?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would that be beneficial? We can test it more in here if we want to.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly to add more tests :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


this.blockstore.has(cid, (err, has) => {
if (err) {
return callback(err)
}
const finish = (cb) => {
map(locals, (cid, cb) => {
this.blockstore.get(cid, cb)
}, (err, localBlocks) => {
if (err) {
return callback(err)
}

if (has) {
log('already have block: %s', cidStr)
return this.blockstore.get(cid, callback)
}
callback(null, localBlocks.concat(retrieved))
})
}

addListener()
this.wm.wantBlocks([cid])
})
series([
(cb) => each(cids, (cid, cb) => {
this.blockstore.has(cid, (err, has) => {
if (err) {
return cb(err)
}

if (has) {
locals.push(cid)
} else {
missing.push(cid)
}
cb()
})
}, cb),
(cb) => {
if (missing.length > 0) {
addListeners(missing)
this.wm.wantBlocks(missing)

this.network.findAndConnect(cids[0], CONSTANTS.maxProvidersPerRequest, cb)
} else {
cb()
}
}
], finish)
}

// removes the given cids from the wantlist independent of any ref counts
Expand Down Expand Up @@ -269,6 +351,11 @@ class Bitswap {
block
)
this.engine.receivedBlocks([block.cid])
this.network.provide(block.cid, (err) => {
if (err) {
log.error('Failed to provide: %s', err.message)
}
})
})
cb()
})
Expand Down
Loading