Skip to content

Commit

Permalink
chore: fix up tests
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain committed Feb 6, 2024
1 parent e485722 commit 73ab5f9
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 67 deletions.
127 changes: 66 additions & 61 deletions packages/bitswap/src/bitswap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { setMaxListeners } from '@libp2p/interface'
import { PeerSet } from '@libp2p/peer-collections'
import { PeerQueue } from '@libp2p/utils/peer-queue'
import { anySignal } from 'any-signal'
import drain from 'it-drain'
import { CID } from 'multiformats/cid'
import { sha256 } from 'multiformats/hashes/sha2'
import pDefer from 'p-defer'
Expand Down Expand Up @@ -103,40 +104,58 @@ export class Bitswap implements BitswapInterface {
* handle messages received through the network
*/
async _receiveMessage (peerId: PeerId, message: BitswapMessage): Promise<void> {
// hash all incoming blocks
const received = await Promise.all(
message.blocks
.filter(block => block.prefix != null && block.data != null)
.map(async block => {
const values = vd(block.prefix)
const cidVersion = values[0]
const multicodec = values[1]
const hashAlg = values[2]
// const hashLen = values[3] // We haven't need to use this so far

const hasher = hashAlg === sha256.code ? sha256 : await this.hashLoader?.getHasher(hashAlg)

if (hasher == null) {
throw new CodeError('Unknown hash algorithm', 'ERR_UNKNOWN_HASH_ALG')
}
// CIDs we received
const received: CID[] = []

// process all incoming blocks
const self = this
await drain(this.blockstore.putMany(async function * () {
for (const block of message.blocks) {
if (block.prefix == null || block.data == null) {
continue
}

const hash = await hasher.digest(block.data)
const cid = CID.create(cidVersion === 0 ? 0 : 1, multicodec, hash)
const wasWanted = this.notifications.listenerCount(receivedBlockEvent(cid)) > 0
self.log('received block')
const values = vd(block.prefix)
const cidVersion = values[0]
const multicodec = values[1]
const hashAlg = values[2]
// const hashLen = values[3] // We haven't need to use this so far

return { wasWanted, cid, data: block.data }
})
)
const hasher = hashAlg === sha256.code ? sha256 : await self.hashLoader?.getHasher(hashAlg)

if (hasher == null) {
self.log.error('unknown hash algorithm', hashAlg)
continue
}

const hash = await hasher.digest(block.data)
const cid = CID.create(cidVersion === 0 ? 0 : 1, multicodec, hash)
const wasWanted = self.notifications.listenerCount(receivedBlockEvent(cid)) > 0
received.push(cid)

const has = await self.blockstore.has(cid)

self._updateReceiveCounters(peerId, block.data, has)

if (!wasWanted) {
continue
}

if (!has) {
yield { cid, block: block.data }
}

self.notifications.receivedBlock(cid, block.data, peerId)
}
}()))

// quickly send out cancels, reduces chances of duplicate block receives
if (received.length > 0) {
this.wantList.cancelWants(
received
.filter(({ wasWanted }) => wasWanted)
.map(({ cid }) => cid)
).catch(err => {
this.log.error('error sending block cancels', err)
})
this.wantList.cancelWants(received)
.catch(err => {
this.log.error('error sending block cancels', err)
})
}

// notify sessions of block haves/don't haves
Expand All @@ -150,43 +169,15 @@ export class Bitswap implements BitswapInterface {
}
}

await Promise.all(
received.map(
async ({ cid, wasWanted, data }) => {
await this._handleReceivedBlock(peerId, cid, data, wasWanted)
this.notifications.receivedBlock(cid, data, peerId)
}
)
)

try {
// Note: this allows the engine to respond to any wants in the message.
// Processing of the blocks in the message happens below, after the
// blocks have been added to the blockstore.
// Respond to any wants in the message
await this.peerWantLists.messageReceived(peerId, message)
} catch (err) {
// Log instead of throwing an error so as to process as much as
// possible of the message. Currently `messageReceived` does not
// throw any errors, but this could change in the future.
this.log('failed to receive message from %p', peerId, message)
}
}

private async _handleReceivedBlock (peerId: PeerId, cid: CID, data: Uint8Array, wasWanted: boolean): Promise<void> {
this.log('received block')

const has = await this.blockstore.has(cid)

this._updateReceiveCounters(peerId, cid, data, has)

if (!wasWanted || has) {
return
}

await this.blockstore.put(cid, data)
}

