Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

community[minor]: vercel kv graph checkpointer #5948

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions libs/langchain-community/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -1042,6 +1042,10 @@ chains/graph_qa/cypher.cjs
chains/graph_qa/cypher.js
chains/graph_qa/cypher.d.ts
chains/graph_qa/cypher.d.cts
langgraph/checkpointers/vercel_kv.cjs
langgraph/checkpointers/vercel_kv.js
langgraph/checkpointers/vercel_kv.d.ts
langgraph/checkpointers/vercel_kv.d.cts
node_modules
dist
.yarn
8 changes: 6 additions & 2 deletions libs/langchain-community/langchain.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,9 @@ export const config = {
"experimental/chat_models/ollama_functions": "experimental/chat_models/ollama_functions",
"experimental/llms/chrome_ai": "experimental/llms/chrome_ai",
// chains
"chains/graph_qa/cypher": "chains/graph_qa/cypher"
"chains/graph_qa/cypher": "chains/graph_qa/cypher",
// langgraph checkpointers
"langgraph/checkpointers/vercel_kv": "langgraph/checkpointers/vercel_kv"
},
requiresOptionalDependency: [
"tools/aws_sfn",
Expand Down Expand Up @@ -517,7 +519,9 @@ export const config = {
"experimental/multimodal_embeddings/googlevertexai",
"experimental/hubs/makersuite/googlemakersuitehub",
// chains
"chains/graph_qa/cypher"
"chains/graph_qa/cypher",
// langgraph checkpointers
"langgraph/checkpointers/vercel_kv"
],
packageSuffix: "community",
tsConfigPath: resolve("./tsconfig.json"),
Expand Down
20 changes: 19 additions & 1 deletion libs/langchain-community/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
"@gradientai/nodejs-sdk": "^1.2.0",
"@huggingface/inference": "^2.6.4",
"@jest/globals": "^29.5.0",
"@langchain/langgraph": "~0.0.26",
"@langchain/scripts": "~0.0.14",
"@langchain/standard-tests": "0.0.0",
"@layerup/layerup-security": "^1.5.12",
Expand Down Expand Up @@ -239,6 +240,7 @@
"@google-cloud/storage": "^6.10.1 || ^7.7.0",
"@gradientai/nodejs-sdk": "^1.2.0",
"@huggingface/inference": "^2.6.4",
"@langchain/langgraph": "~0.0.26",
"@layerup/layerup-security": "^1.5.12",
"@mendable/firecrawl-js": "^0.0.13",
"@mlc-ai/web-llm": "0.2.46",
Expand Down Expand Up @@ -413,6 +415,9 @@
"@huggingface/inference": {
chentschel marked this conversation as resolved.
Show resolved Hide resolved
"optional": true
},
"@langchain/langgraph": {
"optional": true
},
"@layerup/layerup-security": {
"optional": true
},
Expand Down Expand Up @@ -3049,6 +3054,15 @@
"import": "./chains/graph_qa/cypher.js",
"require": "./chains/graph_qa/cypher.cjs"
},
"./langgraph/checkpointers/vercel_kv": {
"types": {
"import": "./langgraph/checkpointers/vercel_kv.d.ts",
"require": "./langgraph/checkpointers/vercel_kv.d.cts",
"default": "./langgraph/checkpointers/vercel_kv.d.ts"
},
"import": "./langgraph/checkpointers/vercel_kv.js",
"require": "./langgraph/checkpointers/vercel_kv.cjs"
},
"./package.json": "./package.json"
},
"files": [
Expand Down Expand Up @@ -4096,6 +4110,10 @@
"chains/graph_qa/cypher.cjs",
"chains/graph_qa/cypher.js",
"chains/graph_qa/cypher.d.ts",
"chains/graph_qa/cypher.d.cts"
"chains/graph_qa/cypher.d.cts",
"langgraph/checkpointers/vercel_kv.cjs",
"langgraph/checkpointers/vercel_kv.js",
"langgraph/checkpointers/vercel_kv.d.ts",
"langgraph/checkpointers/vercel_kv.d.cts"
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/* eslint-disable no-process-env */

import { describe, test, expect } from "@jest/globals";
import { Checkpoint, CheckpointTuple } from "@langchain/langgraph";
import { VercelKVSaver } from "../vercel_kv.js";

const checkpoint1: Checkpoint = {
v: 1,
id: "1ef390c8-3ed9-6132-ffff-12d236274621",
ts: "2024-04-19T17:19:07.952Z",
channel_values: {
someKey1: "someValue1",
},
channel_versions: {
someKey2: 1,
},
versions_seen: {
someKey3: {
someKey4: 1,
},
},
};

const checkpoint2: Checkpoint = {
v: 1,
id: "1ef390c8-3ed9-6133-8001-419c612dad04",
ts: "2024-04-20T17:19:07.952Z",
channel_values: {
someKey1: "someValue2",
},
channel_versions: {
someKey2: 2,
},
versions_seen: {
someKey3: {
someKey4: 2,
},
},
};

describe("VercelKVSaver", () => {
const vercelSaver = new VercelKVSaver({
url: process.env.VERCEL_KV_API_URL!,
token: process.env.VERCEL_KV_API_TOKEN!,
});

test("should save and retrieve checkpoints correctly", async () => {
// save checkpoint
const runnableConfig = await vercelSaver.put(
{ configurable: { thread_id: "1" } },
checkpoint1,
{ source: "update", step: -1, writes: null }
);
expect(runnableConfig).toEqual({
configurable: {
thread_id: "1",
checkpoint_id: checkpoint1.id,
},
});

// get checkpoint tuple
const checkpointTuple = await vercelSaver.getTuple({
configurable: { thread_id: "1" },
});
expect(checkpointTuple?.config).toEqual({
configurable: {
thread_id: "1",
checkpoint_id: checkpoint1.id,
},
});
expect(checkpointTuple?.checkpoint).toEqual(checkpoint1);

// save another checkpoint
await vercelSaver.put(
{
configurable: {
thread_id: "1",
},
},
checkpoint2,
{ source: "update", step: -1, writes: null }
);
// list checkpoints
const checkpointTupleGenerator = vercelSaver.list({
configurable: { thread_id: "1" },
});

const checkpointTuples: CheckpointTuple[] = [];

for await (const checkpoint of checkpointTupleGenerator) {
checkpointTuples.push(checkpoint);
}
expect(checkpointTuples.length).toBe(2);

const checkpointTuple1 = checkpointTuples[0];
const checkpointTuple2 = checkpointTuples[1];

expect(checkpointTuple1.checkpoint.ts).toBe("2024-04-20T17:19:07.952Z");
expect(checkpointTuple2.checkpoint.ts).toBe("2024-04-19T17:19:07.952Z");
});
});
164 changes: 164 additions & 0 deletions libs/langchain-community/src/langgraph/checkpointers/vercel_kv.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
import { VercelKV, createClient } from "@vercel/kv";

import { RunnableConfig } from "@langchain/core/runnables";
import {
BaseCheckpointSaver,
Checkpoint,
CheckpointMetadata,
CheckpointTuple,
SerializerProtocol,
} from "@langchain/langgraph/web";

// snake_case is used to match Python implementation
interface KVRow {
checkpoint: string;
metadata: string;
}

interface KVConfig {
url: string;
token: string;
}

export class VercelKVSaver extends BaseCheckpointSaver {
private kv: VercelKV;

constructor(config: KVConfig, serde?: SerializerProtocol<unknown>) {
super(serde);
this.kv = createClient(config);
}

async getTuple(config: RunnableConfig): Promise<CheckpointTuple | undefined> {
const thread_id = config.configurable?.thread_id;
const checkpoint_id = config.configurable?.checkpoint_id;

if (!thread_id) {
return undefined;
}

const key = checkpoint_id
? `${thread_id}:${checkpoint_id}`
: `${thread_id}:last`;

const row: KVRow | null = await this.kv.get(key);

if (!row) {
return undefined;
}

const [checkpoint, metadata] = await Promise.all([
this.serde.parse(row.checkpoint),
this.serde.parse(row.metadata),
]);

return {
checkpoint: checkpoint as Checkpoint,
metadata: metadata as CheckpointMetadata,
config: checkpoint_id
? config
: {
configurable: {
thread_id,
checkpoint_id: (checkpoint as Checkpoint).id,
},
},
};
}

async *list(
config: RunnableConfig,
limit?: number,
before?: RunnableConfig
): AsyncGenerator<CheckpointTuple> {
const thread_id: string = config.configurable?.thread_id;

// LUA script to get keys excluding those starting with "last"
const luaScript = `
local prefix = ARGV[1]
local cursor = '0'
local result = {}
repeat
local scanResult = redis.call('SCAN', cursor, 'MATCH', prefix .. '*', 'COUNT', 1000)
cursor = scanResult[1]
local keys = scanResult[2]
for _, key in ipairs(keys) do
if key:sub(-5) ~= ':last' then
table.insert(result, key)
end
end
until cursor == '0'
return result
`;

// Execute the LUA script with the thread_id as an argument
const keys: string[] = await this.kv.eval(luaScript, [], [thread_id]);

const filteredKeys = keys.filter((key: string) => {
const [, checkpoint_id] = key.split(":");

return !before || checkpoint_id < before?.configurable?.checkpoint_id;
});

const sortedKeys = filteredKeys
.sort((a: string, b: string) => b.localeCompare(a))
.slice(0, limit);

const rows: (KVRow | null)[] = await this.kv.mget(...sortedKeys);
for (const row of rows) {
if (row) {
const [checkpoint, metadata] = await Promise.all([
this.serde.parse(row.checkpoint),
this.serde.parse(row.metadata),
]);

yield {
config: {
configurable: {
thread_id,
checkpoint_id: (checkpoint as Checkpoint).id,
},
},
checkpoint: checkpoint as Checkpoint,
metadata: metadata as CheckpointMetadata,
};
}
}
}

async put(
config: RunnableConfig,
checkpoint: Checkpoint,
metadata: CheckpointMetadata
): Promise<RunnableConfig> {
const thread_id = config.configurable?.thread_id;

if (!thread_id || !checkpoint.id) {
throw new Error("Thread ID and Checkpoint ID must be defined");
}

const row: KVRow = {
checkpoint: this.serde.stringify(checkpoint),
metadata: this.serde.stringify(metadata),
};

// LUA script to set checkpoint data atomically"
const luaScript = `
local thread_id = ARGV[1]
local checkpoint_id = ARGV[2]
local row = ARGV[3]

redis.call('SET', thread_id .. ':' .. checkpoint_id, row)
redis.call('SET', thread_id .. ':last', row)
`;

// Save the checkpoint and the last checkpoint
await this.kv.eval(luaScript, [], [thread_id, checkpoint.id, row]);

return {
configurable: {
thread_id,
checkpoint_id: checkpoint.id,
},
};
}
}
1 change: 1 addition & 0 deletions libs/langchain-community/src/load/import_constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,5 @@ export const optionalImportEntrypoints: string[] = [
"langchain_community/experimental/multimodal_embeddings/googlevertexai",
"langchain_community/experimental/hubs/makersuite/googlemakersuitehub",
"langchain_community/chains/graph_qa/cypher",
"langchain_community/langgraph/checkpointers/vercel_kv",
];
Loading
Loading