Skip to content

Commit

Permalink
fix: ensure remote file downloads are queued in all cases (#34414)
Browse files Browse the repository at this point in the history
* fix: move queue from remote file node creation to remote file fetching

* reduce number of concurrent requests per CPU core to 50

* rename worker function to mark it as worker

* improve typings

* refactor: remove GATSBY_EXPERIMENTAL_REMOTE_FILE_PLACEHOLDER flag

* fix typing?

Co-authored-by: Ward Peeters <ward@coding-tech.com>
  • Loading branch information
axe312ger and wardpeet authored Jan 28, 2022
1 parent 201c181 commit 6ac1ed6
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 115 deletions.
1 change: 1 addition & 0 deletions packages/gatsby-core-utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"@babel/runtime": "^7.15.4",
"ci-info": "2.0.0",
"configstore": "^5.0.1",
"fastq": "^1.13.0",
"file-type": "^16.5.3",
"fs-extra": "^10.0.0",
"got": "^11.8.3",
Expand Down
59 changes: 58 additions & 1 deletion packages/gatsby-core-utils/src/fetch-remote-file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import {
} from "./filename-utils"
import type { IncomingMessage } from "http"
import type { GatsbyCache } from "gatsby"
import Queue from "fastq"
import type { queue, done } from "fastq"

export interface IFetchRemoteFileOptions {
url: string
Expand Down Expand Up @@ -72,9 +74,64 @@ const ERROR_CODES_TO_RETRY = [
`ERR_GOT_REQUEST_ERROR`,
]

/********************
* Queue Management *
********************/

const GATSBY_CONCURRENT_DOWNLOAD = process.env.GATSBY_CONCURRENT_DOWNLOAD
? parseInt(process.env.GATSBY_CONCURRENT_DOWNLOAD, 10) || 0
: 50

const q: queue<IFetchRemoteFileOptions, string> = Queue(
fetchWorker,
GATSBY_CONCURRENT_DOWNLOAD
)

/**
* fetchWorker
* --
* Handle fetch requests that are pushed in to the Queue
*/
async function fetchWorker(
task: IFetchRemoteFileOptions,
cb: done<string>
): Promise<void> {
try {
const node = await fetchFile(task)
return void cb(null, node)
} catch (e) {
return void cb(e)
}
}

/**
* pushTask
* --
* pushes a task in to the Queue and the processing cache
*
* Promisfy a task in queue
* @param {CreateRemoteFileNodePayload} task
* @return {Promise<Object>}
*/
async function pushTask(task: IFetchRemoteFileOptions): Promise<string> {
return new Promise((resolve, reject) => {
q.push(task, (err, node) => {
if (!err) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
resolve(node!)
} else {
reject(err)
}
})
})
}
let fetchCache = new Map()
let latestBuildId = ``

/***************************
* Fetch remote file logic *
***************************/

export async function fetchRemoteFile(
args: IFetchRemoteFileOptions
): Promise<string> {
Expand All @@ -91,7 +148,7 @@ export async function fetchRemoteFile(
}

// Create file fetch promise and store it into cache
const fetchPromise = fetchFile(args)
const fetchPromise = pushTask(args)
fetchCache.set(args.url, fetchPromise)

return fetchPromise.catch(err => {
Expand Down
1 change: 0 additions & 1 deletion packages/gatsby-source-filesystem/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
"dependencies": {
"@babel/runtime": "^7.15.4",
"chokidar": "^3.5.2",
"fastq": "^1.13.0",
"file-type": "^16.5.3",
"fs-extra": "^10.0.0",
"gatsby-core-utils": "^3.7.0-next.0",
Expand Down
126 changes: 13 additions & 113 deletions packages/gatsby-source-filesystem/src/create-remote-file-node.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,6 @@
const fs = require(`fs-extra`)
const {
createContentDigest,
fetchRemoteFile,
createFilePath,
} = require(`gatsby-core-utils`)
const path = require(`path`)
const { fetchRemoteFile } = require(`gatsby-core-utils`)
const { isWebUri } = require(`valid-url`)
const Queue = require(`fastq`)
const { createFileNode } = require(`./create-file-node`)
const { getRemoteFileExtension } = require(`./utils`)

let showFlagWarning = !!process.env.GATSBY_EXPERIMENTAL_REMOTE_FILE_PLACEHOLDER

/********************
* Type Definitions *
Expand Down Expand Up @@ -46,41 +36,6 @@ let showFlagWarning = !!process.env.GATSBY_EXPERIMENTAL_REMOTE_FILE_PLACEHOLDER
* @param {Reporter} [options.reporter]
*/

/********************
* Queue Management *
********************/

const GATSBY_CONCURRENT_DOWNLOAD = process.env.GATSBY_CONCURRENT_DOWNLOAD
? parseInt(process.env.GATSBY_CONCURRENT_DOWNLOAD, 10) || 0
: 200

const queue = Queue(pushToQueue, GATSBY_CONCURRENT_DOWNLOAD)

/**
* @callback {Queue~queueCallback}
* @param {*} error
* @param {*} result
*/

/**
* pushToQueue
* --
* Handle tasks that are pushed in to the Queue
*
*
* @param {CreateRemoteFileNodePayload} task
* @param {Queue~queueCallback} cb
* @return {Promise<null>}
*/
async function pushToQueue(task, cb) {
try {
const node = await processRemoteNode(task)
return cb(null, node)
} catch (e) {
return cb(e)
}
}

/******************
* Core Functions *
******************/
Expand All @@ -104,25 +59,14 @@ async function processRemoteNode({
ext,
name,
}) {
let filename
if (process.env.GATSBY_EXPERIMENTAL_REMOTE_FILE_PLACEHOLDER) {
filename = await fetchPlaceholder({
fromPath: process.env.GATSBY_EXPERIMENTAL_REMOTE_FILE_PLACEHOLDER,
url,
cache,
ext,
name,
})
} else {
filename = await fetchRemoteFile({
url,
cache,
auth,
httpHeaders,
ext,
name,
})
}
const filename = await fetchRemoteFile({
url,
cache,
auth,
httpHeaders,
ext,
name,
})

// Create the file node.
const fileNode = await createFileNode(filename, createNodeId, {})
Expand All @@ -138,42 +82,10 @@ async function processRemoteNode({
return fileNode
}

async function fetchPlaceholder({ fromPath, url, cache, ext, name }) {
const pluginCacheDir = cache.directory
const digest = createContentDigest(url)

if (!ext) {
ext = getRemoteFileExtension(url)
}

const filename = createFilePath(path.join(pluginCacheDir, digest), name, ext)
fs.copySync(fromPath, filename)
return filename
}

/**
* Index of promises resolving to File node from remote url
*/
const processingCache = {}
/**
* pushTask
* --
* pushes a task in to the Queue and the processing cache
*
* Promisfy a task in queue
* @param {CreateRemoteFileNodePayload} task
* @return {Promise<Object>}
*/
const pushTask = task =>
new Promise((resolve, reject) => {
queue.push(task, (err, node) => {
if (!err) {
resolve(node)
} else {
reject(`failed to process ${task.url}\n${err}`)
}
})
})

/***************
* Entry Point *
Expand Down Expand Up @@ -202,20 +114,6 @@ module.exports = function createRemoteFileNode({
ext = null,
name = null,
}) {
if (showFlagWarning) {
showFlagWarning = false
// Note: This will use a placeholder image as the default for every file that is downloaded through this API.
// That may break certain cases, in particular when the file is not meant to be an image or when the image
// is expected to be of a particular type that is other than the placeholder. This API is meant to bypass
// the remote download for local testing only.
console.info(
`GATSBY_EXPERIMENTAL_REMOTE_FILE_PLACEHOLDER: Any file downloaded by \`createRemoteFileNode\` will use the same placeholder image and skip the remote fetch. Note: This is an experimental flag that can change/disappear at any point.`
)
console.info(
`GATSBY_EXPERIMENTAL_REMOTE_FILE_PLACEHOLDER: File to use: \`${process.env.GATSBY_EXPERIMENTAL_REMOTE_FILE_PLACEHOLDER}\``
)
}

// validation of the input
// without this it's notoriously easy to pass in the wrong `createNodeId`
// see gatsbyjs/gatsby#6643
Expand Down Expand Up @@ -245,11 +143,13 @@ module.exports = function createRemoteFileNode({

if (!url || isWebUri(url) === undefined) {
return Promise.reject(
`url passed to createRemoteFileNode is either missing or not a proper web uri: ${url}`
new Error(
`url passed to createRemoteFileNode is either missing or not a proper web uri: ${url}`
)
)
}

const fileDownloadPromise = pushTask({
const fileDownloadPromise = processRemoteNode({
url,
cache,
createNode,
Expand Down

0 comments on commit 6ac1ed6

Please sign in to comment.