_updateReceiveCounters (peerId: PeerId, cid: CID, data: Uint8Array, exists: boolean): void {
_updateReceiveCounters (peerId: PeerId, data: Uint8Array, exists: boolean): void {
this.stats.updateBlocksReceived(1, peerId)
this.stats.updateDataReceived(data.byteLength, peerId)

Expand Down Expand Up @@ -227,9 +218,9 @@ export class Bitswap implements BitswapInterface {
this.notifications.removeListener(receivedBlockEvent(root), receivedBlockListener)
this.notifications.removeListener(haveEvent(root), haveBlockListener)
this.notifications.removeListener(doNotHaveEvent(root), doNotHaveBlockListener)
}

queue.clear()
queue.clear()
}
})

const queriedPeers = new PeerSet()
Expand All @@ -245,6 +236,14 @@ export class Bitswap implements BitswapInterface {
if (session.peers.size === minProviders) {
deferred.resolve(session)
}

if (session.peers.size === maxProviders) {
this.notifications.removeListener(receivedBlockEvent(root), receivedBlockListener)
this.notifications.removeListener(haveEvent(root), haveBlockListener)
this.notifications.removeListener(doNotHaveEvent(root), doNotHaveBlockListener)

queue.clear()
}
}
const haveBlockListener: HaveBlockListener = (peer): void => {
this.log('adding %p to session after receiving HAVE_BLOCK', peer)
Expand All @@ -259,6 +258,8 @@ export class Bitswap implements BitswapInterface {
this.notifications.removeListener(receivedBlockEvent(root), receivedBlockListener)
this.notifications.removeListener(haveEvent(root), haveBlockListener)
this.notifications.removeListener(doNotHaveEvent(root), doNotHaveBlockListener)

queue.clear()
}
}
const doNotHaveBlockListener: DoNotHaveBlockListener = (peer) => {
Expand Down Expand Up @@ -312,6 +313,10 @@ export class Bitswap implements BitswapInterface {

// find network providers too but do not wait for the query to complete
void Promise.resolve().then(async () => {
if (options?.queryRoutingPeers === false) {
return
}

let providers = 0

for await (const provider of this.network.findProviders(root, options)) {
Expand Down
11 changes: 9 additions & 2 deletions packages/bitswap/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,27 @@ export interface CreateSessionOptions extends AbortOptions, ProgressOptions<Bits

/**
* After this many providers for the root CID have been found, stop searching
* for more providers.
* for more providers
*
* @default 3
*/
maxProviders?: number

/**
* If true, query connected peers before searching for providers in the
* routing.
* routing
*
* @default true
*/
queryConnectedPeers?: boolean

/**
* If true, search for providers in the routing to query for the root CID
*
* @default true
*/
queryRoutingPeers?: boolean

/**
* The priority to use when querying availability of the root CID
*
Expand Down
6 changes: 3 additions & 3 deletions packages/bitswap/test/bitswap.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,6 @@ describe('bitswap', () => {
expect(providers[1].id.equals(components.libp2p.dialProtocol.getCall(2).args[0].toString())).to.be.true()
expect(providers[2].id.equals(components.libp2p.dialProtocol.getCall(3).args[0].toString())).to.be.true()

// one current peer and providers 1-4
expect(components.libp2p.dialProtocol.callCount).to.equal(5)

// should have stopped at DEFAULT_MAX_PROVIDERS_PER_REQUEST
expect(session.peers.size).to.equal(DEFAULT_MAX_PROVIDERS_PER_REQUEST)
})
Expand Down Expand Up @@ -436,6 +433,9 @@ function stubPeerResponse (libp2p: StubbedInstance<Libp2p>, peerId: PeerId, resp

const pbstr = pbStream(localStream).pb(BitswapMessage)
void pbstr.read().then(async message => {
// simulate network latency
await delay(10)

// after reading message from remote, open a new stream on the remote and
// send the response
const [localDuplex, remoteDuplex] = duplexPair<any>()
Expand Down
2 changes: 1 addition & 1 deletion packages/bitswap/test/network.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ describe('network', () => {
prefix: cidToPrefix(cid1),
data: Uint8Array.from([0, 1, 2, 3, 4])
}]
})
}).catch(() => {})

// send two messages while the queue is blocked
void network.sendMessage(peerId, messageA)
Expand Down

0 comments on commit 73ab5f9

Please sign in to comment.