Skip to content

Commit

Permalink
chore: simplify streaming http response code and use instances of pub…
Browse files Browse the repository at this point in the history
…sub tracker instead of a singleton (#3719)

Simplifies deps, pass peer id as string instead of cid.
  • Loading branch information
achingbrain authored Jun 18, 2021
1 parent 1523e33 commit 13c641e
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 83 deletions.
20 changes: 13 additions & 7 deletions src/pubsub/index.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
'use strict'

const SubscriptionTracker = require('./subscription-tracker')

/**
* @param {import('../types').Options} config
*/
module.exports = config => ({
ls: require('./ls')(config),
peers: require('./peers')(config),
publish: require('./publish')(config),
subscribe: require('./subscribe')(config),
unsubscribe: require('./unsubscribe')(config)
})
module.exports = config => {
const subscriptionTracker = new SubscriptionTracker()

return {
ls: require('./ls')(config),
peers: require('./peers')(config),
publish: require('./publish')(config),
subscribe: require('./subscribe')(config, subscriptionTracker),
unsubscribe: require('./unsubscribe')(config, subscriptionTracker)
}
}
118 changes: 61 additions & 57 deletions src/pubsub/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
const uint8ArrayFromString = require('uint8arrays/from-string')
const uint8ArrayToString = require('uint8arrays/to-string')
const log = require('debug')('ipfs-http-client:pubsub:subscribe')
const SubscriptionTracker = require('./subscription-tracker')
const configure = require('../lib/configure')
const toUrlSearchParams = require('../lib/to-url-search-params')

Expand All @@ -12,70 +11,75 @@ const toUrlSearchParams = require('../lib/to-url-search-params')
* @typedef {import('ipfs-core-types/src/pubsub').Message} Message
* @typedef {(err: Error, fatal: boolean, msg?: Message) => void} ErrorHandlerFn
* @typedef {import('ipfs-core-types/src/pubsub').API<HTTPClientExtraOptions & { onError?: ErrorHandlerFn }>} PubsubAPI
* @typedef {import('../types').Options} Options
*/

module.exports = configure((api, options) => {
const subsTracker = SubscriptionTracker.singleton()

/**
* @type {PubsubAPI["subscribe"]}
*/
async function subscribe (topic, handler, options = {}) { // eslint-disable-line require-await
options.signal = subsTracker.subscribe(topic, handler, options.signal)

/** @type {(value?: any) => void} */
let done
/** @type {(error: Error) => void} */
let fail

const result = new Promise((resolve, reject) => {
done = resolve
fail = reject
})

// In Firefox, the initial call to fetch does not resolve until some data
// is received. If this doesn't happen within 1 second assume success
const ffWorkaround = setTimeout(() => done(), 1000)

// Do this async to not block Firefox
setTimeout(() => {
api.post('pubsub/sub', {
timeout: options.timeout,
signal: options.signal,
searchParams: toUrlSearchParams({
arg: topic,
...options
}),
headers: options.headers
/**
* @param {Options} options
* @param {import('./subscription-tracker')} subsTracker
*/
module.exports = (options, subsTracker) => {
return configure((api) => {
/**
* @type {PubsubAPI["subscribe"]}
*/
async function subscribe (topic, handler, options = {}) { // eslint-disable-line require-await
options.signal = subsTracker.subscribe(topic, handler, options.signal)

/** @type {(value?: any) => void} */
let done
/** @type {(error: Error) => void} */
let fail

const result = new Promise((resolve, reject) => {
done = resolve
fail = reject
})
.catch((err) => {
// Initial subscribe fail, ensure we clean up
subsTracker.unsubscribe(topic, handler)

fail(err)
// In Firefox, the initial call to fetch does not resolve until some data
// is received. If this doesn't happen within 1 second assume success
const ffWorkaround = setTimeout(() => done(), 1000)

// Do this async to not block Firefox
setTimeout(() => {
api.post('pubsub/sub', {
timeout: options.timeout,
signal: options.signal,
searchParams: toUrlSearchParams({
arg: topic,
...options
}),
headers: options.headers
})
.then((response) => {
clearTimeout(ffWorkaround)

if (!response) {
// if there was no response, the subscribe failed
return
}

readMessages(response, {
onMessage: handler,
onEnd: () => subsTracker.unsubscribe(topic, handler),
onError: options.onError
.catch((err) => {
// Initial subscribe fail, ensure we clean up
subsTracker.unsubscribe(topic, handler)

fail(err)
})
.then((response) => {
clearTimeout(ffWorkaround)

done()
})
}, 0)
if (!response) {
// if there was no response, the subscribe failed
return
}

return result
}
return subscribe
})
readMessages(response, {
onMessage: handler,
onEnd: () => subsTracker.unsubscribe(topic, handler),
onError: options.onError
})

done()
})
}, 0)

return result
}
return subscribe
})(options)
}

/**
* @param {import('ipfs-utils/src/types').ExtendedResponse} response
Expand Down
15 changes: 4 additions & 11 deletions src/pubsub/subscription-tracker.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@ class SubscriptionTracker {
this._subs = new Map()
}

static singleton () {
if (SubscriptionTracker.instance) return SubscriptionTracker.instance
SubscriptionTracker.instance = new SubscriptionTracker()
return SubscriptionTracker.instance
}

/**
* @param {string} topic
* @param {MessageHandlerFn} handler
Expand Down Expand Up @@ -63,13 +57,12 @@ class SubscriptionTracker {
unsubs = subs
}

if (!(this._subs.get(topic) || []).length) {
this._subs.delete(topic)
}

unsubs.forEach(s => s.controller.abort())
}
}

/**
* @type {SubscriptionTracker | null}
*/
SubscriptionTracker.instance = null

module.exports = SubscriptionTracker
10 changes: 4 additions & 6 deletions src/pubsub/unsubscribe.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
'use strict'

const SubscriptionTracker = require('./subscription-tracker')

/**
* @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions
* @typedef {import('ipfs-core-types/src/pubsub').API<HTTPClientExtraOptions>} PubsubAPI
* @typedef {import('../types').Options} Options
*/

/**
* @param {import('../types').Options} config
* @param {Options} options
* @param {import('./subscription-tracker')} subsTracker
*/
module.exports = config => {
const subsTracker = SubscriptionTracker.singleton()

module.exports = (options, subsTracker) => {
/**
* @type {PubsubAPI["unsubscribe"]}
*/
Expand Down
4 changes: 2 additions & 2 deletions src/refs/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const toUrlSearchParams = require('../lib/to-url-search-params')
* @typedef {import('ipfs-core-types/src/refs').API<HTTPClientExtraOptions>} RefsAPI
*/

module.exports = configure((api, options) => {
module.exports = configure((api, opts) => {
/**
* @type {RefsAPI["refs"]}
*/
Expand All @@ -34,6 +34,6 @@ module.exports = configure((api, options) => {
}

return Object.assign(refs, {
local: require('./local')(options)
local: require('./local')(opts)
})
})

0 comments on commit 13c641e

Please sign in to comment.