diff --git a/.aegir.js b/.aegir.js
new file mode 100644
index 0000000..5e1ac80
--- /dev/null
+++ b/.aegir.js
@@ -0,0 +1,59 @@
+'use strict'
+
+const { promisify } = require('util')
+const http = require('http')
+const url = require('url')
+const querystring = require('querystring')
+
+const echoServer = async (port = 3000) => {
+  const server = http.createServer()
+
+  server.on('request', (request, response) => {
+    try {
+
+      const uri = url.parse(request.url)
+      const qs = uri.query ? querystring.parse(uri.query) : {}
+      const status = qs.status || 200
+      const contentType = qs.contentType || 'text/plain'
+
+      const headers = {
+        'Access-Control-Allow-Origin': '*'
+      }
+
+      if (qs.body) {
+        headers['Content-Type'] = contentType
+        headers['Content-Length'] = qs.body.length
+      }
+
+      response.writeHead(status, headers)
+
+      if (qs.body) {
+        response.end(qs.body)
+      } else {
+        request.pipe(response)
+      }
+
+    } catch (err) {
+      console.error(err)
+    }
+  })
+
+  await promisify(server.listen.bind(server))(port)
+
+  return {
+    stop: promisify(server.close.bind(server))
+  }
+}
+
+let echo
+
+module.exports = {
+  hooks: {
+    pre: async () => {
+      echo = await echoServer()
+    },
+    post: async () => {
+      await echo.stop()
+    }
+  }
+}
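
The `.aegir.js` hooks above start this echo server before the test suite runs and stop it afterwards; the handler reads `status`, `contentType` and `body` from the query string and otherwise pipes the request body straight back. A minimal sketch of poking it by hand (not part of this changeset; it assumes the server is listening on its default port 3000 and uses `node-fetch`, which this PR adds as a dependency):

```js
// Sketch: exercising the echo server configured in .aegir.js.
const fetch = require('node-fetch')

async function demo () {
  // Ask the server for a canned reply: status, content type and body all
  // come from the query string parsed by the handler above.
  const canned = await fetch('http://localhost:3000/?status=404&contentType=text/plain&body=not-found')
  console.log(canned.status)        // 404
  console.log(await canned.text())  // 'not-found'

  // Without a `body` parameter, whatever is sent is piped straight back.
  const echoed = await fetch('http://localhost:3000/', { method: 'POST', body: 'hello' })
  console.log(await echoed.text())  // 'hello'
}

demo().catch(console.error)
```
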
diff --git a/README.md b/README.md
index 4903b16..a0b7fc2 100644
--- a/README.md
+++ b/README.md
@@ -1,10 +1,4 @@
-# 🔒 Archived
-
-The contents of this repo have been merged into [ipfs/js-ipfs](https://github.com/ipfs/js-ipfs).
-
-Please open [issues](https://github.com/ipfs/js-ipfs/issues) or submit [PRs](https://github.com/ipfs/js-ipfs/pulls) there.
-
-# js-ipfs-utils
+# js-ipfs-utils
 
 [![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://protocol.ai)
 [![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/)
@@ -29,10 +23,24 @@ The IPFS ecosystem has lots of repos with it comes several problems like:
 
 These problems are the motivation for this package, having shared logic in this package avoids creating cyclic dependencies, centralizes common use modules/functions (exactly like aegir does for the tooling), semantic versioning for 3rd party dependencies is handled in one single place (a good example is going from streams 2 to 3) and maintainers should only care about having `ipfs-utils` updated.
 
-## Lead Maintainer
+## Lead Maintainer
 
 [Hugo Dias](https://github.com/hugomrdias)
 
+## Table of Contents
+
+- [Install](#install)
+- [Usage](#usage)
+- [Functions](#functions)
+  - [General Use](#general-use)
+    - [TODO](#todo)
+  - [Data Struct Wrangling](#data-struct-wrangling)
+    - [TODO](#todo-1)
+  - [Core API](#core-api)
+    - [TODO](#todo-2)
+- [Contribute](#contribute)
+- [License](#license)
+
 ## Install
 
@@ -64,7 +72,7 @@ validateAddInput(Buffer.from('test'))
 
 Contributions welcome. Please check out [the issues](https://github.com/ipfs/js-ipfs-utils/issues).
 
-Check out our [contributing document](https://github.com/ipfs/community/blob/master/contributing.md) for more information on how we work, and about contributing in general. Please be aware that all interactions related to this repo are subject to the IPFS [Code of Conduct](https://github.com/ipfs/community/blob/master/code-of-conduct.md).
+Check out our [contributing document](https://github.com/ipfs/community/blob/master/CONTRIBUTING_JS.md) for more information on how we work, and about contributing in general. Please be aware that all interactions related to this repo are subject to the IPFS [Code of Conduct](https://github.com/ipfs/community/blob/master/code-of-conduct.md).
 
 ## License
 
diff --git a/package.json b/package.json
index 11b6d07..1937ed0 100644
--- a/package.json
+++ b/package.json
@@ -26,21 +26,26 @@
   },
   "license": "MIT",
   "dependencies": {
-    "buffer": "^5.2.1",
+    "abort-controller": "^3.0.0",
+    "buffer": "^5.4.2",
     "err-code": "^2.0.0",
     "fs-extra": "^8.1.0",
     "is-electron": "^2.2.0",
+    "iso-url": "^0.4.7",
     "it-glob": "0.0.7",
-    "ky": "^0.15.0",
-    "ky-universal": "^0.3.0",
+    "merge-options": "^2.0.0",
+    "node-fetch": "^2.6.0",
     "stream-to-it": "^0.2.0"
   },
   "devDependencies": {
-    "aegir": "^20.4.1",
+    "aegir": "21.3.0",
     "chai": "^4.2.0",
     "chai-as-promised": "^7.1.1",
+    "delay": "^4.3.0",
     "dirty-chai": "^2.0.1",
-    "it-all": "^1.0.1"
+    "it-all": "^1.0.1",
+    "it-drain": "^1.0.0",
+    "it-to-stream": "^0.1.1"
   },
   "contributors": [
     "Alan Shaw ",
@@ -49,6 +54,7 @@
     "Marcin Rataj "
   ],
   "browser": {
-    "fs-extra": false
+    "fs-extra": false,
+    "./src/text-encoder.js": "./src/text-encoder.browser.js"
   }
 }
diff --git a/src/files/normalise-input.js b/src/files/normalise-input.js
index fb89269..a0c1249 100644
--- a/src/files/normalise-input.js
+++ b/src/files/normalise-input.js
@@ -223,7 +223,7 @@ function toAsyncIterable (input) {
     })()
   }
 
-  throw errCode(new Error(`Unexpected input: ${input}`, 'ERR_UNEXPECTED_INPUT'))
+  throw errCode(new Error(`Unexpected input: ${input}`), 'ERR_UNEXPECTED_INPUT')
 }
 
 function toBuffer (chunk) {
diff --git a/src/files/url-source.js b/src/files/url-source.js
index a7e6322..f7df3f7 100644
--- a/src/files/url-source.js
+++ b/src/files/url-source.js
@@ -1,15 +1,13 @@
 'use strict'
 
-const { default: ky } = require('ky-universal')
-const toIterable = require('stream-to-it/source')
+const Http = require('../http')
 
 module.exports = async function * urlSource (url, options) {
   options = options || {}
-
-  const { body } = await ky.get(url)
+  const http = new Http()
 
   yield {
     path: decodeURIComponent(new URL(url).pathname.split('/').pop() || ''),
-    content: toIterable(body)
+    content: await http.iterator(url, { method: 'get' })
   }
 }
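
`url-source.js` now rides on the internal `Http` client instead of `ky-universal`, but its shape is unchanged: it is an async generator that yields a single `{ path, content }` entry, where `content` is an async iterable of bytes. A rough consumption sketch (not part of this changeset; the require path assumes the repository root, and the URL targets the echo server from `.aegir.js`):

```js
// Sketch: collecting the file yielded by urlSource.
const urlSource = require('./src/files/url-source')

async function demo () {
  for await (const file of urlSource('http://localhost:3000/example.txt?body=hello')) {
    console.log(file.path) // 'example.txt' – the last path segment of the URL

    const chunks = []
    for await (const chunk of file.content) {
      chunks.push(chunk)
    }
    console.log(Buffer.concat(chunks).toString()) // 'hello'
  }
}

demo().catch(console.error)
```
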
diff --git a/src/http.js b/src/http.js
new file mode 100644
index 0000000..25720da
--- /dev/null
+++ b/src/http.js
@@ -0,0 +1,332 @@
+/* eslint-disable no-undef */
+'use strict'
+
+const fetch = require('node-fetch')
+const merge = require('merge-options')
+const { URL, URLSearchParams } = require('iso-url')
+const global = require('./globalthis')
+const TextDecoder = require('./text-encoder')
+const Request = global.Request
+const AbortController = require('abort-controller')
+
+class TimeoutError extends Error {
+  constructor () {
+    super('Request timed out')
+    this.name = 'TimeoutError'
+  }
+}
+
+class HTTPError extends Error {
+  constructor (response) {
+    super(response.statusText)
+    this.name = 'HTTPError'
+    this.response = response
+  }
+}
+
+const timeout = (promise, ms, abortController) => {
+  if (ms === undefined) {
+    return promise
+  }
+
+  return new Promise((resolve, reject) => {
+    const timeoutID = setTimeout(() => {
+      reject(new TimeoutError())
+
+      abortController.abort()
+    }, ms)
+
+    promise
+      .then((result) => {
+        clearTimeout(timeoutID)
+
+        resolve(result)
+      }, (err) => {
+        clearTimeout(timeoutID)
+
+        reject(err)
+      })
+  })
+}
+
+const defaults = {
+  throwHttpErrors: true,
+  credentials: 'same-origin',
+  transformSearchParams: p => p
+}
+
+/**
+ * @typedef {Object} APIOptions - creates a new type named 'APIOptions'
+ * @prop {any} [body] - Request body
+ * @prop {string} [method] - GET, POST, PUT, DELETE, etc.
+ * @prop {string} [base] - The base URL to use in case url is a relative URL
+ * @prop {Headers|Record} [headers] - Request headers.
+ * @prop {number} [timeout] - Amount of time until request should timeout in ms.
+ * @prop {AbortSignal} [signal] - Signal to abort the request.
+ * @prop {URLSearchParams|Object} [searchParams] - URL search param.
+ * @prop {string} [credentials]
+ * @prop {boolean} [throwHttpErrors]
+ * @prop {function(URLSearchParams): URLSearchParams} [transformSearchParams]
+ * @prop {function(any): any} [transform] - When iterating the response body, transform each chunk with this function.
+ * @prop {function(Response): Promise} [handleError] - Handle errors
+ */
+
+class HTTP {
+  /**
+   *
+   * @param {APIOptions} options
+   */
+  constructor (options = {}) {
+    /** @type {APIOptions} */
+    this.opts = merge(defaults, options)
+
+    // connect internal abort to external
+    this.abortController = new AbortController()
+
+    if (this.opts.signal) {
+      this.opts.signal.addEventListener('abort', () => {
+        this.abortController.abort()
+      })
+    }
+
+    this.opts.signal = this.abortController.signal
+  }
+
+  /**
+   * Fetch
+   *
+   * @param {string | URL | Request} resource
+   * @param {APIOptions} options
+   * @returns {Promise<Response>}
+   */
+  async fetch (resource, options = {}) {
+    /** @type {APIOptions} */
+    const opts = merge(this.opts, options)
+
+    // validate resource type
+    if (typeof resource !== 'string' && !(resource instanceof URL || resource instanceof Request)) {
+      throw new TypeError('`resource` must be a string, URL, or Request')
+    }
+
+    // validate resource format and normalize with prefixUrl
+    if (opts.base && typeof opts.base === 'string' && typeof resource === 'string') {
+      if (resource.startsWith('/')) {
+        throw new Error('`resource` must not begin with a slash when using `base`')
+      }
+
+      if (!opts.base.endsWith('/')) {
+        opts.base += '/'
+      }
+
+      resource = opts.base + resource
+    }
+
+    // TODO: try to remove the logic above or fix URL instance input without trailing '/'
+    const url = new URL(resource, opts.base)
+
+    if (opts.searchParams) {
+      url.search = opts.transformSearchParams(new URLSearchParams(opts.searchParams))
+    }
+
+    const response = await timeout(fetch(url, opts), opts.timeout, this.abortController)
+
+    if (!response.ok && opts.throwHttpErrors) {
+      if (opts.handleError) {
+        await opts.handleError(response)
+      }
+      throw new HTTPError(response)
+    }
+
+    return response
+  }
+
+  /**
+   * @param {string | URL | Request} resource
+   * @param {APIOptions} options
+   * @returns {Promise<Response>}
+   */
+  post (resource, options = {}) {
+    return this.fetch(resource, merge(this.opts, options, { method: 'POST' }))
+  }
+
+  /**
+   * @param {string | URL | Request} resource
+   * @param {APIOptions} options
+   * @returns {Promise<Response>}
+   */
+  get (resource, options = {}) {
+    return this.fetch(resource, merge(this.opts, options, { method: 'GET' }))
+  }
+
+  /**
+   * @param {string | URL | Request} resource
+   * @param {APIOptions} options
+   * @returns {Promise<Response>}
+   */
+  put (resource, options = {}) {
+    return this.fetch(resource, merge(this.opts, options, { method: 'PUT' }))
+  }
+
+  /**
+   * @param {string | URL | Request} resource
+   * @param {APIOptions} options
+   * @returns {Promise<Response>}
+   */
+  delete (resource, options = {}) {
+    return this.fetch(resource, merge(this.opts, options, { method: 'DELETE' }))
+  }
+
+  /**
+   * @param {string | URL | Request} resource
+   * @param {APIOptions} options
+   * @returns {Promise<Response>}
+   */
+  options (resource, options = {}) {
+    return this.fetch(resource, merge(this.opts, options, { method: 'OPTIONS' }))
+  }
+
+  /**
+   * @param {string | URL | Request} resource
+   * @param {APIOptions} options
+   * @returns {Promise<ReadableStream<Uint8Array>>}
+   */
+  async stream (resource, options = {}) {
+    const res = await this.fetch(resource, merge(this.opts, options))
+
+    return res.body
+  }
+
+  /**
+   * @param {string | URL | Request} resource
+   * @param {APIOptions} options
+   * @returns {AsyncGenerator}
+   */
+  async * iterator (resource, options = {}) {
+    const res = await this.fetch(resource, merge(this.opts, options))
+    const it = streamToAsyncIterator(res.body)
+
+    if (!isAsyncIterator(it)) {
+      throw new Error('Can\'t convert fetch body into an Async Iterator')
+    }
+
+    for await (const chunk of it) {
+      yield chunk
+    }
+  }
+
+  /**
+   * @param {string | URL | Request} resource
+   * @param {APIOptions} options
+   * @returns {AsyncGenerator}
+   */
+  ndjson (resource, options = {}) {
+    const source = ndjson(this.iterator(resource, merge(this.opts, options)))
+    if (options.transform) {
+      return (async function * () {
+        for await (const chunk of source) {
+          yield options.transform(chunk)
+        }
+      })()
+    }
+    return source
+  }
+}
+
+/**
+ * Parses NDJSON chunks from an iterator
+ *
+ * @param {AsyncGenerator} source
+ * @returns {AsyncGenerator}
+ */
+const ndjson = async function * (source) {
+  const decoder = new TextDecoder()
+  let buf = ''
+
+  for await (const chunk of source) {
+    buf += decoder.decode(chunk, { stream: true })
+    const lines = buf.split(/\r?\n/)
+
+    for (let i = 0; i < lines.length - 1; i++) {
+      const l = lines[i].trim()
+      if (l.length > 0) {
+        yield JSON.parse(l)
+      }
+    }
+    buf = lines[lines.length - 1]
+  }
+  buf += decoder.decode()
+  buf = buf.trim()
+  if (buf.length !== 0) {
+    yield JSON.parse(buf)
+  }
+}
+
+const streamToAsyncIterator = function (source) {
+  if (isAsyncIterator(source)) {
+    return source
+  }
+
+  const reader = source.getReader()
+
+  return {
+    next () {
+      return reader.read()
+    },
+    return () {
+      reader.releaseLock()
+      return {}
+    },
+    [Symbol.asyncIterator] () {
+      return this
+    }
+  }
+}
+
+const isAsyncIterator = (obj) => {
+  return typeof obj === 'object' &&
+    obj !== null &&
+    // typeof obj.next === 'function' &&
+    typeof obj[Symbol.asyncIterator] === 'function'
+}
+
+HTTP.HTTPError = HTTPError
+HTTP.TimeoutError = TimeoutError
+HTTP.ndjson = ndjson
+HTTP.streamToAsyncIterator = streamToAsyncIterator
+
+/**
+ * @param {string | URL | Request} resource
+ * @param {APIOptions} options
+ * @returns {Promise<Response>}
+ */
+HTTP.post = (resource, options) => new HTTP(options).post(resource, options)
+
+/**
+ * @param {string | URL | Request} resource
+ * @param {APIOptions} options
+ * @returns {Promise<Response>}
+ */
+HTTP.get = (resource, options) => new HTTP(options).get(resource, options)
+
+/**
+ * @param {string | URL | Request} resource
+ * @param {APIOptions} options
+ * @returns {Promise<Response>}
+ */
+HTTP.put = (resource, options) => new HTTP(options).put(resource, options)
+
+/**
+ * @param {string | URL | Request} resource
+ * @param {APIOptions} options
+ * @returns {Promise<Response>}
+ */
+HTTP.delete = (resource, options) => new HTTP(options).delete(resource, options)
+
+/**
+ * @param {string | URL | Request} resource
+ * @param {APIOptions} options
+ * @returns {Promise<Response>}
+ */
+HTTP.options = (resource, options) => new HTTP(options).options(resource, options)
+
+module.exports = HTTP
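
The `HTTP` class above is a thin wrapper over `fetch` that adds a base URL, search params, timeouts, an internal `AbortController` and optional throwing on non-2xx responses, with `HTTPError` and `TimeoutError` exposed on the constructor. A usage sketch (not part of this changeset; the base URL and endpoint target the echo server from `.aegir.js`):

```js
// Sketch: a client built on the HTTP wrapper above.
const HTTP = require('./src/http')

async function demo () {
  const http = new HTTP({
    base: 'http://localhost:3000/', // relative resources are resolved against this
    timeout: 5000,                  // rejects with HTTP.TimeoutError and aborts the request
    headers: { 'X-Demo': 'true' }
  })

  try {
    // `searchParams` is appended to the URL after `transformSearchParams` runs.
    const res = await http.get('echo', { searchParams: { body: 'hi' } })
    console.log(await res.text()) // 'hi'
  } catch (err) {
    if (err instanceof HTTP.HTTPError) {
      console.error('non-2xx response:', err.response.status)
    } else if (err instanceof HTTP.TimeoutError) {
      console.error('request timed out')
    } else {
      throw err
    }
  }
}

demo().catch(console.error)
```
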
diff --git a/src/text-encoder.browser.js b/src/text-encoder.browser.js
new file mode 100644
index 0000000..749a308
--- /dev/null
+++ b/src/text-encoder.browser.js
@@ -0,0 +1,5 @@
+'use strict'
+
+const global = require('./globalthis')
+
+module.exports = global.TextDecoder
diff --git a/src/text-encoder.js b/src/text-encoder.js
new file mode 100644
index 0000000..44fc233
--- /dev/null
+++ b/src/text-encoder.js
@@ -0,0 +1,2 @@
+'use strict'
+module.exports = require('util').TextDecoder
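
These two one-liners, together with the `browser` field added to `package.json`, let the `ndjson` helper in `src/http.js` pick up `TextDecoder` from `util` in Node and from the global object in browsers. A sketch of consuming newline-delimited JSON through that helper (not part of this changeset; the two-line payload is served by the echo server from `.aegir.js`):

```js
// Sketch: streaming NDJSON through HTTP#ndjson.
const HTTP = require('./src/http')

async function demo () {
  // Ask the echo server to reply with two NDJSON lines.
  const body = encodeURIComponent('{"n":1}\n{"n":2}\n')
  const url = `http://localhost:3000/?contentType=application/json&body=${body}`

  // ndjson() fetches the resource and yields one parsed object per line;
  // the optional `transform` runs on every parsed chunk.
  for await (const n of new HTTP().ndjson(url, { transform: obj => obj.n })) {
    console.log(n) // 1, then 2
  }
}

demo().catch(console.error)
```
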
diff --git a/test/files/format-mtime.spec.js b/test/files/format-mtime.spec.js
index 80c44ad..c3e9279 100644
--- a/test/files/format-mtime.spec.js
+++ b/test/files/format-mtime.spec.js
@@ -10,7 +10,7 @@ const expect = chai.expect
 
 describe('format-mtime', function () {
   it('formats mtime', function () {
-    expect(formatMtime({ secs: 100, nsecs: 0 })).to.include('Jan 1, 1970')
+    expect(formatMtime({ secs: 15768000, nsecs: 0 })).to.include('1970')
   })
 
   it('formats empty mtime', function () {
diff --git a/test/http.spec.js b/test/http.spec.js
new file mode 100644
index 0000000..73c879b
--- /dev/null
+++ b/test/http.spec.js
@@ -0,0 +1,75 @@
+'use strict'
+
+/* eslint-env mocha */
+const { expect } = require('./utils/chai')
+const HTTP = require('../src/http')
+const toStream = require('it-to-stream')
+const delay = require('delay')
+const AbortController = require('abort-controller')
+const drain = require('it-drain')
+const { isBrowser, isWebWorker } = require('../src/env')
+
+describe('http', function () {
+  it('makes a GET request', async function () {
+    const res = HTTP.get('http://localhost:3000')
+
+    await expect(res).to.eventually.be.fulfilled()
+  })
+
+  it('allow async aborting', async function () {
+    const controller = new AbortController()
+
+    const res = HTTP.get('http://localhost:3000', {
+      signal: controller.signal
+    })
+    controller.abort()
+
+    await expect(res).to.eventually.be.rejectedWith(/aborted/)
+  })
+
+  it.skip('should handle errors in streaming bodies', async function () {
+    if (isBrowser || isWebWorker) {
+      // streaming bodies not supported by browsers
+      return this.skip()
+    }
+
+    const err = new Error('Should be caught')
+    const body = (async function * () {
+      yield Buffer.from('{}\n')
+
+      await delay(100)
+
+      throw err
+    }())
+
+    const res = await HTTP.post('http://localhost:3000', {
+      body: toStream.readable(body)
+    })
+
+    await expect(drain(HTTP.ndjson(res.body))).to.eventually.be.rejectedWith(/aborted/)
+  })
+
+  it.skip('should handle errors in streaming bodies when a signal is passed', async function () {
+    if (isBrowser || isWebWorker) {
+      // streaming bodies not supported by browsers
+      return this.skip()
+    }
+
+    const controller = new AbortController()
+    const err = new Error('Should be caught')
+    const body = (async function * () {
+      yield Buffer.from('{}\n')
+
+      await delay(100)
+
+      throw err
+    }())
+
+    const res = await HTTP.post('http://localhost:3000', {
+      body: toStream.readable(body),
+      signal: controller.signal
+    })
+
+    await expect(drain(HTTP.ndjson(res.body))).to.eventually.be.rejectedWith(/aborted/)
+  })
+})
diff --git a/test/utils/chai.js b/test/utils/chai.js
new file mode 100644
index 0000000..2b87a01
--- /dev/null
+++ b/test/utils/chai.js
@@ -0,0 +1,10 @@
+'use strict'
+
+const chai = require('chai')
+
+chai.use(require('dirty-chai'))
+chai.use(require('chai-as-promised'))
+
+module.exports = {
+  expect: chai.expect
+}
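
The abort test above works because of the wiring in the `HTTP` constructor: an external `AbortSignal` passed via `options.signal` is forwarded to the instance's internal `AbortController`, which is the same controller the `timeout` helper fires. A condensed sketch of that behaviour outside the test suite (not part of this changeset):

```js
// Sketch: an external signal aborts the underlying fetch.
const AbortController = require('abort-controller')
const HTTP = require('./src/http')

async function demo () {
  const controller = new AbortController()

  // HTTP's constructor listens on this signal and forwards the abort to its
  // internal AbortController, cancelling the in-flight request.
  const pending = HTTP.get('http://localhost:3000', { signal: controller.signal })
  controller.abort()

  try {
    await pending
  } catch (err) {
    console.log(err.name) // the rejection is fetch's abort error
  }
}

demo().catch(console.error)
```
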