Skip to content

Commit

Permalink
fix(grpc): fix client/server span propagation (open-telemetry#325)
Browse files Browse the repository at this point in the history
* fix(grpc): fix client/server span propagation

* fix(test): uncomment grpc patch

* fix: linting, add missing unwrap

* docs(grpc): add supported versions to readme
  • Loading branch information
markwolff authored and mayurkale22 committed Sep 25, 2019
1 parent fa21bef commit 7d31077
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 43 deletions.
2 changes: 1 addition & 1 deletion packages/opentelemetry-plugin-grpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
[![devDependencies][devDependencies-image]][devDependencies-url]
[![Apache License][license-image]][license-image]

This module provides automatic instrumentation for [`grpc`](https://grpc.github.io/grpc/node/).
This module provides automatic instrumentation for [`grpc`](https://grpc.github.io/grpc/node/). Currently, version [`1.x`](https://www.npmjs.com/package/grpc?activeTab=versions) of the Node.js gRPC library is supported.

For automatic instrumentation see the
[@opentelemetry/node-sdk](https://github.com/open-telemetry/opentelemetry-js/tree/master/packages/opentelemetry-node-sdk) package.
Expand Down
2 changes: 2 additions & 0 deletions packages/opentelemetry-plugin-grpc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@
"@types/mocha": "^5.2.7",
"@types/node": "^12.6.9",
"@types/shimmer": "^1.0.1",
"@types/sinon": "^7.0.13",
"codecov": "^3.5.0",
"grpc": "^1.23.3",
"gts": "^1.1.0",
"mocha": "^6.2.0",
"nyc": "^14.1.1",
"node-pre-gyp": "^0.12.0",
"rimraf": "^3.0.0",
"sinon": "^7.5.0",
"tslint-microsoft-contrib": "^6.2.0",
"tslint-consistent-codestyle": "^1.15.1",
"ts-mocha": "^6.0.0",
Expand Down
93 changes: 54 additions & 39 deletions packages/opentelemetry-plugin-grpc/src/grpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
ServerCallWithMeta,
SendUnaryDataCallback,
GrpcClientFunc,
GrpcInternalClientTypes,
} from './types';
import {
findIndex,
Expand All @@ -39,17 +40,18 @@ import {
} from './utils';

import * as events from 'events';
import * as grpcModule from 'grpc';
import * as grpcTypes from 'grpc';
import * as shimmer from 'shimmer';
import * as path from 'path';

/** The metadata key under which span context is stored as a binary value. */
export const GRPC_TRACE_KEY = 'grpc-trace-bin';

let grpcClientModule: object;
let grpcClientModule: GrpcInternalClientTypes;

export class GrpcPlugin extends BasePlugin<grpc> {
static readonly component = 'grpc';
readonly supportedVersions = ['^1.23.3'];

protected _config!: GrpcPluginOptions;

Expand All @@ -64,7 +66,7 @@ export class GrpcPlugin extends BasePlugin<grpc> {
};
protected readonly _basedir = basedir;

protected patch(): typeof grpcModule {
protected patch(): typeof grpcTypes {
this._logger.debug(
'applying patch to %s@%s',
this.moduleName,
Expand All @@ -80,12 +82,24 @@ export class GrpcPlugin extends BasePlugin<grpc> {
);
}

// Wrap the externally exported client constructor
if (this._moduleExports.makeGenericClientConstructor) {
shimmer.wrap(
this._moduleExports,
'makeGenericClientConstructor',
this._patchClient()
);
}

if (this._internalFilesExports['client']) {
grpcClientModule = this._internalFilesExports['client'] as object;
grpcClientModule = this._internalFilesExports[
'client'
] as GrpcInternalClientTypes;

// Wrap the internally used client constructor
shimmer.wrap(
grpcClientModule,
'makeClientConstructor' as never,
'makeClientConstructor',
this._patchClient()
);
}
Expand All @@ -103,12 +117,16 @@ export class GrpcPlugin extends BasePlugin<grpc> {
shimmer.unwrap(this._moduleExports.Server.prototype, 'register');
}

if (this._moduleExports.makeGenericClientConstructor) {
shimmer.unwrap(this._moduleExports, 'makeGenericClientConstructor');
}

if (grpcClientModule) {
shimmer.unwrap(grpcClientModule, 'makeClientConstructor' as never);
shimmer.unwrap(grpcClientModule, 'makeClientConstructor');
}
}

private _getSpanContext(metadata: grpcModule.Metadata): SpanContext | null {
private _getSpanContext(metadata: grpcTypes.Metadata): SpanContext | null {
const metadataValue = metadata.getMap()[GRPC_TRACE_KEY] as Buffer;
// Entry doesn't exist
if (!metadataValue) {
Expand All @@ -118,7 +136,7 @@ export class GrpcPlugin extends BasePlugin<grpc> {
}

private _setSpanContext(
metadata: grpcModule.Metadata,
metadata: grpcTypes.Metadata,
spanContext: SpanContext
): void {
const serializedSpanContext = this._tracer
Expand All @@ -129,17 +147,17 @@ export class GrpcPlugin extends BasePlugin<grpc> {
}

private _patchServer() {
return (originalRegister: typeof grpcModule.Server.prototype.register) => {
return (originalRegister: typeof grpcTypes.Server.prototype.register) => {
const plugin = this;
plugin._logger.debug('patched gRPC server');

return function register<RequestType, ResponseType>(
// tslint:disable-next-line:no-any
this: grpcModule.Server & { handlers: any },
this: grpcTypes.Server & { handlers: any },
name: string,
handler: grpcModule.handleCall<RequestType, ResponseType>,
serialize: grpcModule.serialize<RequestType>,
deserialize: grpcModule.deserialize<RequestType>,
handler: grpcTypes.handleCall<RequestType, ResponseType>,
serialize: grpcTypes.serialize<RequestType>,
deserialize: grpcTypes.deserialize<RequestType>,
type: string
) {
// tslint:disable-next-line:no-any
Expand All @@ -149,7 +167,7 @@ export class GrpcPlugin extends BasePlugin<grpc> {
shimmer.wrap(
handlerSet,
'func',
(originalFunc: grpcModule.handleCall<RequestType, ResponseType>) => {
(originalFunc: grpcTypes.handleCall<RequestType, ResponseType>) => {
return function func(
this: typeof handlerSet,
call: ServerCallWithMeta,
Expand Down Expand Up @@ -216,16 +234,16 @@ export class GrpcPlugin extends BasePlugin<grpc> {
call: ServerCallWithMeta,
callback: SendUnaryDataCallback,
original:
| grpcModule.handleCall<RequestType, ResponseType>
| grpcModule.ClientReadableStream<RequestType>,
| grpcTypes.handleCall<RequestType, ResponseType>
| grpcTypes.ClientReadableStream<RequestType>,
self: {}
) {
function patchedCallback(
err: grpcModule.ServiceError,
err: grpcTypes.ServiceError,
// tslint:disable-next-line:no-any
value: any,
trailer: grpcModule.Metadata,
flags: grpcModule.writeFlags
trailer: grpcTypes.Metadata,
flags: grpcTypes.writeFlags
) {
if (err) {
if (err.code) {
Expand All @@ -246,7 +264,7 @@ export class GrpcPlugin extends BasePlugin<grpc> {
span.setStatus({ code: CanonicalCode.OK });
span.setAttribute(
AttributeNames.GRPC_STATUS_CODE,
grpcModule.status.OK.toString()
plugin._moduleExports.status.OK.toString()
);
}
span.addEvent('received');
Expand All @@ -264,7 +282,7 @@ export class GrpcPlugin extends BasePlugin<grpc> {
plugin: GrpcPlugin,
span: Span,
call: ServerCallWithMeta,
original: grpcModule.handleCall<RequestType, ResponseType>,
original: grpcTypes.handleCall<RequestType, ResponseType>,
self: {}
) {
let spanEnded = false;
Expand All @@ -290,7 +308,7 @@ export class GrpcPlugin extends BasePlugin<grpc> {
}
});

call.on('error', (err: grpcModule.ServiceError) => {
call.on('error', (err: grpcTypes.ServiceError) => {
span.addEvent('finished with error');
span.setAttributes({
[AttributeNames.GRPC_ERROR_NAME]: err.name,
Expand All @@ -305,15 +323,13 @@ export class GrpcPlugin extends BasePlugin<grpc> {

private _patchClient() {
const plugin = this;
return (
original: typeof grpcModule.makeGenericClientConstructor
): never => {
return (original: typeof grpcTypes.makeGenericClientConstructor): never => {
plugin._logger.debug('patching client');
return function makeClientConstructor<ImplementationType>(
this: typeof grpcModule.Client,
methods: grpcModule.ServiceDefinition<ImplementationType>,
this: typeof grpcTypes.Client,
methods: grpcTypes.ServiceDefinition<ImplementationType>,
serviceName: string,
options: grpcModule.GenericClientOptions
options: grpcTypes.GenericClientOptions
) {
// tslint:disable-next-line:no-any
const client = original.apply(this, arguments as any);
Expand All @@ -332,7 +348,7 @@ export class GrpcPlugin extends BasePlugin<grpc> {
const plugin = this;
return (original: GrpcClientFunc) => {
plugin._logger.debug('patch all client methods');
return function clientMethodTrace(this: grpcModule.Client) {
return function clientMethodTrace(this: grpcTypes.Client) {
const name = `grpc.${original.path.replace('/', '')}`;
const args = Array.prototype.slice.call(arguments);
const currentSpan = plugin._tracer.getCurrentSpan();
Expand All @@ -356,7 +372,7 @@ export class GrpcPlugin extends BasePlugin<grpc> {
original: GrpcClientFunc,
// tslint:disable-next-line:no-any
args: any[],
self: grpcModule.Client,
self: grpcTypes.Client,
plugin: GrpcPlugin
) {
/**
Expand All @@ -366,10 +382,10 @@ export class GrpcPlugin extends BasePlugin<grpc> {
function patchedCallback(
span: Span,
callback: SendUnaryDataCallback,
metadata: grpcModule.Metadata
metadata: grpcTypes.Metadata
) {
// tslint:disable-next-line:no-any
const wrappedFn = (err: grpcModule.ServiceError, res: any) => {
const wrappedFn = (err: grpcTypes.ServiceError, res: any) => {
if (err) {
if (err.code) {
span.setStatus(_grpcStatusCodeToSpanStatus(err.code));
Expand All @@ -386,7 +402,7 @@ export class GrpcPlugin extends BasePlugin<grpc> {
span.setStatus({ code: CanonicalCode.OK });
span.setAttribute(
AttributeNames.GRPC_STATUS_CODE,
grpcModule.status.OK.toString()
plugin._moduleExports.status.OK.toString()
);
}

Expand Down Expand Up @@ -439,7 +455,7 @@ export class GrpcPlugin extends BasePlugin<grpc> {
plugin._tracer.bind(call);
((call as unknown) as events.EventEmitter).on(
'error',
(err: grpcModule.ServiceError) => {
(err: grpcTypes.ServiceError) => {
span.setStatus({
code: _grpcStatusCodeToCanonicalCode(err.code),
message: err.message,
Expand Down Expand Up @@ -472,8 +488,8 @@ export class GrpcPlugin extends BasePlugin<grpc> {
original: GrpcClientFunc,
// tslint:disable-next-line:no-any
args: any[]
): grpcModule.Metadata {
let metadata: grpcModule.Metadata;
): grpcTypes.Metadata {
let metadata: grpcTypes.Metadata;

// This finds an instance of Metadata among the arguments.
// A possible issue that could occur is if the 'options' parameter from
Expand All @@ -489,7 +505,7 @@ export class GrpcPlugin extends BasePlugin<grpc> {
);
});
if (metadataIndex === -1) {
metadata = new grpcModule.Metadata();
metadata = new this._moduleExports.Metadata();
if (!original.requestStream) {
// unary or server stream
if (args.length === 0) {
Expand All @@ -516,5 +532,4 @@ export class GrpcPlugin extends BasePlugin<grpc> {

const basedir = path.dirname(require.resolve('grpc'));
const version = require(path.join(basedir, 'package.json')).version;
const plugin = new GrpcPlugin(GrpcPlugin.component, version);
export { plugin };
export const plugin = new GrpcPlugin(GrpcPlugin.component, version);
4 changes: 4 additions & 0 deletions packages/opentelemetry-plugin-grpc/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ export type GrpcClientFunc = typeof Function & {
responseStream: boolean;
};

export type GrpcInternalClientTypes = {
makeClientConstructor: typeof grpcModule.makeGenericClientConstructor;
};

// TODO: Delete if moving internal file loaders to BasePlugin
/**
* Maps a name (key) representing a internal file module and its exports
Expand Down
4 changes: 2 additions & 2 deletions packages/opentelemetry-plugin-grpc/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import { CanonicalCode, Status } from '@opentelemetry/types';
import * as grpcModule from 'grpc'; // For types only
import * as grpcTypes from 'grpc'; // For types only

// Equivalent to lodash _.findIndex
export const findIndex: <T>(args: T[], fn: (arg: T) => boolean) => number = (
Expand All @@ -37,7 +37,7 @@ export const findIndex: <T>(args: T[], fn: (arg: T) => boolean) => number = (
* @param status
*/
export const _grpcStatusCodeToCanonicalCode = (
status?: grpcModule.status
status?: grpcTypes.status
): CanonicalCode => {
if (status !== 0 && !status) {
return CanonicalCode.UNKNOWN;
Expand Down
20 changes: 19 additions & 1 deletion packages/opentelemetry-plugin-grpc/test/grpc.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

import { NoopLogger } from '@opentelemetry/core';
import { NoopLogger, NoopTracer } from '@opentelemetry/core';
import {
InMemorySpanExporter,
SimpleSpanProcessor,
Expand All @@ -29,6 +29,7 @@ import { SendUnaryDataCallback } from '../src/types';
import * as assert from 'assert';
import * as semver from 'semver';
import * as grpc from 'grpc';
import * as sinon from 'sinon';

const PROTO_PATH = __dirname + '/fixtures/grpc-test.proto';
const memoryExporter = new InMemorySpanExporter();
Expand Down Expand Up @@ -286,6 +287,23 @@ describe('GrpcPlugin', () => {
assert.deepStrictEqual('grpc', plugin.moduleName);
});

describe('should patch client constructor makeClientConstructor() and makeGenericClientConstructor()', () => {
const clientPatchStub = sinon.stub(
plugin,
'_getPatchedClientMethods' as never
);
after(() => {
clientPatchStub.restore();
plugin.disable();
});

it('should patch client constructor makeClientConstructor() and makeGenericClientConstructor()', () => {
plugin.enable(grpc, new NoopTracer(), new NoopLogger());
(plugin['_moduleExports'] as any).makeGenericClientConstructor({});
assert.strictEqual(clientPatchStub.callCount, 1);
});
});

const requestList: TestRequestResponse[] = [{ num: 100 }, { num: 50 }];
const resultSum = {
num: requestList.reduce((sum, x) => {
Expand Down

0 comments on commit 7d31077

Please sign in to comment.