Skip to content

Commit

Permalink
refactor(NODE-5471): refactor crud operations to use async/await synt…
Browse files Browse the repository at this point in the history
…ax (#3777)
  • Loading branch information
malikj2000 authored Aug 2, 2023
1 parent 11631a2 commit 0c5c0b4
Show file tree
Hide file tree
Showing 12 changed files with 247 additions and 281 deletions.
16 changes: 8 additions & 8 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { promisify } from 'util';

import { type BSONSerializeOptions, type Document, ObjectId, resolveBSONOptions } from '../bson';
import type { Collection } from '../collection';
import {
Expand All @@ -13,7 +15,7 @@ import type { CollationOptions, CommandOperationOptions } from '../operations/co
import { DeleteOperation, type DeleteStatement, makeDeleteStatement } from '../operations/delete';
import { executeOperation } from '../operations/execute_operation';
import { InsertOperation } from '../operations/insert';
import { AbstractCallbackOperation, type Hint } from '../operations/operation';
import { AbstractOperation, type Hint } from '../operations/operation';
import { makeUpdateStatement, UpdateOperation, type UpdateStatement } from '../operations/update';
import type { Server } from '../sdam/server';
import type { Topology } from '../sdam/topology';
Expand Down Expand Up @@ -828,33 +830,31 @@ export interface BulkWriteOptions extends CommandOperationOptions {
let?: Document;
}

const executeCommandsAsync = promisify(executeCommands);

/**
* TODO(NODE-4063)
* BulkWrites merge complexity is implemented in executeCommands
* This provides a vehicle to treat bulkOperations like any other operation (hence "shim")
* We would like this logic to simply live inside the BulkWriteOperation class
* @internal
*/
class BulkWriteShimOperation extends AbstractCallbackOperation {
class BulkWriteShimOperation extends AbstractOperation {
bulkOperation: BulkOperationBase;
constructor(bulkOperation: BulkOperationBase, options: BulkWriteOptions) {
super(options);
this.bulkOperation = bulkOperation;
}

executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<any>
): void {
execute(_server: Server, session: ClientSession | undefined): Promise<any> {
if (this.options.session == null) {
// An implicit session could have been created by 'executeOperation'
// So if we stick it on finalOptions here, each bulk operation
// will use this same session, it'll be passed in the same way
// an explicit session would be
this.options.session = session;
}
return executeCommands(this.bulkOperation, this.options, callback);
return executeCommandsAsync(this.bulkOperation, this.options);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/bulk/unordered.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { Collection } from '../collection';
import { MongoInvalidArgumentError } from '../error';
import type { DeleteStatement } from '../operations/delete';
import type { UpdateStatement } from '../operations/update';
import type { Callback } from '../utils';
import { type Callback } from '../utils';
import {
Batch,
BatchType,
Expand Down
25 changes: 13 additions & 12 deletions src/operations/aggregate.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import type { Document } from '../bson';
import { MongoInvalidArgumentError } from '../error';
import { type TODO_NODE_3286 } from '../mongo_types';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type Callback, maxWireVersion, type MongoDBNamespace } from '../utils';
import { WriteConcern } from '../write_concern';
import {
type CollationOptions,
CommandCallbackOperation,
type CommandOperationOptions
} from './command';
import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command';
import { Aspect, defineAspects, type Hint } from './operation';

/** @internal */
Expand Down Expand Up @@ -40,7 +37,7 @@ export interface AggregateOptions extends CommandOperationOptions {
}

/** @internal */
export class AggregateOperation<T = Document> extends CommandCallbackOperation<T> {
export class AggregateOperation<T = Document> extends CommandOperation<T> {
override options: AggregateOptions;
target: string | typeof DB_AGGREGATE_COLLECTION;
pipeline: Document[];
Expand Down Expand Up @@ -93,11 +90,7 @@ export class AggregateOperation<T = Document> extends CommandCallbackOperation<T
this.pipeline.push(stage);
}

override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<T>
): void {
override async execute(server: Server, session: ClientSession | undefined): Promise<T> {
const options: AggregateOptions = this.options;
const serverWireVersion = maxWireVersion(server);
const command: Document = { aggregate: this.target, pipeline: this.pipeline };
Expand Down Expand Up @@ -137,7 +130,15 @@ export class AggregateOperation<T = Document> extends CommandCallbackOperation<T
command.cursor.batchSize = options.batchSize;
}

super.executeCommandCallback(server, session, command, callback);
return super.executeCommand(server, session, command) as TODO_NODE_3286;
}

protected override executeCallback(
_server: Server,
_session: ClientSession | undefined,
_callback: Callback<T>
): void {
throw new Error('Method not implemented.');
}
}

Expand Down
26 changes: 9 additions & 17 deletions src/operations/bulk_write.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ import type {
import type { Collection } from '../collection';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import type { Callback } from '../utils';
import { AbstractCallbackOperation, Aspect, defineAspects } from './operation';
import { AbstractOperation, Aspect, defineAspects } from './operation';

/** @internal */
export class BulkWriteOperation extends AbstractCallbackOperation<BulkWriteResult> {
export class BulkWriteOperation extends AbstractOperation<BulkWriteResult> {
override options: BulkWriteOptions;
collection: Collection;
operations: AnyBulkWriteOperation[];
Expand All @@ -27,11 +26,10 @@ export class BulkWriteOperation extends AbstractCallbackOperation<BulkWriteResul
this.operations = operations;
}

override executeCallback(
override async execute(
server: Server,
session: ClientSession | undefined,
callback: Callback<BulkWriteResult>
): void {
session: ClientSession | undefined
): Promise<BulkWriteResult> {
const coll = this.collection;
const operations = this.operations;
const options = { ...this.options, ...this.bsonOptions, readPreference: this.readPreference };
Expand All @@ -43,19 +41,13 @@ export class BulkWriteOperation extends AbstractCallbackOperation<BulkWriteResul
: coll.initializeOrderedBulkOp(options);

// for each op go through and add to the bulk
try {
for (let i = 0; i < operations.length; i++) {
bulk.raw(operations[i]);
}
} catch (err) {
return callback(err);
for (let i = 0; i < operations.length; i++) {
bulk.raw(operations[i]);
}

// Execute the bulk
bulk.execute({ ...options, session }).then(
result => callback(undefined, result),
error => callback(error)
);
const result = await bulk.execute({ ...options, session });
return result;
}
}

Expand Down
23 changes: 13 additions & 10 deletions src/operations/count.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { Collection } from '../collection';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import type { Callback, MongoDBNamespace } from '../utils';
import { CommandCallbackOperation, type CommandOperationOptions } from './command';
import { CommandOperation, type CommandOperationOptions } from './command';
import { Aspect, defineAspects } from './operation';

/** @public */
Expand All @@ -19,7 +19,7 @@ export interface CountOptions extends CommandOperationOptions {
}

/** @internal */
export class CountOperation extends CommandCallbackOperation<number> {
export class CountOperation extends CommandOperation<number> {
override options: CountOptions;
collectionName?: string;
query: Document;
Expand All @@ -32,11 +32,7 @@ export class CountOperation extends CommandCallbackOperation<number> {
this.query = filter;
}

override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<number>
): void {
override async execute(server: Server, session: ClientSession | undefined): Promise<number> {
const options = this.options;
const cmd: Document = {
count: this.collectionName,
Expand All @@ -59,9 +55,16 @@ export class CountOperation extends CommandCallbackOperation<number> {
cmd.maxTimeMS = options.maxTimeMS;
}

super.executeCommandCallback(server, session, cmd, (err, result) => {
callback(err, result ? result.n : 0);
});
const result = await super.executeCommand(server, session, cmd);
return result ? result.n : 0;
}

protected override executeCallback(
_server: Server,
_session: ClientSession | undefined,
_callback: Callback<number>
): void {
throw new Error('Method not implemented.');
}
}

Expand Down
33 changes: 11 additions & 22 deletions src/operations/count_documents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import type { Document } from '../bson';
import type { Collection } from '../collection';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import type { Callback } from '../utils';
import { AggregateOperation, type AggregateOptions } from './aggregate';

/** @public */
Expand Down Expand Up @@ -32,26 +31,16 @@ export class CountDocumentsOperation extends AggregateOperation<number> {
super(collection.s.namespace, pipeline, options);
}

override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<number>
): void {
super.executeCallback(server, session, (err, result) => {
if (err || !result) {
callback(err);
return;
}

// NOTE: We're avoiding creating a cursor here to reduce the callstack.
const response = result as unknown as Document;
if (response.cursor == null || response.cursor.firstBatch == null) {
callback(undefined, 0);
return;
}

const docs = response.cursor.firstBatch;
callback(undefined, docs.length ? docs[0].n : 0);
});
override async execute(server: Server, session: ClientSession | undefined): Promise<number> {
const result = await super.execute(server, session);

// NOTE: We're avoiding creating a cursor here to reduce the callstack.
const response = result as unknown as Document;
if (response.cursor == null || response.cursor.firstBatch == null) {
return 0;
}

const docs = response.cursor.firstBatch;
return docs.length ? docs[0].n : 0;
}
}
Loading

0 comments on commit 0c5c0b4

Please sign in to comment.