Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: typedef generation & type checking #261

Merged
merged 15 commits into from
Mar 9, 2021
27 changes: 27 additions & 0 deletions .github/workflows/typecheck.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
on:
hugomrdias marked this conversation as resolved.
Show resolved Hide resolved
push:
branches:
- master
- main
- default
pull_request:
branches:
- '**'

name: Typecheck
jobs:
check:
runs-on: ubuntu-latest
strategy:
matrix:
node-version: [12.x]
steps:
- uses: actions/checkout@v1
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node-version }}
- name: Install dependencies
run: npm install
- name: Typecheck
uses: gozala/typescript-error-reporter-action@v1.0.8
29 changes: 23 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,33 @@
"./test/utils/create-libp2p-node": false,
"./test/utils/create-temp-repo-nodejs.js": "./test/utils/create-temp-repo-browser.js"
},
"types": "dist/src/index.d.ts",
"typesVersions": {
Gozala marked this conversation as resolved.
Show resolved Hide resolved
"*": {
"src/*": [
"dist/src/*",
"dist/src/*/index"
]
}
},
"eslintConfig": {
"extends": "ipfs"
},
"files": [
"dist",
"src"
],
"scripts": {
"prepare": "aegir build",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need full build here

Suggested change
"prepare": "aegir build",
"prepare": "aegir ts -p check",

"test": "aegir test",
"test:browser": "aegir test -t browser -t webworker",
"test:node": "aegir test -t node",
"lint": "aegir lint",
"check": "aegir ts -p check",
"release": "aegir release",
"release-minor": "aegir release --type minor",
"release-major": "aegir release --type major",
"bench": "node benchmarks/index",
"build": "aegir build",
"coverage": "aegir coverage --provider codecov",
"docs": "aegir docs",
"benchmarks": "node test/benchmarks/get-many"
Expand All @@ -43,11 +56,12 @@
"homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme",
"devDependencies": {
"@nodeutils/defaults-deep": "^1.1.0",
"aegir": "^28.1.0",
"@types/debug": "^4.1.5",
"aegir": "^29.2.0",
"benchmark": "^2.1.4",
"delay": "^4.3.0",
"ipfs-repo": "^7.0.0",
"ipfs-utils": "^4.0.0",
"ipfs-utils": "^5.0.1",
"iso-random-stream": "^1.1.1",
"it-all": "^1.0.2",
"it-drain": "^1.0.1",
Expand All @@ -69,22 +83,24 @@
"rimraf": "^3.0.0",
"sinon": "^9.0.0",
"stats-lite": "^2.2.0",
"typescript": "^4.0.5",
"uuid": "^8.0.0"
},
"dependencies": {
"abort-controller": "^3.0.0",
"any-signal": "^2.1.1",
"bignumber.js": "^9.0.0",
"cids": "^1.0.0",
"debug": "^4.1.0",
"debug": "^4.2.0",
"ipld-block": "^0.11.0",
"it-length-prefixed": "^3.0.0",
"it-pipe": "^1.1.0",
"just-debounce-it": "^1.1.0",
"libp2p-interfaces": "^0.7.1",
"libp2p-interfaces": "^0.8.1",
"moving-average": "^1.0.0",
"multicodec": "^2.0.0",
"multihashing-async": "^2.0.1",
"native-abort-controller": "0.0.3",
"protons": "^2.0.0",
"streaming-iterables": "^5.0.2",
"uint8arrays": "^1.1.0",
Expand Down Expand Up @@ -115,6 +131,7 @@
"dmitriy ryajov <dryajov@dmitriys-MBP.HomeNET>",
"Dmitriy Ryajov <dryajov@gmail.com>",
"Bryan Stenson <bryan.stenson@gmail.com>",
"Richard Schneider <makaretu@gmail.com>"
"Richard Schneider <makaretu@gmail.com>",
"Irakli Gozalishvili <dev@gozala.io>"
]
}
120 changes: 104 additions & 16 deletions src/decision-engine/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ const TARGET_MESSAGE_SIZE = 16 * 1024
const MAX_SIZE_REPLACE_HAS_WITH_BLOCK = 1024

