Skip to content

Commit

Permalink
feat: implemented execution record query functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
towersxu committed Jul 11, 2023
1 parent c2a7044 commit d73aa46
Show file tree
Hide file tree
Showing 11 changed files with 281 additions and 77 deletions.
44 changes: 22 additions & 22 deletions packages/engine/__test__/index.test.js
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
import Engine from '../src/index';

describe('流程引擎', () => {
// test('初始化流程引擎', () => {
// const engine = new Engine();
// expect(engine).toBeInstanceOf(Engine);
// });
// test('加载图数据', async () => {
// const engine = new Engine();
// const flowData = {
// graphData: {
// nodes: [
// {
// id: 'node1',
// type: 'StartNode',
// }
// ]
// },
// global: {},
// }
// const flowModel = engine.load(flowData);
// expect(flowModel.tasks.length).toBe(flowData.graphData.nodes.length);
// });
test('执行流程', async () => {
test('初始化流程引擎', () => {
const engine = new Engine();
expect(engine).toBeInstanceOf(Engine);
});
test('加载图数据', async () => {
const engine = new Engine();
const flowData = {
graphData: {
nodes: [
{
id: 'node1',
type: 'StartNode',
}
]
},
global: {},
}
const flowModel = engine.load(flowData);
expect(flowModel.nodeConfigMap.size).toBe(flowData.graphData.nodes.length);
});
test('执行流程完成, 返回数据包含executionId', async () => {
const engine = new Engine();
const flowData = {
graphData: {
Expand All @@ -48,5 +48,5 @@ describe('流程引擎', () => {
engine.load(flowData);
const result = await engine.execute();
expect(result).toHaveProperty('executionId');
})
});
});
56 changes: 56 additions & 0 deletions packages/engine/__test__/recorder.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import Engine from '../src/index';

describe('流程引擎执行记录器', () => {
test('获取流程执行记录', async () => {
const engine = new Engine();
const flowData = {
graphData: {
nodes: [
{
id: 'node1',
type: 'StartNode',
properties: {}
},
{
id: 'node2',
type: 'TaskNode',
properties: {}
}
],
edges: [
{
id: 'edge1',
sourceNodeId: 'node1',
targetNodeId: 'node2',
}
]
},
global: {},
}
engine.load(flowData);
const result = await engine.execute();
const executionId = result.executionId;
/**
* [
* {
* taskId: '',
* nodeId: '',
* instanceId: '',
* nodeType: '',
* timestamp: '',
* properties: {},
* }
* ]
*/
const execution = await engine.getExecutionRecord(executionId);
expect(execution.length).toBe(2);
expect(execution[1]).toHaveProperty('taskId');
expect(execution[1]).toHaveProperty('nodeId');
expect(execution[1]).toHaveProperty('executionId');
expect(execution[1]).toHaveProperty('nodeType');
expect(execution[1]).toHaveProperty('timestamp');
expect(execution[1]).toHaveProperty('properties');
expect(execution[1].nodeId).toBe('node2');
expect(execution[1].nodeType).toBe('TaskNode');
});
});
58 changes: 23 additions & 35 deletions packages/engine/src/FlowModel.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
// import type { GraphConfigData } from '@logicflow/core';
import type BaseNode from './nodes/BaseNode';
import type { NodeConfig, NodeConstructor } from './nodes/BaseNode';
import {
ErrorCode,
getErrorMsg,
getWarningMsg,
WarningCode,
} from './constant/LogCode';
import type {
NodeConfig,
NodeConstructor,
} from './nodes/BaseNode';
import {
EVENT_INSTANCE_COMPLETE,
} from './constant/constant';
Expand Down Expand Up @@ -59,7 +54,7 @@ export default class FlowModel {
/**
* 当前流程模型中的所有节点,边会被转换成节点的incoming和outgoing属性。
*/
nodeMap: Map<string, NodeConfig>;
nodeConfigMap: Map<string, NodeConfig>;
/**
* 当流程正在执行时,如果再次触发执行。那么会将执行参数放入到队列中,等待上一次执行完成后再执行。
*/
Expand All @@ -72,18 +67,25 @@ export default class FlowModel {
* 当前流程中开始节点组成的数组。
*/
startNodes: NodeConfig[] = [];
constructor(nodeModelMap: Map<string, NodeConstructor>) {
constructor({
nodeModelMap,
recorder,
}: {
nodeModelMap: Map<string, NodeConstructor>;
recorder?: any;
}) {
// 流程包含的节点类型
this.nodeModelMap = nodeModelMap;
// 需要执行的队列
this.executeQueue = [];
// 执行中的任务
this.executingInstance = null;
this.nodeMap = new Map();
this.nodeConfigMap = new Map();
this.isRunning = false;
this.NodeManager = new NodeManager();
this.scheduler = new Scheduler({
flowModel: this,
recorder,
});
this.scheduler.on(EVENT_INSTANCE_COMPLETE, (result) => {
this.onTaskFinished(result);
Expand Down Expand Up @@ -119,7 +121,7 @@ export default class FlowModel {
incoming: [],
outgoing: [],
};
this.nodeMap.set(node.id, nodeConfig);
this.nodeConfigMap.set(node.id, nodeConfig);
if (node.type === this.startNodeType) {
this.startNodes.push(nodeConfig);
}
Expand All @@ -128,8 +130,8 @@ export default class FlowModel {
}
});
edges.forEach((edge) => {
const sourceNode = this.nodeMap.get(edge.sourceNodeId);
const targetNode = this.nodeMap.get(edge.targetNodeId);
const sourceNode = this.nodeConfigMap.get(edge.sourceNodeId);
const targetNode = this.nodeConfigMap.get(edge.targetNodeId);
if (sourceNode) {
sourceNode.outgoing.push({
id: edge.id,
Expand Down Expand Up @@ -178,28 +180,14 @@ export default class FlowModel {
});
}
/**
* 在没有指定开始节点的情况下,创建一个新的流程实例,从流程的所有开始节点开始执行。
* 创建节点实例
* @param nodeId 节点Id
* @returns 节点示例
*/
// async createInstance() {
// this.executionId = createExecId();
// const startNodes = this.NodeManager.getStartTasks();
// startNodes.forEach((startNode) => {
// this.scheduler.addTask({
// executionId: this.executionId,
// taskId: startNode.nodeId,
// nodeId: startNode.nodeId,
// });
// });
// const result = await this.scheduler.run({
// executionId: this.executionId,
// });
// return result;
// }

createTask(nodeId: string) {
const nodeConfig = this.nodeMap.get(nodeId);
const nodeConfig = this.nodeConfigMap.get(nodeId);
const NodeModel = this.nodeModelMap.get(nodeConfig.type);
const model = new NodeModel(nodeConfig);
return model;
const task = new NodeModel(nodeConfig);
return task;
}
}
21 changes: 19 additions & 2 deletions packages/engine/src/Scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type FlowModel from './FlowModel';
import { EVENT_INSTANCE_COMPLETE, FlowStatus } from './constant/constant';
import type { NextTaskUnit } from './nodes/BaseNode';
import { createTaskId } from './util/ID';
import type Recorder from './recorder';

type TaskUnitMap = Map<string, TaskUnit>;

Expand All @@ -15,12 +16,14 @@ export default class Scheduler extends EventEmitter {
taskQueueMap: Map<string, TaskUnit[]>;
taskRunningMap: Map<string, TaskUnitMap>;
flowModel: FlowModel;
recorder: Recorder;
currentTask: TaskUnit | null;
constructor(config) {
super();
this.taskQueueMap = new Map();
this.taskRunningMap = new Map();
this.flowModel = config.flowModel;
this.recorder = config.recorder;
this.currentTask = null;
}
run(executionId) {
Expand Down Expand Up @@ -65,14 +68,17 @@ export default class Scheduler extends EventEmitter {
}
async exec(taskUnit: TaskUnit) {
const model = this.flowModel.createTask(taskUnit.nodeId);
model.execute({
const r = await model.execute({
executionId: taskUnit.executionId,
taskId: taskUnit.taskId,
nodeId: taskUnit.nodeId,
next: this.next.bind(this),
});
if (!r) this.cancel(taskUnit);
}
cancel(taskUnit: TaskUnit) {
// TODO: 流程执行异常中断
}

async next(data: NextTaskUnit) {
if (data.outgoing && data.outgoing.length > 0) {
data.outgoing.forEach((item) => {
Expand All @@ -82,9 +88,20 @@ export default class Scheduler extends EventEmitter {
});
});
}
this.saveTaskResult(data);
this.removeRunningTask(data);
this.run(data.executionId);
}
saveTaskResult(data: NextTaskUnit) {
this.recorder.addTask({
executionId: data.executionId,
taskId: data.taskId,
nodeId: data.nodeId,
nodeType: data.nodeType,
timestamp: Date.now(),
properties: data.properties,
});
}
getNextTask(executionId) {
const currentTaskQueue = this.taskQueueMap.get(executionId);
if (!currentTaskQueue || currentTaskQueue.length === 0) {
Expand Down
38 changes: 32 additions & 6 deletions packages/engine/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ import type { GraphConfigData } from '@logicflow/core';
import FlowModel, { TaskParams } from './FlowModel';
import StartNode from './nodes/StartNode';
import TaskNode from './nodes/TaskNode';
import Recorder from './recorder';

export default class Engine {
global: Record<string, any>;
graphData: GraphConfigData;
modelMap: Map<string, any>;
nodeModelMap: Map<string, any>;
flowModel: FlowModel;
recorder: Recorder;
constructor() {
this.modelMap = new Map();
this.nodeModelMap = new Map();
this.recorder = new Recorder();
// register node
this.register({
type: StartNode.nodeTypeName,
Expand All @@ -25,13 +28,30 @@ export default class Engine {
* @param nodeConfig { type: 'custom-node', model: Class }
*/
register(nodeConfig) {
this.modelMap.set(nodeConfig.type, nodeConfig.model);
this.nodeModelMap.set(nodeConfig.type, nodeConfig.model);
}
/**
* 自定义执行记录的存储,默认浏览器使用 sessionStorage,nodejs 使用内存存储。
* 注意:由于执行记录不会主动删除,所以需要自行清理。
* nodejs环境建议自定义为持久化存储。
* engine.setCustomRecorder({
* async addTask(task) {}
* async getTask(taskId) {}
* async getExecutionTasks(executionId) {}
* clear() {}
* });
*/
setCustomRecorder(recorder: Recorder) {
this.recorder = recorder;
}
/**
* 加载流程图数据
*/
load({ graphData, startNodeType = 'StartNode' }) {
this.flowModel = new FlowModel(this.modelMap);
this.flowModel = new FlowModel({
nodeModelMap: this.nodeModelMap,
recorder: this.recorder,
});
this.flowModel.setStartNodeType(startNodeType);
this.flowModel.load(graphData);
return this.flowModel;
Expand All @@ -40,8 +60,6 @@ export default class Engine {
* 执行流程,允许多次调用。
*/
async execute(execParam?: TaskParams) {
// const result = await this.flowModel.execute(flowResult);
// return result;
return new Promise((resolve) => {
if (!execParam) {
execParam = {};
Expand All @@ -54,6 +72,14 @@ export default class Engine {
});
});
}
async getExecutionRecord(executionId) {
const tasks = await this.recorder.getExecutionTasks(executionId);
const records = [];
for (let i = 0; i < tasks.length; i++) {
records.push(this.recorder.getTask(tasks[i]));
}
return Promise.all(records);
}
}

export {
Expand Down
Loading

0 comments on commit d73aa46

Please sign in to comment.