Skip to content

Commit

Permalink
Worker Versioning (#277)
Browse files Browse the repository at this point in the history
Co-authored-by: Loren 🤓 <lorensr@gmail.com>
  • Loading branch information
Sushisource and lorensr committed Jul 6, 2023
1 parent be77f7a commit eeb42cb
Show file tree
Hide file tree
Showing 19 changed files with 373 additions and 1 deletion.
1 change: 1 addition & 0 deletions .scripts/copy-shared-files.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ const POST_CREATE_EXCLUDE = [
'nestjs-exchange-rates',
'food-delivery',
'search-attributes',
'worker-versioning',
];

const PRETTIERRC_EXCLUDE = ['food-delivery'];
Expand Down
3 changes: 2 additions & 1 deletion .scripts/list-of-samples.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"timer-examples",
"timer-progress",
"vscode-debugger",
"worker-specific-task-queues"
"worker-specific-task-queues",
"worker-versioning"
]
}
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ and you'll be given the list of sample options.
- [**Patching**](https://docs.temporal.io/workflows/#workflow-versioning): Patch in new Workflow code when making updates to Workflows that have executions in progress in production.
- [**Sinks**](./sinks): Use Sinks to extract data out of Workflows for alerting/logging/metrics/tracing purposes.
- [**Instrumentation**](./instrumentation): Use a [winston](https://github.com/winstonjs/winston) logger to get logs out of all SDK components and get metrics and traces out of Rust Core.
- [**Worker Versioning**](./worker-versioning): Version Workers with Build IDs in order to deploy incompatible changes to Workflow code.
- [**Protobufs**](./protobufs): Use [Protobufs](https://docs.temporal.io/security/#default-data-converter).
- [**Custom Payload Converter**](./ejson): Customize data serialization by creating a `PayloadConverter` that uses EJSON to convert Dates, binary, and regexes.
- **Monorepos**:
Expand Down
3 changes: 3 additions & 0 deletions worker-versioning/.eslintignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
node_modules
lib
.eslintrc.js
48 changes: 48 additions & 0 deletions worker-versioning/.eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
const { builtinModules } = require('module');

const ALLOWED_NODE_BUILTINS = new Set(['assert']);

module.exports = {
root: true,
parser: '@typescript-eslint/parser',
parserOptions: {
project: './tsconfig.json',
tsconfigRootDir: __dirname,
},
plugins: ['@typescript-eslint', 'deprecation'],
extends: [
'eslint:recommended',
'plugin:@typescript-eslint/eslint-recommended',
'plugin:@typescript-eslint/recommended',
'prettier',
],
rules: {
// recommended for safety
'@typescript-eslint/no-floating-promises': 'error', // forgetting to await Activities and Workflow APIs is bad
'deprecation/deprecation': 'warn',

// code style preference
'object-shorthand': ['error', 'always'],

// relaxed rules, for convenience
'@typescript-eslint/no-unused-vars': [
'warn',
{
argsIgnorePattern: '^_',
varsIgnorePattern: '^_',
},
],
'@typescript-eslint/no-explicit-any': 'off',
},
overrides: [
{
files: ['src/workflows.ts', 'src/workflows-*.ts', 'src/workflows/*.ts'],
rules: {
'no-restricted-imports': [
'error',
...builtinModules.filter((m) => !ALLOWED_NODE_BUILTINS.has(m)).flatMap((m) => [m, `node:${m}`]),
],
},
},
],
};
2 changes: 2 additions & 0 deletions worker-versioning/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
lib
node_modules
1 change: 1 addition & 0 deletions worker-versioning/.npmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package-lock=false
1 change: 1 addition & 0 deletions worker-versioning/.nvmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
16
17 changes: 17 additions & 0 deletions worker-versioning/.post-create
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
To begin development, install the Temporal CLI:

Mac: {cyan brew install temporal}
Other: Download and extract the latest release from https://github.com/temporalio/cli/releases/latest

Start Temporal Server:

{cyan temporal server start-dev}

Use Node version 16+:

Mac: {cyan brew install node@16}
Other: https://nodejs.org/en/download/

Then, in the project directory, run:

{cyan npm run example}
1 change: 1 addition & 0 deletions worker-versioning/.prettierignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
lib
2 changes: 2 additions & 0 deletions worker-versioning/.prettierrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
printWidth: 120
singleQuote: true
16 changes: 16 additions & 0 deletions worker-versioning/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Build ID Based Versioning

This sample illustrates how to use [Build ID based versioning](https://docs.temporal.io/workers#worker-versioning) to help you appropriately roll out
incompatible and compatible changes to workflow and activity code for the same task queue.

## Description

The sample shows you how to roll out both a compatible change and an incompatible change to a
workflow.

## Running

1. `temporal server start-dev --dynamic-config-value frontend.workerVersioningDataAPIs=true --dynamic-config-value frontend.workerVersioningWorkflowAPIs=true --dynamic-config-value worker.buildIdScavengerEnabled=true`
to start [Temporal Server](https://github.com/temporalio/cli/#installation) with Worker Versioning enabled.
1. `npm install` to install dependencies.
1. `npm run example` to run the example. It starts multiple workers and workflows and demonstrates their interaction.
38 changes: 38 additions & 0 deletions worker-versioning/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"name": "temporal-worker-versioning",
"version": "0.1.0",
"private": true,
"scripts": {
"build": "tsc --build",
"example": "ts-node src/example.ts"
},
"nodemonConfig": {
"execMap": {
"ts": "ts-node"
},
"ext": "ts",
"watch": [
"src"
]
},
"dependencies": {
"@temporalio/activity": "1.8.0",
"@temporalio/client": "1.8.0",
"@temporalio/worker": "1.8.0",
"@temporalio/workflow": "1.8.0",
"nanoid": "3.x"
},
"devDependencies": {
"@tsconfig/node16": "^1.0.0",
"@types/node": "^16.11.43",
"@typescript-eslint/eslint-plugin": "^5.0.0",
"@typescript-eslint/parser": "^5.0.0",
"eslint": "^7.32.0",
"eslint-config-prettier": "^8.3.0",
"eslint-plugin-deprecation": "^1.2.1",
"nodemon": "^2.0.12",
"prettier": "^2.3.2",
"ts-node": "^10.8.1",
"typescript": "^4.4.2"
}
}
16 changes: 16 additions & 0 deletions worker-versioning/src/activities.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
export async function greet(name: string): Promise<string> {
return `Hello, ${name}!`;
}

// This function represents the need to change the interface to `greet`. Perhaps we didn't realize
// we would need to pass additional data, and we change the string parameter to an object. (Hint:
// It's a great idea to always start with objects for this reason, as they can be extended without
// breaking compatibility as long as you use a wire format that maintains compatibility.)
export async function superGreet(input: SuperGreetInput): Promise<string> {
return `Hello, ${input.name}! You are number ${input.aNumber}`;
}

export interface SuperGreetInput {
name: string;
aNumber: number;
}
125 changes: 125 additions & 0 deletions worker-versioning/src/example.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import { Connection, Client } from '@temporalio/client';
import { uuid4 } from '@temporalio/workflow';
import { proceeder, versioningExample } from './workflowsV1';
import { Worker } from '@temporalio/worker';
import * as activities from './activities';

async function run() {
const connection = await Connection.connect();
const client = new Client({
connection,
});
const taskQueue = 'versioned-queue_' + uuid4();

// First, let's make the task queue use the build id versioning feature by adding an initial
// default version to the queue:
await client.taskQueue.updateBuildIdCompatibility(taskQueue, {
operation: 'addNewIdInNewDefaultSet',
buildId: '1.0',
});

// Start a 1.0 worker
const worker1 = await Worker.create({
workflowsPath: require.resolve('./workflowsV1'),
activities,
taskQueue,
buildId: '1.0',
useVersioning: true,
});
const worker1Run = worker1.run();

// Start a workflow that will run on the 1.0 worker
const firstWorkflowID = 'worker-versioning-first_' + uuid4();
const firstWorkflow = await client.workflow.start(versioningExample, {
workflowId: firstWorkflowID,
taskQueue,
workflowExecutionTimeout: '5 minutes',
});

// Signal the workflow to drive it
await firstWorkflow.signal(proceeder, 'go');

// Give a chance for the worker to process the signal
await new Promise((resolve) => setTimeout(resolve, 3000));

// Add a new compatible version to the queue
await client.taskQueue.updateBuildIdCompatibility(taskQueue, {
operation: 'addNewCompatibleVersion',
buildId: '1.1',
existingCompatibleBuildId: '1.0',
});

// Stop the old worker, and start a 1.1 worker. We do this to speed along the example, since the
// 1.0 worker may continue to process tasks briefly after the version update.
worker1.shutdown();
await worker1Run;
const worker11 = await Worker.create({
workflowsPath: require.resolve('./workflowsV11'),
activities,
taskQueue,
buildId: '1.1',
useVersioning: true,
});
const worker11Run = worker11.run();

// Continue driving the workflow. Take note that the new version of the workflow run by the 1.1
// worker is the one that takes over! You might see a workflow task timeout, if the 1.0 worker is
// processing a task as the version update happens. That's normal.
await firstWorkflow.signal(proceeder, 'go');

// Add a new *incompatible* version to the task queue, which will become the new overall default
// for the queue.
await client.taskQueue.updateBuildIdCompatibility(taskQueue, {
operation: 'addNewIdInNewDefaultSet',
buildId: '2.0',
});
// Start a 2.0 worker
const worker2 = await Worker.create({
workflowsPath: require.resolve('./workflowsV2'),
activities,
taskQueue,
buildId: '2.0',
useVersioning: true,
});
const worker2Run = worker2.run();

// Start a new workflow. Note that it will run on the new 2.0 version, without the client
// invocation changing at all!
const secondWorkflowID = 'worker-versioning-second_' + uuid4();
const secondWorkflow = await client.workflow.start(versioningExample, {
workflowId: secondWorkflowID,
taskQueue,
workflowExecutionTimeout: '5 minutes',
});

// Drive both workflows once more before concluding them
// firstWorkflow will continue to be run on the 1.1 Worker.
await firstWorkflow.signal(proceeder, 'go');
await secondWorkflow.signal(proceeder, 'go');
await firstWorkflow.signal(proceeder, 'finish');
await secondWorkflow.signal(proceeder, 'finish');

// Wait for workflows to finish
await Promise.all([firstWorkflow.result(), secondWorkflow.result()]);

// Lastly we'll demonstrate how you can use the gRPC api to determine if certain build IDs are
// ready to be retired. There's more information in the documentation, but here's a quick example
// that will show us that we can retire the 1.0 worker:
const reachability = await client.taskQueue.getReachability({
buildIds: ['1.0'],
});
console.log('Reachability:', reachability);
if (reachability.buildIdReachability['1.0'].taskQueueReachability[taskQueue].length === 0) {
console.log('Confirmed that 1.0 is ready to be retired!');
}

// Stop all workers
worker11.shutdown();
worker2.shutdown();
await Promise.all([worker11Run, worker2Run]);
}

run().catch((err) => {
console.error(err);
process.exit(1);
});
24 changes: 24 additions & 0 deletions worker-versioning/src/workflowsV1.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { condition, defineSignal, proxyActivities, setHandler, log } from '@temporalio/workflow';
import type * as activities from './activities';

const { greet } = proxyActivities<typeof activities>({
startToCloseTimeout: '1 minute',
});

export const proceeder = defineSignal<[string]>('proceed');

/**
* The 1.0 version of the workflow we'll be making changes to
*/
export async function versioningExample(): Promise<string> {
log.info('Workflow V1 started!', {});
let shouldFinish = false;
setHandler(proceeder, async (input: string) => {
await greet('from V1 worker!');
if (input == 'finish') {
shouldFinish = true;
}
});
await condition(() => shouldFinish);
return 'Concluded workflow on v1';
}
36 changes: 36 additions & 0 deletions worker-versioning/src/workflowsV11.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { condition, defineSignal, proxyActivities, setHandler, log, patched } from '@temporalio/workflow';
import type * as activities from './activities';

const { greet, superGreet } = proxyActivities<typeof activities>({
startToCloseTimeout: '1 minute',
});

export const proceeder = defineSignal<[string]>('proceed');

/**
* The 1.1 version of the workflow, which is compatible with the first version.
*
* The compatible changes we've made are:
* - Altering the log lines
* - Using the `patched` API to properly introduce branching behavior while maintaining
* compatibility
*/
export async function versioningExample(): Promise<string> {
log.info('Workflow V1.1 started!', {});
let shouldFinish = false;
setHandler(proceeder, async (input: string) => {
if (patched('different-activity')) {
await superGreet({ name: 'from V1.1 worker!', aNumber: 100 });
} else {
// Note it is a valid compatible change to alter the input to an activity. However, because
// we're using the patched API, this branch would only be taken if the workflow was started on
// a v1 worker.
await greet('from V1.1 worker!');
}
if (input == 'finish') {
shouldFinish = true;
}
});
await condition(() => shouldFinish);
return 'Concluded workflow on v1.1';
}
Loading

0 comments on commit eeb42cb

Please sign in to comment.