From 13c641e0d34dcceee1d498e86e7c91848156bdd4 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Fri, 18 Jun 2021 09:28:25 +0100 Subject: [PATCH] chore: simplify streaming http response code and use instances of pubsub tracker instead of a singleton (#3719) Simplifies deps, pass peer id as string instead of cid. --- src/pubsub/index.js | 20 +++-- src/pubsub/subscribe.js | 118 +++++++++++++++-------------- src/pubsub/subscription-tracker.js | 15 +--- src/pubsub/unsubscribe.js | 10 +-- src/refs/index.js | 4 +- 5 files changed, 84 insertions(+), 83 deletions(-) diff --git a/src/pubsub/index.js b/src/pubsub/index.js index 4898e6687..4693ebd8a 100644 --- a/src/pubsub/index.js +++ b/src/pubsub/index.js @@ -1,12 +1,18 @@ 'use strict' +const SubscriptionTracker = require('./subscription-tracker') + /** * @param {import('../types').Options} config */ -module.exports = config => ({ - ls: require('./ls')(config), - peers: require('./peers')(config), - publish: require('./publish')(config), - subscribe: require('./subscribe')(config), - unsubscribe: require('./unsubscribe')(config) -}) +module.exports = config => { + const subscriptionTracker = new SubscriptionTracker() + + return { + ls: require('./ls')(config), + peers: require('./peers')(config), + publish: require('./publish')(config), + subscribe: require('./subscribe')(config, subscriptionTracker), + unsubscribe: require('./unsubscribe')(config, subscriptionTracker) + } +} diff --git a/src/pubsub/subscribe.js b/src/pubsub/subscribe.js index 134811745..db99bf128 100644 --- a/src/pubsub/subscribe.js +++ b/src/pubsub/subscribe.js @@ -3,7 +3,6 @@ const uint8ArrayFromString = require('uint8arrays/from-string') const uint8ArrayToString = require('uint8arrays/to-string') const log = require('debug')('ipfs-http-client:pubsub:subscribe') -const SubscriptionTracker = require('./subscription-tracker') const configure = require('../lib/configure') const toUrlSearchParams = require('../lib/to-url-search-params') @@ -12,70 +11,75 @@ const toUrlSearchParams = require('../lib/to-url-search-params') * @typedef {import('ipfs-core-types/src/pubsub').Message} Message * @typedef {(err: Error, fatal: boolean, msg?: Message) => void} ErrorHandlerFn * @typedef {import('ipfs-core-types/src/pubsub').API} PubsubAPI + * @typedef {import('../types').Options} Options */ -module.exports = configure((api, options) => { - const subsTracker = SubscriptionTracker.singleton() - - /** - * @type {PubsubAPI["subscribe"]} - */ - async function subscribe (topic, handler, options = {}) { // eslint-disable-line require-await - options.signal = subsTracker.subscribe(topic, handler, options.signal) - - /** @type {(value?: any) => void} */ - let done - /** @type {(error: Error) => void} */ - let fail - - const result = new Promise((resolve, reject) => { - done = resolve - fail = reject - }) - - // In Firefox, the initial call to fetch does not resolve until some data - // is received. If this doesn't happen within 1 second assume success - const ffWorkaround = setTimeout(() => done(), 1000) - - // Do this async to not block Firefox - setTimeout(() => { - api.post('pubsub/sub', { - timeout: options.timeout, - signal: options.signal, - searchParams: toUrlSearchParams({ - arg: topic, - ...options - }), - headers: options.headers +/** + * @param {Options} options + * @param {import('./subscription-tracker')} subsTracker + */ +module.exports = (options, subsTracker) => { + return configure((api) => { + /** + * @type {PubsubAPI["subscribe"]} + */ + async function subscribe (topic, handler, options = {}) { // eslint-disable-line require-await + options.signal = subsTracker.subscribe(topic, handler, options.signal) + + /** @type {(value?: any) => void} */ + let done + /** @type {(error: Error) => void} */ + let fail + + const result = new Promise((resolve, reject) => { + done = resolve + fail = reject }) - .catch((err) => { - // Initial subscribe fail, ensure we clean up - subsTracker.unsubscribe(topic, handler) - fail(err) + // In Firefox, the initial call to fetch does not resolve until some data + // is received. If this doesn't happen within 1 second assume success + const ffWorkaround = setTimeout(() => done(), 1000) + + // Do this async to not block Firefox + setTimeout(() => { + api.post('pubsub/sub', { + timeout: options.timeout, + signal: options.signal, + searchParams: toUrlSearchParams({ + arg: topic, + ...options + }), + headers: options.headers }) - .then((response) => { - clearTimeout(ffWorkaround) - - if (!response) { - // if there was no response, the subscribe failed - return - } - - readMessages(response, { - onMessage: handler, - onEnd: () => subsTracker.unsubscribe(topic, handler), - onError: options.onError + .catch((err) => { + // Initial subscribe fail, ensure we clean up + subsTracker.unsubscribe(topic, handler) + + fail(err) }) + .then((response) => { + clearTimeout(ffWorkaround) - done() - }) - }, 0) + if (!response) { + // if there was no response, the subscribe failed + return + } - return result - } - return subscribe -}) + readMessages(response, { + onMessage: handler, + onEnd: () => subsTracker.unsubscribe(topic, handler), + onError: options.onError + }) + + done() + }) + }, 0) + + return result + } + return subscribe + })(options) +} /** * @param {import('ipfs-utils/src/types').ExtendedResponse} response diff --git a/src/pubsub/subscription-tracker.js b/src/pubsub/subscription-tracker.js index a257b57a8..faf15ad6c 100644 --- a/src/pubsub/subscription-tracker.js +++ b/src/pubsub/subscription-tracker.js @@ -16,12 +16,6 @@ class SubscriptionTracker { this._subs = new Map() } - static singleton () { - if (SubscriptionTracker.instance) return SubscriptionTracker.instance - SubscriptionTracker.instance = new SubscriptionTracker() - return SubscriptionTracker.instance - } - /** * @param {string} topic * @param {MessageHandlerFn} handler @@ -63,13 +57,12 @@ class SubscriptionTracker { unsubs = subs } + if (!(this._subs.get(topic) || []).length) { + this._subs.delete(topic) + } + unsubs.forEach(s => s.controller.abort()) } } -/** - * @type {SubscriptionTracker | null} - */ -SubscriptionTracker.instance = null - module.exports = SubscriptionTracker diff --git a/src/pubsub/unsubscribe.js b/src/pubsub/unsubscribe.js index 498988372..79c40eb3b 100644 --- a/src/pubsub/unsubscribe.js +++ b/src/pubsub/unsubscribe.js @@ -1,18 +1,16 @@ 'use strict' -const SubscriptionTracker = require('./subscription-tracker') - /** * @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions * @typedef {import('ipfs-core-types/src/pubsub').API} PubsubAPI + * @typedef {import('../types').Options} Options */ /** - * @param {import('../types').Options} config + * @param {Options} options + * @param {import('./subscription-tracker')} subsTracker */ -module.exports = config => { - const subsTracker = SubscriptionTracker.singleton() - +module.exports = (options, subsTracker) => { /** * @type {PubsubAPI["unsubscribe"]} */ diff --git a/src/refs/index.js b/src/refs/index.js index 2ecbc1c2f..cdab0085b 100644 --- a/src/refs/index.js +++ b/src/refs/index.js @@ -10,7 +10,7 @@ const toUrlSearchParams = require('../lib/to-url-search-params') * @typedef {import('ipfs-core-types/src/refs').API} RefsAPI */ -module.exports = configure((api, options) => { +module.exports = configure((api, opts) => { /** * @type {RefsAPI["refs"]} */ @@ -34,6 +34,6 @@ module.exports = configure((api, options) => { } return Object.assign(refs, { - local: require('./local')(options) + local: require('./local')(opts) }) })