diff --git a/convex/crons.ts b/convex/crons.ts index 38764a6a..3181a4d4 100644 --- a/convex/crons.ts +++ b/convex/crons.ts @@ -102,6 +102,49 @@ export const vacuumOldEntries = internalMutation({ }, }); +export const vacuumOldMemories = internalMutation({ + handler: async ( + ctx, + { + age, + ...args + }: { + untilTs?: number; + age: number; + cursor: null | string; + soFar: number; + }, + ) => { + const untilTs = args.untilTs ?? Date.now() - age; + const results = await ctx.db + .query('memories') + .withIndex('by_creation_time', (q) => q.lt('_creationTime', untilTs)) + .paginate({ cursor: args.cursor, numItems: VACUUM_BATCH_SIZE }); + const vectorsToDelete = []; + for (const doc of results.page) { + await ctx.db.delete(doc._id); + await ctx.db.delete(doc.embeddingId); + vectorsToDelete.push(doc.embeddingId); + } + if (vectorsToDelete.length) { + await ctx.scheduler.runAfter(0, internal.lib.pinecone.deleteVectors, { + tableName: 'embeddings', + ids: vectorsToDelete, + }); + } + if (results.isDone) { + console.debug(`Vacuumed ${results.page.length} old memories.`); + } else { + await ctx.scheduler.runAfter(0, internal.crons.vacuumOldMemories, { + untilTs, + age, + cursor: results.continueCursor, + soFar: args.soFar + results.page.length, + }); + } + }, +}); + const crons = cronJobs(); crons.interval('restart idle agents', { seconds: 60 }, internal.crons.recoverStoppedAgents); crons.interval('restart thinking agents', { seconds: 60 }, internal.crons.recoverThinkingAgents); @@ -111,8 +154,7 @@ crons.interval('vacuum old journal entries', { hours: 1 }, internal.crons.vacuum cursor: null, soFar: 0, }); -crons.interval('vacuum old memory entries', { hours: 6 }, internal.crons.vacuumOldEntries, { - table: 'memories', +crons.interval('vacuum old memory entries', { hours: 6 }, internal.crons.vacuumOldMemories, { age: VACUUM_MEMORIES_AGE, cursor: null, soFar: 0, diff --git a/convex/lib/pinecone.ts b/convex/lib/pinecone.ts index 112e4d6b..8c86f638 100644 --- a/convex/lib/pinecone.ts +++ b/convex/lib/pinecone.ts @@ -40,6 +40,19 @@ export async function pineconeIndex() { return client.Index(orThrow(process.env.PINECONE_INDEX_NAME)); } +export const deleteVectors = internalAction({ + handler: async (ctx, { tableName, ids }: { tableName: TableNames; ids: Id[] }) => { + const pinecone = await pineconeIndex(); + await pinecone.delete1({ + // NOTE: Pinecone namespaces are a paid feature. Uncomment this line + // to use multiple Convex instances on the same Pinecone index: + // + // namespace: `${tableName} [${process.env.CONVEX_CLOUD_URL}]`, + ids, + }); + }, +}); + export const deleteAllVectors = internalAction({ args: {}, handler: async (ctx, args) => {