Skip to content

Commit

Permalink
fix: update retry policy to not retry streams that have not made prog…
Browse files Browse the repository at this point in the history
…ress (#1946)

fix: update retry policy to not retry streams with retryable error that have not made progress receiving documents
  • Loading branch information
MarkDuckworth authored Nov 17, 2023
1 parent f83a995 commit 170d05b
Show file tree
Hide file tree
Showing 5 changed files with 367 additions and 52 deletions.
2 changes: 1 addition & 1 deletion dev/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1637,7 +1637,7 @@ export class Firestore implements firestore.Firestore {
function streamReady(): void {
if (!streamInitialized) {
streamInitialized = true;
logger('Firestore._initializeStream', requestTag, 'Releasing stream');
logger('Firestore._initializeStream', requestTag, 'Stream ready');
resolve(resultStream);
}
}
Expand Down
122 changes: 74 additions & 48 deletions dev/src/reference.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import {defaultConverter} from './types';
import {
autoId,
Deferred,
getTotalTimeout,
isPermanentRpcError,
mapToArray,
requestTag,
Expand Down Expand Up @@ -2569,6 +2570,15 @@ export class Query<
return isPermanentRpcError(err, methodName);
}

_hasRetryTimedOut(methodName: string, startTime: number): boolean {
const totalTimeout = getTotalTimeout(methodName);
if (totalTimeout === 0) {
return false;
}

return Date.now() - startTime >= totalTimeout;
}

/**
* Internal streaming method that accepts an optional transaction ID.
*
Expand All @@ -2579,6 +2589,7 @@ export class Query<
*/
_stream(transactionId?: Uint8Array): NodeJS.ReadableStream {
const tag = requestTag();
const startTime = Date.now();

let lastReceivedDocument: QueryDocumentSnapshot<
AppModelType,
Expand Down Expand Up @@ -2638,8 +2649,9 @@ export class Query<
let streamActive: Deferred<boolean>;
do {
streamActive = new Deferred<boolean>();
const methodName = 'runQuery';
backendStream = await this._firestore.requestStream(
'runQuery',
methodName,
/* bidirectional= */ false,
request,
tag
Expand All @@ -2656,12 +2668,28 @@ export class Query<
'Query failed with retryable stream error:',
err
);
// Enqueue a "no-op" write into the stream and resume the query
// once it is processed. This allows any enqueued results to be
// consumed before resuming the query so that the query resumption
// can start at the correct document.

// Enqueue a "no-op" write into the stream and wait for it to be
// read by the downstream consumer. This ensures that all enqueued
// results in the stream are consumed, which will give us an accurate
// value for `lastReceivedDocument`.
stream.write(NOOP_MESSAGE, () => {
if (lastReceivedDocument) {
if (this._hasRetryTimedOut(methodName, startTime)) {
logger(
'Query._stream',
tag,
'Query failed with retryable stream error but the total retry timeout has exceeded.'
);
stream.destroy(err);
streamActive.resolve(/* active= */ false);
} else if (lastReceivedDocument) {
logger(
'Query._stream',
tag,
'Query failed with retryable stream error and progress was made receiving ' +
'documents, so the stream is being retried.'
);

// Restart the query but use the last document we received as
// the query cursor. Note that we do not use backoff here. The
// call to `requestStream()` will backoff should the restart
Expand All @@ -2673,8 +2701,21 @@ export class Query<
} else {
request = this.startAfter(lastReceivedDocument).toProto();
}

// Set lastReceivedDocument to null before each retry attempt to ensure the retry makes progress
lastReceivedDocument = null;

streamActive.resolve(/* active= */ true);
} else {
logger(
'Query._stream',
tag,
'Query failed with retryable stream error however no progress was made receiving ' +
'documents, so the stream is being closed.'
);
stream.destroy(err);
streamActive.resolve(/* active= */ false);
}
streamActive.resolve(/* active= */ true);
});
} else {
logger(
Expand Down Expand Up @@ -3320,48 +3361,33 @@ export class AggregateQuery<
// catch below.
const request = this.toProto(transactionId);

let streamActive: Deferred<boolean>;
do {
streamActive = new Deferred<boolean>();
const backendStream = await firestore.requestStream(
'runAggregationQuery',
/* bidirectional= */ false,
request,
tag
);
stream.on('close', () => {
backendStream.resume();
backendStream.end();
});
backendStream.on('error', err => {
backendStream.unpipe(stream);
// If a non-transactional query failed, attempt to restart.
// Transactional queries are retried via the transaction runner.
if (
!transactionId &&
!isPermanentRpcError(err, 'runAggregationQuery')
) {
logger(
'AggregateQuery._stream',
tag,
'AggregateQuery failed with retryable stream error:',
err
);
streamActive.resolve(/* active= */ true);
} else {
logger(
'AggregateQuery._stream',
tag,
'AggregateQuery failed with stream error:',
err
);
stream.destroy(err);
streamActive.resolve(/* active= */ false);
}
});
const backendStream = await firestore.requestStream(
'runAggregationQuery',
/* bidirectional= */ false,
request,
tag
);
stream.on('close', () => {
backendStream.resume();
backendStream.pipe(stream);
} while (await streamActive.promise);
backendStream.end();
});
backendStream.on('error', err => {
// TODO(group-by) When group-by queries are supported for aggregates
// consider implementing retries if the stream is making progress
// receiving results for groups. See the use of lastReceivedDocument
// in the retry strategy for runQuery.

backendStream.unpipe(stream);
logger(
'AggregateQuery._stream',
tag,
'AggregateQuery failed with stream error:',
err
);
stream.destroy(err);
});
backendStream.resume();
backendStream.pipe(stream);
})
.catch(e => stream.destroy(e));

Expand Down
15 changes: 15 additions & 0 deletions dev/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,21 @@ export function getRetryCodes(methodName: string): number[] {
return getServiceConfig(methodName)?.retry?.retryCodes ?? [];
}

/**
* Gets the total timeout in milliseconds from the retry settings in
* the service config for the given RPC. If the total timeout is not
* set, then `0` is returned.
*
* @private
* @internal
*/
export function getTotalTimeout(methodName: string): number {
return (
getServiceConfig(methodName)?.retry?.backoffSettings?.totalTimeoutMillis ??
0
);
}

/**
* Returns the backoff setting from the service configuration.
* @private
Expand Down
41 changes: 39 additions & 2 deletions dev/test/aggregateQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,19 +126,31 @@ describe('aggregate query interface', () => {
});
});

it('handles stream exception at initialization', () => {
it('handles stream exception at initialization', async () => {
let attempts = 0;
const query = firestore.collection('collectionId').count();

query._stream = () => {
++attempts;
throw new Error('Expected error');
};

return expect(query.get()).to.eventually.rejectedWith('Expected error');
await query
.get()
.then(() => {
throw new Error('Unexpected success in Promise');
})
.catch(err => {
expect(err.message).to.equal('Expected error');
expect(attempts).to.equal(1);
});
});

it('handles stream exception during initialization', async () => {
let attempts = 0;
const overrides: ApiOverride = {
runAggregationQuery: () => {
++attempts;
return stream(new Error('Expected error'));
},
};
Expand All @@ -152,6 +164,31 @@ describe('aggregate query interface', () => {
})
.catch(err => {
expect(err.message).to.equal('Expected error');
expect(attempts).to.equal(5);
});
});

it('handles message without result during initialization', async () => {
let attempts = 0;
const overrides: ApiOverride = {
runAggregationQuery: () => {
++attempts;
return stream({readTime: {seconds: 5, nanos: 6}});
},
};
firestore = await createInstance(overrides);

const query = firestore.collection('collectionId').count();
await query
.get()
.then(() => {
throw new Error('Unexpected success in Promise');
})
.catch(err => {
expect(err.message).to.equal(
'RunAggregationQueryResponse is missing result'
);
expect(attempts).to.equal(1);
});
});
});
Loading

0 comments on commit 170d05b

Please sign in to comment.