class DecisionEngine {
/**
*
* @param {PeerId} peerId
* @param {BlockStore} blockstore
* @param {import('../network')} network
* @param {Stats} stats
* @param {Object} [opts]
* @param {number} [opts.targetMessageSize]
* @param {number} [opts.maxSizeReplaceHasWithBlock]
*/
constructor (peerId, blockstore, network, stats, opts) {
this._log = logger(peerId, 'engine')
this.blockstore = blockstore
Expand All @@ -34,6 +44,7 @@ class DecisionEngine {
this._opts = this._processOpts(opts)

// A list of of ledgers by their partner id
/** @type {Map<string, Ledger>} */
this.ledgerMap = new Map()
this._running = false

Expand Down Expand Up @@ -112,7 +123,7 @@ class DecisionEngine {

// If there's nothing in the message, bail out
if (msg.empty) {
this._requestQueue.tasksDone(peerId, tasks)
peerId && this._requestQueue.tasksDone(peerId, tasks)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

peerId is obtained on line 76 via following code:

 const { peerId, tasks, pendingSize } = this._requestQueue.popTasks(this._opts.targetMessageSize)

However popTasks returns an object without peerId in 2 code paths out of 3. At the same time tasksDone and messageSent below do not expect undefined.

It was not exactly clear what made most sense here, so I just conditioned those calls on peerIds presence, but it would be good to make sure all this makes sense.


// Trigger the next round of task processing
this._scheduleProcessTasks()
Expand All @@ -122,32 +133,36 @@ class DecisionEngine {

try {
// Send the message
await this.network.sendMessage(peerId, msg)
peerId && await this.network.sendMessage(peerId, msg)

// Peform sent message accounting
for (const block of blocks.values()) {
this.messageSent(peerId, block)
peerId && this.messageSent(peerId, block)
}
} catch (err) {
this._log.error(err)
}

// Free the tasks up from the request queue
this._requestQueue.tasksDone(peerId, tasks)
peerId && this._requestQueue.tasksDone(peerId, tasks)

// Trigger the next round of task processing
this._scheduleProcessTasks()
}

/**
* @param {PeerId} peerId
* @returns {Map<string, WantListEntry>}
*/
wantlistForPeer (peerId) {
const peerIdStr = peerId.toB58String()
if (!this.ledgerMap.has(peerIdStr)) {
return new Map()
}

return this.ledgerMap.get(peerIdStr).wantlist.sortedEntries()
const ledger = this.ledgerMap.get(peerIdStr)
return ledger ? ledger.wantlist.sortedEntries() : new Map()
}

/**
* @param {PeerId} peerId
*/
ledgerForPeer (peerId) {
const peerIdStr = peerId.toB58String()

Expand All @@ -164,12 +179,20 @@ class DecisionEngine {
}
}

/**
* @returns {PeerId[]}
*/
peers () {
return Array.from(this.ledgerMap.values()).map((l) => l.partner)
}

// Receive blocks either from an incoming message from the network, or from
// blocks being added by the client on the localhost (eg IPFS add)
/**
* Receive blocks either from an incoming message from the network, or from
* blocks being added by the client on the localhost (eg IPFS add)
*
* @param {Block[]} blocks
* @returns {void}
*/
receivedBlocks (blocks) {
if (!blocks.length) {
return
Expand Down Expand Up @@ -211,7 +234,13 @@ class DecisionEngine {
this._scheduleProcessTasks()
}

// Handle incoming messages
/**
* Handle incoming messages
*
* @param {PeerId} peerId
* @param {Message} msg
* @returns {Promise<void>}
*/
async messageReceived (peerId, msg) {
const ledger = this._findOrCreate(peerId)

Expand Down Expand Up @@ -251,12 +280,24 @@ class DecisionEngine {
this._scheduleProcessTasks()
}

/**
* @private
* @param {PeerId} peerId
* @param {CID[]} cids
* @returns {void}
*/
_cancelWants (peerId, cids) {
for (const c of cids) {
this._requestQueue.remove(c.toString(), peerId)
}
}

/**
* @private
* @param {PeerId} peerId
* @param {BitswapMessageEntry[]} wants
* @returns {Promise<void>}
*/
async _addWants (peerId, wants) {
// Get the size of each wanted block
const blockSizes = await this._getBlockSizes(wants.map(w => w.cid))
Expand Down Expand Up @@ -320,11 +361,21 @@ class DecisionEngine {
blockSize <= this._opts.maxSizeReplaceHasWithBlock
}

/**
* @private
* @param {CID[]} cids
* @returns {Promise<Map<string, number>>}
*/
async _getBlockSizes (cids) {
const blocks = await this._getBlocks(cids)
return new Map([...blocks].map(([k, v]) => [k, v.data.length]))
}

/**
* @private
* @param {CID[]} cids
* @returns {Promise<Map<string, Block>>}
*/
async _getBlocks (cids) {
const res = new Map()
await Promise.all(cids.map(async (cid) => {
Expand All @@ -347,7 +398,14 @@ class DecisionEngine {
})
}

// Clear up all accounting things after message was sent
/**
* Clear up all accounting things after message was sent
*
* @param {PeerId} peerId
* @param {Object} [block]
* @param {Uint8Array} block.data
* @param {CID} [block.cid]
*/
messageSent (peerId, block) {
const ledger = this._findOrCreate(peerId)
ledger.sentBytes(block ? block.data.length : 0)
Expand All @@ -356,15 +414,29 @@ class DecisionEngine {
}
}

/**
* @param {PeerId} peerId
* @returns {number}
*/
numBytesSentTo (peerId) {
return this._findOrCreate(peerId).accounting.bytesSent
}

/**
* @param {PeerId} peerId
* @returns {number}
*/

numBytesReceivedFrom (peerId) {
return this._findOrCreate(peerId).accounting.bytesRecv
}

peerDisconnected (peerId) {
/**
*
* @param {PeerId} _peerId
* @returns {void}
*/
peerDisconnected (_peerId) {
// if (this.ledgerMap.has(peerId.toB58String())) {
// this.ledgerMap.delete(peerId.toB58String())
// }
Expand All @@ -373,10 +445,16 @@ class DecisionEngine {
// in the peer request queue
}

/**
* @private
* @param {PeerId} peerId
* @returns {Ledger}
*/
_findOrCreate (peerId) {
const peerIdStr = peerId.toB58String()
if (this.ledgerMap.has(peerIdStr)) {
return this.ledgerMap.get(peerIdStr)
const ledger = this.ledgerMap.get(peerIdStr)
if (ledger) {
return ledger
}

const l = new Ledger(peerId)
Expand All @@ -399,3 +477,13 @@ class DecisionEngine {
}

module.exports = DecisionEngine

/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../stats')} Stats
* @typedef {import('../types').BlockData} BlockData
* @typedef {import('ipld-block')} Block
* @typedef {import('../types/message/entry')} BitswapMessageEntry
* @typedef {import('../types/wantlist/entry')} WantListEntry
* @typedef {import('../types').BlockStore} BlockStore
*/
Loading