diff --git a/HandWriting/tools/Step/Step.js b/HandWriting/tools/Step/Step.js new file mode 100644 index 0000000..9fd564d --- /dev/null +++ b/HandWriting/tools/Step/Step.js @@ -0,0 +1,116 @@ +function Step(...steps) { + let pending; // 待执行并行任务任务数 + let counter; // 当前已执行并行任务数 + let lock; // 当前串行任务是否在执行 + let err; // 执行并行任务时的错误对象 + let results; // 并行任务结果数组 + + function next(error, ...prevResults) { + // 串行任务执行结果 + let result; + // 每执行一个串行任务,就重置 + counter = pending = 0; + results = []; + + // 检查是否还有剩余的步骤 + if (steps.length === 0) { + if (error) throw new Error(error); + return; + } + + // 获取下一个任务并执行 + const step = steps.shift(); + try { + lock = true; + result = step.apply(next, [error, ...prevResults]); + } catch (e) { + next(e); + } + + // 单个串行任务执行完 + if (result !== undefined) next(undefined, result); + // 确保在当前串行任务执行结束前,并行任务不会调用next + lock = false; + } + + next.parallel = function () { + const idx = counter++; + pending++; + + return function (error, result) { + pending--; + if (error) err = error; + // 保存并行执行结果 + results[idx] = result; + // 当前串行任务的全部并行任务执行完毕 + if (!lock && pending === 0) next(err, ...results); + }; + }; + // 相当于一个小型Step了 + next.group = function () { + const localCallback = next.parallel(); + const result = []; + let counter = 0; + let pending = 0; + let error = undefined; + + function check() { + if (pending === 0) localCallback(error, result); + } + queueMicrotask(check); + + return function () { + let idx = counter++; + pending++; + + return function (...args) { + pending--; + if (args[0]) error = args[0]; + result[idx] = args[1]; + if (!lock) check(); + }; + }; + }; + + next(); +} + +Step.fn = function StepFn(...steps) { + return () => Step(...steps); +}; + +const fs = require("fs"); + +// const task = Step.fn( +// function () { +// return `${__dirname}/test.txt`; +// }, +// function (err, name) { +// fs.readFile(name, "utf8", this); +// }, +// function capitalize(err, text) { +// if (err) throw err; +// return text.toUpperCase(); +// }, +// function showAll(err, result) { +// if (err) throw err; +// console.log(result); +// } +// ); +// task(); + +/* parallel */ +Step( + function loadStuff() { + fs.readFile(`${__dirname}/test.txt`, "utf8", this.parallel()); + fs.readFile(`${__dirname}/test2.txt`, "utf8", this.parallel()); + }, + function capitalize(err, ...args) { + return args.map((txt) => txt.toUpperCase()); + }, + function showStuff(err, [code, txt]) { + if (err) throw err; + // console.log(code); + // console.log(txt); + } +); diff --git a/HandWriting/tools/Step/parallel.js b/HandWriting/tools/Step/parallel.js new file mode 100644 index 0000000..a36d7b1 --- /dev/null +++ b/HandWriting/tools/Step/parallel.js @@ -0,0 +1,55 @@ +function Step(...steps) { + let counter, pending, lock, err, results; + + function next(error, ...prevResults) { + let result; + counter = pending = 0; // 赋初值和重置结果数组 + results = []; // 赋初值和重置结果数组 + + if (steps.length === 0) { + if (error) throw new Error(error); + return; + } + + const task = steps.shift(); + try { + lock = true; + result = task.apply(next, [error, ...prevResults]); + } catch (err) { + next(err); + } + if (result !== undefined) { + // 串行任务执行结束后 + next(undefined, result); + } + lock = false; // 防止当前串行任务还未执行,parallel却已经执行完毕调用下一个next,确保当前串行任务被完全执行 + } + next.parallel = function () { + const idx = counter++; // 更新已执行任务数,并保存结果保存的位置下标 + pending++; // 更新待执行任务数 + + return function (error, result) { + pending--; + if (error) err = error; + results[idx] = result; // 将并行任务执行结果存入结果数组 + if (!lock && pending === 0) { + next(err, ...results); + } + }; + }; + next(); +} + +const fs = require("node:fs"); + +Step( + function loadStuff() { + fs.readFile(`${__dirname}/test.txt`, "utf8", this.parallel()); + fs.readFile(`${__dirname}/test2.txt`, "utf8", this.parallel()); + }, + function showStuff(err, code, txt) { + if (err) throw err; + console.log(code); + console.log(txt); + } +); diff --git a/HandWriting/tools/Step/series.js b/HandWriting/tools/Step/series.js new file mode 100644 index 0000000..d53e54c --- /dev/null +++ b/HandWriting/tools/Step/series.js @@ -0,0 +1,63 @@ +function normalize(opts, restSteps) { + let steps; + if (Array.isArray(opts)) { + steps = opts; + } else if (typeof opts === "function") { + steps = [opts, ...restSteps]; + } + opts = { steps, ...opts }; + return opts; +} + +function Step(opts = {}, ...restSteps) { + const { immediate = false, steps = [] } = normalize(opts, restSteps); + + function next(error, ...prevResults) { + let result; + + // 检查是否还有剩余的步骤 + if (steps.length === 0) { + if (error) throw new Error(error); + return; + } + + // 获取下一个步骤 + const step = steps.shift(); + // 执行下一个步骤 + try { + result = step.apply(next, [error, ...prevResults]); + } catch (e) { + next(e); + } + + if (result !== undefined) { + // 单个普通任务执行完 + next(undefined, result); + } + } + + return immediate ? next() : next; +} + +const fs = require("fs"); + +Step({ + immediate: true, + steps: [ + function () { + return `${__dirname}/test.txt`; + }, + function (err, name) { + if (err) throw err; + fs.readFile(name, "utf8", this); + }, + function capitalize(err, text) { + if (err) throw err; + return text.toUpperCase(); + }, + function showAll(err, result) { + if (err) throw err; + console.log(result); + }, + ], +}); diff --git a/HandWriting/tools/Step/test.txt b/HandWriting/tools/Step/test.txt new file mode 100644 index 0000000..5f4bb68 --- /dev/null +++ b/HandWriting/tools/Step/test.txt @@ -0,0 +1,2 @@ +1 +lowercase diff --git a/HandWriting/tools/Step/test2.txt b/HandWriting/tools/Step/test2.txt new file mode 100644 index 0000000..0cfbf08 --- /dev/null +++ b/HandWriting/tools/Step/test2.txt @@ -0,0 +1 @@ +2