diff --git a/src/change_stream.ts b/src/change_stream.ts index 3148578d12..5041f1ee50 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -162,6 +162,14 @@ export interface ChangeStreamDocumentKey { documentKey: { _id: InferIdType; [shardKey: string]: any }; } +/** @public */ +export interface ChangeStreamSplitEvent { + /** Which fragment of the change this is. */ + fragment: number; + /** The total number of fragments. */ + of: number; +} + /** @public */ export interface ChangeStreamDocumentCommon { /** @@ -192,6 +200,13 @@ export interface ChangeStreamDocumentCommon { * Only present if the operation is part of a multi-document transaction. */ lsid?: ServerSessionId; + + /** + * When the change stream's backing aggregation pipeline contains the $changeStreamSplitLargeEvent + * stage, events larger than 16MB will be split into multiple events and contain the + * following information about which fragment the current event is. + */ + splitEvent?: ChangeStreamSplitEvent; } /** @public */ diff --git a/src/index.ts b/src/index.ts index 881029d524..a35c41565d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -194,6 +194,7 @@ export type { ChangeStreamReplaceDocument, ChangeStreamReshardCollectionDocument, ChangeStreamShardCollectionDocument, + ChangeStreamSplitEvent, ChangeStreamUpdateDocument, OperationTime, ResumeOptions, diff --git a/test/integration/change-streams/change_streams.prose.test.ts b/test/integration/change-streams/change_streams.prose.test.ts index 6b155c05c6..776790ae23 100644 --- a/test/integration/change-streams/change_streams.prose.test.ts +++ b/test/integration/change-streams/change_streams.prose.test.ts @@ -1,8 +1,11 @@ import { expect } from 'chai'; +import { once } from 'events'; import * as sinon from 'sinon'; import { setTimeout } from 'timers'; +import { promisify } from 'util'; import { + AbstractCursor, type ChangeStream, type CommandFailedEvent, type CommandStartedEvent, @@ -16,6 +19,7 @@ import { Timestamp } from '../../mongodb'; import * as mock from '../../tools/mongodb-mock/index'; +import { getSymbolFrom } from '../../tools/utils'; import { setupDatabase } from '../shared'; /** @@ -68,6 +72,14 @@ function triggerResumableError( triggerError(); } +const initIteratorMode = async (cs: ChangeStream) => { + const init = getSymbolFrom(AbstractCursor.prototype, 'kInit'); + const initEvent = once(cs.cursor, 'init'); + await promisify(cs.cursor[init].bind(cs.cursor))(); + await initEvent; + return; +}; + /** Waits for a change stream to start */ function waitForStarted(changeStream, callback) { changeStream.cursor.once('init', () => { @@ -938,4 +950,52 @@ describe('Change Stream prose tests', function () { } }); }); + + describe('19. Validate that large ChangeStream events are split when using $changeStreamSplitLargeEvent', function () { + let client; + let db; + let collection; + let changeStream; + + beforeEach(async function () { + const configuration = this.configuration; + client = configuration.newClient(); + db = client.db('test'); + // Create a new collection _C_ with changeStreamPreAndPostImages enabled. + await db.createCollection('changeStreamSplitTests', { + changeStreamPreAndPostImages: { enabled: true } + }); + collection = db.collection('changeStreamSplitTests'); + }); + + afterEach(async function () { + await changeStream.close(); + await collection.drop(); + await client.close(); + }); + + it('splits the event into multiple fragments', { + metadata: { requires: { topology: '!single', mongodb: '>=7.0.0' } }, + test: async function () { + // Insert into _C_ a document at least 10mb in size, e.g. { "value": "q"*10*1024*1024 } + await collection.insertOne({ value: 'q'.repeat(10 * 1024 * 1024) }); + // Create a change stream _S_ by calling watch on _C_ with pipeline + // [{ "$changeStreamSplitLargeEvent": {} }] and fullDocumentBeforeChange=required. + changeStream = collection.watch([{ $changeStreamSplitLargeEvent: {} }], { + fullDocumentBeforeChange: 'required' + }); + await initIteratorMode(changeStream); + // Call updateOne on _C_ with an empty query and an update setting the field to a new + // large value, e.g. { "$set": { "value": "z"*10*1024*1024 } }. + await collection.updateOne({}, { $set: { value: 'z'.repeat(10 * 1024 * 1024) } }); + // Collect two events from _S_. + const eventOne = await changeStream.next(); + const eventTwo = await changeStream.next(); + // Assert that the events collected have splitEvent fields { "fragment": 1, "of": 2 } + // and { "fragment": 2, "of": 2 }, in that order. + expect(eventOne.splitEvent).to.deep.equal({ fragment: 1, of: 2 }); + expect(eventTwo.splitEvent).to.deep.equal({ fragment: 2, of: 2 }); + } + }); + }); });