Skip to content

Commit

Permalink
fix: add onError to pubsub.subscribe types (#3706)
Browse files Browse the repository at this point in the history
This was missed in #3468.  Also runs the type checker on the pubsub http client
tests to ensure typing is correct.
  • Loading branch information
achingbrain authored Jun 4, 2021
1 parent bf88d98 commit a0f2b34
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 20 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
"it-concat": "^2.0.0",
"it-first": "^1.0.4",
"nock": "^13.0.2",
"p-defer": "^3.0.0",
"rimraf": "^3.0.2"
},
"engines": {
Expand Down
1 change: 1 addition & 0 deletions test/commands.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const f = require('./utils/factory')()
describe('.commands', function () {
this.timeout(60 * 1000)

/** @type {import('ipfs-core-types').IPFS} */
let ipfs

before(async () => {
Expand Down
11 changes: 10 additions & 1 deletion test/node/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ const { expect } = require('aegir/utils/chai')
const ipfsClient = require('../../src').create
const delay = require('delay')

/**
* @typedef {import('http').IncomingMessage} IncomingMessage
*
* @param {(message: IncomingMessage) => Promise<any>} handler
*/
function startServer (handler) {
return new Promise((resolve) => {
// spin up a test http server to inspect the requests made by the library
Expand All @@ -20,15 +25,18 @@ function startServer (handler) {
})

server.listen(0, () => {
const addressInfo = server.address()

resolve({
port: server.address().port,
port: addressInfo && (typeof addressInfo === 'string' ? addressInfo : addressInfo.port),
close: () => server.close()
})
})
})
}

describe('agent', function () {
/** @type {import('http').Agent} */
let agent

before(() => {
Expand All @@ -40,6 +48,7 @@ describe('agent', function () {
})

it('restricts the number of concurrent connections', async () => {
/** @type {((arg: any) => void)[]} */
const responses = []

const server = await startServer(() => {
Expand Down
36 changes: 18 additions & 18 deletions test/pubsub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@

const { expect } = require('aegir/utils/chai')
const { AbortController } = require('native-abort-controller')
const uint8ArrayFromString = require('uint8arrays/from-string')
const defer = require('p-defer')

const f = require('./utils/factory')()

describe('.pubsub', function () {
this.timeout(20 * 1000)
describe('.subscribe', () => {
/** @type {import('ipfs-core-types').IPFS} */
let ipfs
/** @type {any} */
let ctl

beforeEach(async function () {
Expand All @@ -27,8 +31,7 @@ describe('.pubsub', function () {
it('.onError when connection is closed', async () => {
const topic = 'gossipboom'
let messageCount = 0
let onError
const error = new Promise(resolve => { onError = resolve })
const onError = defer()

await ipfs.pubsub.subscribe(topic, message => {
messageCount++
Expand All @@ -38,47 +41,44 @@ describe('.pubsub', function () {
ctl.stop().catch()
}
}, {
onError
onError: onError.resolve
})

await ipfs.pubsub.publish(topic, 'hello')
await ipfs.pubsub.publish(topic, 'bye')
await ipfs.pubsub.publish(topic, uint8ArrayFromString('hello'))
await ipfs.pubsub.publish(topic, uint8ArrayFromString('bye'))

await expect(error).to.eventually.be.fulfilled().and.to.be.instanceOf(Error)
await expect(onError.promise).to.eventually.be.fulfilled().and.to.be.instanceOf(Error)
})

it('does not call onError when aborted', async () => {
const controller = new AbortController()
const topic = 'gossipabort'
const messages = []
let onError
let onReceived

const received = new Promise(resolve => { onReceived = resolve })
const error = new Promise(resolve => { onError = resolve })
const onError = defer()
const onReceived = defer()

await ipfs.pubsub.subscribe(topic, message => {
messages.push(message)
if (messages.length === 2) {
onReceived()
onReceived.resolve()
}
}, {
onError,
onError: onError.resolve,
signal: controller.signal
})

await ipfs.pubsub.publish(topic, 'hello')
await ipfs.pubsub.publish(topic, 'bye')
await ipfs.pubsub.publish(topic, uint8ArrayFromString('hello'))
await ipfs.pubsub.publish(topic, uint8ArrayFromString('bye'))

await received
await onReceived.promise
controller.abort()

// Stop the daemon
await ctl.stop()
// Just to make sure no error is caused by above line
setTimeout(onError, 200, 'aborted')
setTimeout(onError.resolve, 200, 'aborted')

await expect(error).to.eventually.be.fulfilled().and.to.equal('aborted')
await expect(onError.promise).to.eventually.be.fulfilled().and.to.equal('aborted')
})
})
})
2 changes: 2 additions & 0 deletions test/utils/factory.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict'

// @ts-ignore no types
const { createFactory } = require('ipfsd-ctl')
const merge = require('merge-options')
const { isNode } = require('ipfs-utils/src/env')
Expand All @@ -13,6 +14,7 @@ const commonOptions = {

const commonOverrides = {
go: {
// @ts-ignore go-ipfs has no types
ipfsBin: isNode ? require('go-ipfs').path() : undefined
}
}
Expand Down
4 changes: 3 additions & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
"outDir": "dist"
},
"include": [
"src"
"src",
"test/utils/factory.js",
"test/pubsub.spec.js"
],
"references": [
{
Expand Down

0 comments on commit a0f2b34

Please sign in to comment.