diff --git a/packages/opentelemetry-instrumentation-grpc/README.md b/packages/opentelemetry-instrumentation-grpc/README.md index 503241a501..269064a18e 100644 --- a/packages/opentelemetry-instrumentation-grpc/README.md +++ b/packages/opentelemetry-instrumentation-grpc/README.md @@ -5,7 +5,7 @@ [![devDependencies][devDependencies-image]][devDependencies-url] [![Apache License][license-image]][license-image] -This module provides automatic instrumentation for [`grpc`](https://grpc.github.io/grpc/node/). Currently, version [`1.x`](https://www.npmjs.com/package/grpc?activeTab=versions) of the Node.js gRPC library is supported. +This module provides automatic instrumentation for [`grpc`](https://grpc.github.io/grpc/node/) and [`@grpc/grpc-js`](https://grpc.io/blog/grpc-js-1.0/). Currently, version [`1.x`](https://www.npmjs.com/package/grpc?activeTab=versions) of `grpc` and version [`1.x`](https://www.npmjs.com/package/@grpc/grpc-js?activeTab=versions) of `@grpc/grpc-js` is supported. For automatic instrumentation see the [@opentelemetry/node](https://github.com/open-telemetry/opentelemetry-js/tree/main/packages/opentelemetry-node) package. @@ -18,7 +18,7 @@ npm install --save @opentelemetry/instrumentation-grpc ## Usage -OpenTelemetry gRPC Instrumentation allows the user to automatically collect trace data and export them to the backend of choice, to give observability to distributed systems when working with [gRPC](https://www.npmjs.com/package/grpc). +OpenTelemetry gRPC Instrumentation allows the user to automatically collect trace data and export them to the backend of choice, to give observability to distributed systems when working with [gRPC](https://www.npmjs.com/package/grpc) or ([grpc-js](https://www.npmjs.com/package/@grpc/grpc-js)). To load a specific instrumentation (**gRPC** in this case), specify it in the Node Tracer's configuration. @@ -42,7 +42,7 @@ provider.addSpanProcessor(new SimpleSpanProcessor(new ConsoleSpanExporter())); provider.register(); ``` -See [examples/grpc](https://github.com/open-telemetry/opentelemetry-js/tree/main/examples/grpc) for a short example. +See [examples/grpc](https://github.com/open-telemetry/opentelemetry-js/tree/main/examples/grpc) or [examples/grpc-js](https://github.com/open-telemetry/opentelemetry-js/tree/main/examples/grpc-js) for examples. ### gRPC Instrumentation Options diff --git a/packages/opentelemetry-instrumentation-grpc/package.json b/packages/opentelemetry-instrumentation-grpc/package.json index a54f21917b..10efa653eb 100644 --- a/packages/opentelemetry-instrumentation-grpc/package.json +++ b/packages/opentelemetry-instrumentation-grpc/package.json @@ -69,6 +69,7 @@ "dependencies": { "@opentelemetry/api": "^0.16.0", "@opentelemetry/instrumentation": "^0.16.0", - "@opentelemetry/semantic-conventions": "^0.16.0" + "@opentelemetry/semantic-conventions": "^0.16.0", + "@opentelemetry/api-metrics": "^0.16.0" } } diff --git a/packages/opentelemetry-instrumentation-grpc/src/grpc-js/clientUtils.ts b/packages/opentelemetry-instrumentation-grpc/src/grpc-js/clientUtils.ts new file mode 100644 index 0000000000..645149c051 --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/src/grpc-js/clientUtils.ts @@ -0,0 +1,227 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { GrpcJsInstrumentation } from './'; +import type { GrpcClientFunc, SendUnaryDataCallback } from './types'; +import { + SpanKind, + Span, + SpanStatusCode, + SpanStatus, + propagation, + context, +} from '@opentelemetry/api'; +import { RpcAttribute } from '@opentelemetry/semantic-conventions'; +import type * as grpcJs from '@grpc/grpc-js'; +import { + _grpcStatusCodeToSpanStatus, + _grpcStatusCodeToOpenTelemetryStatusCode, + _methodIsIgnored, +} from '../utils'; +import { CALL_SPAN_ENDED } from './serverUtils'; +import { EventEmitter } from 'events'; + +/** + * Parse a package method list and return a list of methods to patch + * with both possible casings e.g. "TestMethod" & "testMethod" + */ +export function getMethodsToWrap( + this: GrpcJsInstrumentation, + client: typeof grpcJs.Client, + methods: { [key: string]: { originalName?: string } } +): string[] { + const methodList: string[] = []; + + // For a method defined in .proto as "UnaryMethod" + Object.entries(methods).forEach(([name, { originalName }]) => { + if (!_methodIsIgnored(name, this._config.ignoreGrpcMethods)) { + methodList.push(name); // adds camel case method name: "unaryMethod" + if ( + originalName && + // eslint-disable-next-line no-prototype-builtins + client.prototype.hasOwnProperty(originalName) && + name !== originalName // do not add duplicates + ) { + // adds original method name: "UnaryMethod", + methodList.push(originalName); + } + } + }); + + return methodList; +} + +/** + * Execute grpc client call. Apply completitionspan properties and end the + * span on callback or receiving an emitted event. + */ +export function makeGrpcClientRemoteCall( + original: GrpcClientFunc, + args: unknown[], + metadata: grpcJs.Metadata, + self: grpcJs.Client +): (span: Span) => EventEmitter { + /** + * Patches a callback so that the current span for this trace is also ended + * when the callback is invoked. + */ + function patchedCallback( + span: Span, + callback: SendUnaryDataCallback + ) { + const wrappedFn: SendUnaryDataCallback = ( + err: grpcJs.ServiceError | null, + res: any + ) => { + if (err) { + if (err.code) { + span.setStatus(_grpcStatusCodeToSpanStatus(err.code)); + span.setAttribute(RpcAttribute.GRPC_STATUS_CODE, err.code.toString()); + } + span.setAttributes({ + [RpcAttribute.GRPC_ERROR_NAME]: err.name, + [RpcAttribute.GRPC_ERROR_MESSAGE]: err.message, + }); + } else { + span.setStatus({ code: SpanStatusCode.UNSET }); + span.setAttribute( + RpcAttribute.GRPC_STATUS_CODE, + SpanStatusCode.UNSET.toString() + ); + } + + span.end(); + callback(err, res); + }; + return context.bind(wrappedFn); + } + + return (span: Span) => { + // if unary or clientStream + if (!original.responseStream) { + const callbackFuncIndex = args.findIndex(arg => { + return typeof arg === 'function'; + }); + if (callbackFuncIndex !== -1) { + args[callbackFuncIndex] = patchedCallback( + span, + args[callbackFuncIndex] as SendUnaryDataCallback + ); + } + } + + span.setAttributes({ + [RpcAttribute.GRPC_METHOD]: original.path, + [RpcAttribute.GRPC_KIND]: SpanKind.CLIENT, + }); + + setSpanContext(metadata); + const call = original.apply(self, args); + + // if server stream or bidi + if (original.responseStream) { + // Both error and status events can be emitted + // the first one emitted set spanEnded to true + let spanEnded = false; + const endSpan = () => { + if (!spanEnded) { + span.end(); + spanEnded = true; + } + }; + context.bind(call); + call.on('error', (err: grpcJs.ServiceError) => { + if (call[CALL_SPAN_ENDED]) { + return; + } + call[CALL_SPAN_ENDED] = true; + + span.setStatus({ + code: _grpcStatusCodeToOpenTelemetryStatusCode(err.code), + message: err.message, + }); + span.setAttributes({ + [RpcAttribute.GRPC_ERROR_NAME]: err.name, + [RpcAttribute.GRPC_ERROR_MESSAGE]: err.message, + }); + + endSpan(); + }); + + call.on('status', (status: SpanStatus) => { + if (call[CALL_SPAN_ENDED]) { + return; + } + call[CALL_SPAN_ENDED] = true; + + span.setStatus(_grpcStatusCodeToSpanStatus(status.code)); + + endSpan(); + }); + } + return call; + }; +} + +/** + * Returns the metadata argument from user provided arguments (`args`) + */ +export function getMetadata( + this: GrpcJsInstrumentation, + grpcClient: typeof grpcJs, + original: GrpcClientFunc, + args: Array +): grpcJs.Metadata { + let metadata: grpcJs.Metadata; + + // This finds an instance of Metadata among the arguments. + // A possible issue that could occur is if the 'options' parameter from + // the user contains an '_internal_repr' as well as a 'getMap' function, + // but this is an extremely rare case. + let metadataIndex = args.findIndex((arg: unknown | grpcJs.Metadata) => { + return ( + arg && + typeof arg === 'object' && + (arg as grpcJs.Metadata)['internalRepr'] && // changed from _internal_repr in grpc --> @grpc/grpc-js https://github.com/grpc/grpc-node/blob/95289edcaf36979cccf12797cc27335da8d01f03/packages/grpc-js/src/metadata.ts#L88 + typeof (arg as grpcJs.Metadata).getMap === 'function' + ); + }); + if (metadataIndex === -1) { + metadata = new grpcClient.Metadata(); + if (!original.requestStream) { + // unary or server stream + metadataIndex = 1; + } else { + // client stream or bidi + metadataIndex = 0; + } + args.splice(metadataIndex, 0, metadata); + } else { + metadata = args[metadataIndex] as grpcJs.Metadata; + } + return metadata; +} + +/** + * Inject opentelemetry trace context into `metadata` for use by another + * grpc receiver + * @param metadata + */ +export function setSpanContext(metadata: grpcJs.Metadata): void { + propagation.inject(context.active(), metadata, { + set: (metadata, k, v) => metadata.set(k, v as grpcJs.MetadataValue), + }); +} diff --git a/packages/opentelemetry-instrumentation-grpc/src/grpc-js/index.ts b/packages/opentelemetry-instrumentation-grpc/src/grpc-js/index.ts new file mode 100644 index 0000000000..9729c9c024 --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/src/grpc-js/index.ts @@ -0,0 +1,331 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import type * as grpcJs from '@grpc/grpc-js'; +import { + InstrumentationNodeModuleDefinition, + isWrapped, +} from '@opentelemetry/instrumentation'; +import { + InstrumentationBase, + InstrumentationConfig, +} from '@opentelemetry/instrumentation'; +import { GrpcInstrumentationConfig } from '../types'; +import { + ServerCallWithMeta, + SendUnaryDataCallback, + ServerRegisterFunction, + HandleCall, + MakeClientConstructorFunction, + PackageDefinition, + GrpcClientFunc, +} from './types'; +import { + context, + SpanOptions, + SpanKind, + propagation, + ROOT_CONTEXT, + setSpan, +} from '@opentelemetry/api'; +import { RpcAttribute } from '@opentelemetry/semantic-conventions'; +import { + shouldNotTraceServerCall, + handleServerFunction, + handleUntracedServerFunction, +} from './serverUtils'; +import { + getMethodsToWrap, + makeGrpcClientRemoteCall, + getMetadata, +} from './clientUtils'; +import { EventEmitter } from 'events'; + +export class GrpcJsInstrumentation extends InstrumentationBase { + constructor( + protected _config: GrpcInstrumentationConfig & InstrumentationConfig = {}, + name: string, + version: string + ) { + super(name, version, _config); + } + + public setConfig( + config: GrpcInstrumentationConfig & InstrumentationConfig = {} + ) { + this._config = Object.assign({}, config); + } + + init() { + return [ + new InstrumentationNodeModuleDefinition( + '@grpc/grpc-js', + ['1.*'], + (moduleExports, version) => { + this._logger.debug(`Applying patch for @grpc/grpc-js@${version}`); + if (isWrapped(moduleExports.Server.prototype.register)) { + this._unwrap(moduleExports.Server.prototype, 'register'); + } + // Patch Server methods + this._wrap( + moduleExports.Server.prototype, + 'register', + this._patchServer() as any + ); + // Patch Client methods + if (isWrapped(moduleExports.makeGenericClientConstructor)) { + this._unwrap(moduleExports, 'makeGenericClientConstructor'); + } + this._wrap( + moduleExports, + 'makeGenericClientConstructor', + this._patchClient(moduleExports) + ); + if (isWrapped(moduleExports.makeClientConstructor)) { + this._unwrap(moduleExports, 'makeClientConstructor'); + } + this._wrap( + moduleExports, + 'makeClientConstructor', + this._patchClient(moduleExports) + ); + if (isWrapped(moduleExports.loadPackageDefinition)) { + this._unwrap(moduleExports, 'loadPackageDefinition'); + } + this._wrap( + moduleExports, + 'loadPackageDefinition', + this._patchLoadPackageDefinition(moduleExports) + ); + return moduleExports; + }, + (moduleExports, version) => { + if (moduleExports === undefined) return; + this._logger.debug(`Removing patch for @grpc/grpc-js@${version}`); + + this._unwrap(moduleExports.Server.prototype, 'register'); + this._unwrap(moduleExports, 'makeClientConstructor'); + this._unwrap(moduleExports, 'makeGenericClientConstructor'); + this._unwrap(moduleExports, 'loadPackageDefinition'); + } + ), + ]; + } + + /** + * Patch for grpc.Server.prototype.register(...) function. Provides auto-instrumentation for + * client_stream, server_stream, bidi, unary server handler calls. + */ + private _patchServer(): ( + originalRegister: ServerRegisterFunction + ) => ServerRegisterFunction { + const instrumentation = this; + return (originalRegister: ServerRegisterFunction) => { + const config = this._config; + instrumentation._logger.debug('patched gRPC server'); + return function register( + this: grpcJs.Server, + name: string, + handler: HandleCall, + serialize: grpcJs.serialize, + deserialize: grpcJs.deserialize, + type: string + ): boolean { + const originalRegisterResult = originalRegister.call( + this, + name, + handler, + serialize, + deserialize, + type + ); + const handlerSet = this['handlers'].get(name); + + instrumentation._wrap( + handlerSet, + 'func', + (originalFunc: HandleCall) => { + return function func( + this: typeof handlerSet, + call: ServerCallWithMeta, + callback: SendUnaryDataCallback + ) { + const self = this; + + if ( + shouldNotTraceServerCall( + call.metadata, + name, + config.ignoreGrpcMethods + ) + ) { + return handleUntracedServerFunction( + type, + originalFunc, + call, + callback + ); + } + + const spanName = `grpc.${name.replace('/', '')}`; + const spanOptions: SpanOptions = { + kind: SpanKind.SERVER, + }; + + instrumentation._logger.debug( + 'patch func: %s', + JSON.stringify(spanOptions) + ); + + context.with( + propagation.extract(ROOT_CONTEXT, call.metadata, { + get: (carrier, key) => carrier.get(key).map(String), + keys: carrier => Object.keys(carrier.getMap()), + }), + () => { + const span = instrumentation.tracer + .startSpan(spanName, spanOptions) + .setAttributes({ + [RpcAttribute.GRPC_KIND]: spanOptions.kind, + }); + + context.with(setSpan(context.active(), span), () => { + handleServerFunction.call( + self, + span, + type, + originalFunc, + call, + callback + ); + }); + } + ); + }; + } + ); + return originalRegisterResult; + } as typeof grpcJs.Server.prototype.register; + }; + } + + /** + * Entry point for applying client patches to `grpc.makeClientConstructor(...)` equivalents + * @param this GrpcJsPlugin + */ + private _patchClient( + grpcClient: typeof grpcJs + ): ( + original: MakeClientConstructorFunction + ) => MakeClientConstructorFunction { + const instrumentation = this; + return (original: MakeClientConstructorFunction) => { + instrumentation._logger.debug('patching client'); + return function makeClientConstructor( + this: typeof grpcJs.Client, + methods: grpcJs.ServiceDefinition, + serviceName: string, + options?: object + ) { + const client = original.call(this, methods, serviceName, options); + instrumentation._massWrap( + client.prototype, + getMethodsToWrap.call(instrumentation, client, methods), + instrumentation._getPatchedClientMethods(grpcClient) + ); + return client; + }; + }; + } + + /** + * Entry point for client patching for grpc.loadPackageDefinition(...) + * @param this - GrpcJsPlugin + */ + private _patchLoadPackageDefinition(grpcClient: typeof grpcJs) { + const instrumentation = this; + instrumentation._logger.debug('patching loadPackageDefinition'); + return (original: typeof grpcJs.loadPackageDefinition) => { + return function patchedLoadPackageDefinition( + this: null, + packageDef: PackageDefinition + ) { + const result: grpcJs.GrpcObject = original.call( + this, + packageDef + ) as grpcJs.GrpcObject; + instrumentation._patchLoadedPackage(grpcClient, result); + return result; + } as typeof grpcJs.loadPackageDefinition; + }; + } + + /** + * Parse initial client call properties and start a span to trace its execution + */ + private _getPatchedClientMethods( + grpcClient: typeof grpcJs + ): (original: GrpcClientFunc) => () => EventEmitter { + const instrumentation = this; + return (original: GrpcClientFunc) => { + instrumentation._logger.debug('patch all client methods'); + return function clientMethodTrace(this: grpcJs.Client) { + const name = `grpc.${original.path.replace('/', '')}`; + const args = [...arguments]; + const metadata = getMetadata.call( + instrumentation, + grpcClient, + original, + args + ); + const span = instrumentation.tracer.startSpan(name, { + kind: SpanKind.CLIENT, + }); + return context.with(setSpan(context.active(), span), () => + makeGrpcClientRemoteCall(original, args, metadata, this)(span) + ); + }; + }; + } + + /** + * Utility function to patch *all* functions loaded through a proto file. + * Recursively searches for Client classes and patches all methods, reversing the + * parsing done by grpc.loadPackageDefinition + * https://github.com/grpc/grpc-node/blob/1d14203c382509c3f36132bd0244c99792cb6601/packages/grpc-js/src/make-client.ts#L200-L217 + */ + private _patchLoadedPackage( + grpcClient: typeof grpcJs, + result: grpcJs.GrpcObject + ): void { + Object.values(result).forEach(service => { + if (typeof service === 'function') { + this._massWrap( + service.prototype, + getMethodsToWrap.call(this, service, service.service), + this._getPatchedClientMethods.call(this, grpcClient) + ); + } else if (typeof service.format !== 'string') { + // GrpcObject + this._patchLoadedPackage.call( + this, + grpcClient, + service as grpcJs.GrpcObject + ); + } + }); + } +} diff --git a/packages/opentelemetry-instrumentation-grpc/src/grpc-js/serverUtils.ts b/packages/opentelemetry-instrumentation-grpc/src/grpc-js/serverUtils.ts new file mode 100644 index 0000000000..eac6238350 --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/src/grpc-js/serverUtils.ts @@ -0,0 +1,221 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Symbol to include on grpc call if it has already emitted an error event. + * grpc events that emit 'error' will also emit 'finish' and so only the + * error event should be processed. + */ + +import { context, Span, SpanStatusCode } from '@opentelemetry/api'; +import { RpcAttribute } from '@opentelemetry/semantic-conventions'; +import type * as grpcJs from '@grpc/grpc-js'; +import type { + ServerCallWithMeta, + SendUnaryDataCallback, + GrpcEmitter, + HandleCall, +} from './types'; +import { + _grpcStatusCodeToOpenTelemetryStatusCode, + _methodIsIgnored, +} from '../utils'; +import { IgnoreMatcher } from '../types'; + +export const CALL_SPAN_ENDED = Symbol('opentelemetry call span ended'); + +/** + * Handle patching for serverStream and Bidi type server handlers + */ +function serverStreamAndBidiHandler( + span: Span, + call: GrpcEmitter, + original: + | grpcJs.handleBidiStreamingCall + | grpcJs.handleServerStreamingCall +): void { + let spanEnded = false; + const endSpan = () => { + if (!spanEnded) { + spanEnded = true; + span.end(); + } + }; + + context.bind(call); + call.on('finish', () => { + // @grpc/js does not expose a way to check if this call also emitted an error, + // e.g. call.status.code !== 0 + if (call[CALL_SPAN_ENDED]) { + return; + } + + // Set the "grpc call had an error" flag + call[CALL_SPAN_ENDED] = true; + + span.setStatus({ + code: SpanStatusCode.UNSET, + }); + span.setAttribute( + RpcAttribute.GRPC_STATUS_CODE, + SpanStatusCode.OK.toString() + ); + + endSpan(); + }); + + call.on('error', (err: grpcJs.ServiceError) => { + if (call[CALL_SPAN_ENDED]) { + return; + } + + // Set the "grpc call had an error" flag + call[CALL_SPAN_ENDED] = true; + + span.setStatus({ + code: _grpcStatusCodeToOpenTelemetryStatusCode(err.code), + message: err.message, + }); + span.setAttributes({ + [RpcAttribute.GRPC_ERROR_NAME]: err.name, + [RpcAttribute.GRPC_ERROR_MESSAGE]: err.message, + }); + endSpan(); + }); + + // Types of parameters 'call' and 'call' are incompatible. + return (original as Function).call({}, call); +} + +/** + * Handle patching for clientStream and unary type server handlers + */ +function clientStreamAndUnaryHandler( + span: Span, + call: ServerCallWithMeta, + callback: SendUnaryDataCallback, + original: + | grpcJs.handleUnaryCall + | grpcJs.ClientReadableStream +): void { + const patchedCallback: SendUnaryDataCallback = ( + err: grpcJs.ServiceError | null, + value?: ResponseType + ) => { + if (err) { + if (err.code) { + span.setStatus({ + code: _grpcStatusCodeToOpenTelemetryStatusCode(err.code), + message: err.message, + }); + span.setAttribute(RpcAttribute.GRPC_STATUS_CODE, err.code.toString()); + } + span.setAttributes({ + [RpcAttribute.GRPC_ERROR_NAME]: err.name, + [RpcAttribute.GRPC_ERROR_MESSAGE]: err.message, + }); + } else { + span.setStatus({ code: SpanStatusCode.UNSET }); + span.setAttribute( + RpcAttribute.GRPC_STATUS_CODE, + SpanStatusCode.OK.toString() + ); + } + + span.end(); + return callback(err, value); + }; + + context.bind(call); + return (original as Function).call({}, call, patchedCallback); +} + +/** + * Patch callback or EventEmitter provided by `originalFunc` and set appropriate `span` + * properties based on its result. + */ +export function handleServerFunction( + span: Span, + type: string, + originalFunc: HandleCall, + call: ServerCallWithMeta, + callback: SendUnaryDataCallback +): void { + switch (type) { + case 'unary': + case 'clientStream': + case 'client_stream': + return clientStreamAndUnaryHandler( + span, + call, + callback, + originalFunc as + | grpcJs.handleUnaryCall + | grpcJs.ClientReadableStream + ); + case 'serverStream': + case 'server_stream': + case 'bidi': + return serverStreamAndBidiHandler( + span, + call, + originalFunc as + | grpcJs.handleBidiStreamingCall + | grpcJs.handleServerStreamingCall + ); + default: + break; + } +} + +/** + * Does not patch any callbacks or EventEmitters to omit tracing on requests + * that should not be traced. + */ +export function handleUntracedServerFunction( + type: string, + originalFunc: HandleCall, + call: ServerCallWithMeta, + callback: SendUnaryDataCallback +): void { + switch (type) { + case 'unary': + case 'clientStream': + case 'client_stream': + return (originalFunc as Function).call({}, call, callback); + case 'serverStream': + case 'server_stream': + case 'bidi': + return (originalFunc as Function).call({}, call); + default: + break; + } +} + +/** + * Returns true if the server call should not be traced. + */ +export function shouldNotTraceServerCall( + metadata: grpcJs.Metadata, + methodName: string, + ignoreGrpcMethods?: IgnoreMatcher[] +): boolean { + const parsedName = methodName.split('/'); + return _methodIsIgnored( + parsedName[parsedName.length - 1] || methodName, + ignoreGrpcMethods + ); +} diff --git a/packages/opentelemetry-instrumentation-grpc/src/grpc-js/types.ts b/packages/opentelemetry-instrumentation-grpc/src/grpc-js/types.ts new file mode 100644 index 0000000000..06ee4125d0 --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/src/grpc-js/types.ts @@ -0,0 +1,61 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import type * as grpcJs from '@grpc/grpc-js'; +import type { EventEmitter } from 'events'; +import type { CALL_SPAN_ENDED } from './serverUtils'; + +/** + * Server Unary callback type + */ +export type SendUnaryDataCallback = grpcJs.requestCallback; + +/** + * Intersection type of all grpc server call types + */ +export type ServerCall = + | grpcJs.ServerUnaryCall + | grpcJs.ServerReadableStream + | grpcJs.ServerWritableStream + | grpcJs.ServerDuplexStream; + +/** + * {@link ServerCall} ServerCall extended with misc. missing utility types + */ +export type ServerCallWithMeta = ServerCall & { + metadata: grpcJs.Metadata; +}; + +/** + * EventEmitter with span ended symbol indicator + */ +export type GrpcEmitter = EventEmitter & { [CALL_SPAN_ENDED]?: boolean }; + +/** + * Grpc client callback function extended with missing utility types + */ +export type GrpcClientFunc = ((...args: unknown[]) => GrpcEmitter) & { + path: string; + requestStream: boolean; + responseStream: boolean; +}; + +export type ServerRegisterFunction = typeof grpcJs.Server.prototype.register; + +export type MakeClientConstructorFunction = typeof grpcJs.makeGenericClientConstructor; + +export type { HandleCall } from '@grpc/grpc-js/build/src/server-call'; +export type { PackageDefinition } from '@grpc/grpc-js/build/src/make-client'; diff --git a/packages/opentelemetry-instrumentation-grpc/src/grpc.ts b/packages/opentelemetry-instrumentation-grpc/src/grpc.ts deleted file mode 100644 index 0ffc69e98e..0000000000 --- a/packages/opentelemetry-instrumentation-grpc/src/grpc.ts +++ /dev/null @@ -1,581 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { - SpanStatusCode, - context, - propagation, - Span, - SpanKind, - SpanOptions, - SpanStatus, - setSpan, -} from '@opentelemetry/api'; -import { RpcAttribute } from '@opentelemetry/semantic-conventions'; -import { - InstrumentationBase, - InstrumentationConfig, - InstrumentationNodeModuleDefinition, - InstrumentationNodeModuleFile, - isWrapped, -} from '@opentelemetry/instrumentation'; -import type * as events from 'events'; -import type * as grpcTypes from 'grpc'; -import { - GrpcClientFunc, - GrpcInternalClientTypes, - GrpcInstrumentationConfig, - SendUnaryDataCallback, - ServerCallWithMeta, -} from './types'; -import { - findIndex, - _grpcStatusCodeToOpenTelemetryStatusCode, - _grpcStatusCodeToSpanStatus, - _methodIsIgnored, -} from './utils'; -import { VERSION } from './version'; - -/** The metadata key under which span context is stored as a binary value. */ -export const GRPC_TRACE_KEY = 'grpc-trace-bin'; - -/** - * Holding reference to grpc module here to access constant of grpc modules - * instead of just requiring it avoid directly depending on grpc itself. - */ -let grpcClient: typeof grpcTypes; - -export class GrpcInstrumentation extends InstrumentationBase { - constructor( - protected _config: GrpcInstrumentationConfig & InstrumentationConfig = {} - ) { - super('@opentelemetry/instrumentation-grpc', VERSION, _config); - } - - public setConfig( - config: GrpcInstrumentationConfig & InstrumentationConfig = {} - ) { - this._config = Object.assign({}, config); - } - - init() { - return [ - new InstrumentationNodeModuleDefinition( - 'grpc', - ['1.*'], - (moduleExports, version) => { - this._logger.debug(`Applying patch for grpc@${version}`); - grpcClient = moduleExports; - if (isWrapped(moduleExports.Server.prototype.register)) { - this._unwrap(moduleExports.Server.prototype, 'register'); - } - this._wrap( - moduleExports.Server.prototype, - 'register', - this._patchServer() as any - ); - // Wrap the externally exported client constructor - if (isWrapped(moduleExports.makeGenericClientConstructor)) { - this._unwrap(moduleExports, 'makeGenericClientConstructor'); - } - this._wrap( - moduleExports, - 'makeGenericClientConstructor', - this._patchClient() - ); - return moduleExports; - }, - (moduleExports, version) => { - if (moduleExports === undefined) return; - this._logger.debug(`Removing patch for grpc@${version}`); - - this._unwrap(moduleExports.Server.prototype, 'register'); - }, - this._getInternalPatchs() - ), - ]; - } - - private _getInternalPatchs() { - const onPatch = ( - moduleExports: GrpcInternalClientTypes, - version?: string - ) => { - this._logger.debug(`Applying internal patch for grpc@${version}`); - if (isWrapped(moduleExports.makeClientConstructor)) { - this._unwrap(moduleExports, 'makeClientConstructor'); - } - this._wrap(moduleExports, 'makeClientConstructor', this._patchClient()); - return moduleExports; - }; - const onUnPatch = ( - moduleExports?: GrpcInternalClientTypes, - version?: string - ) => { - if (moduleExports === undefined) return; - this._logger.debug(`Removing internal patch for grpc@${version}`); - this._unwrap(moduleExports, 'makeClientConstructor'); - }; - return [ - new InstrumentationNodeModuleFile( - 'grpc/src/node/src/client.js', - ['0.13 - 1.6'], - onPatch, - onUnPatch - ), - new InstrumentationNodeModuleFile( - 'grpc/src/client.js', - ['^1.7'], - onPatch, - onUnPatch - ), - ]; - } - - private _setSpanContext(metadata: grpcTypes.Metadata): void { - propagation.inject(context.active(), metadata, { - set: (metadata, k, v) => metadata.set(k, v as grpcTypes.MetadataValue), - }); - } - - private _patchServer() { - return (originalRegister: typeof grpcTypes.Server.prototype.register) => { - const plugin = this; - plugin._logger.debug('patched gRPC server'); - - return function register( - this: grpcTypes.Server & { handlers: any }, - name: string, - handler: grpcTypes.handleCall, - serialize: grpcTypes.serialize, - deserialize: grpcTypes.deserialize, - type: string - ) { - const originalResult = originalRegister.apply(this, arguments as any); - const handlerSet = this.handlers[name]; - - plugin._wrap( - handlerSet, - 'func', - (originalFunc: grpcTypes.handleCall) => { - return function func( - this: typeof handlerSet, - call: ServerCallWithMeta, - callback: SendUnaryDataCallback - ) { - const self = this; - if (plugin._shouldNotTraceServerCall(call, name)) { - switch (type) { - case 'unary': - case 'client_stream': - return (originalFunc as Function).call( - self, - call, - callback - ); - case 'server_stream': - case 'bidi': - return (originalFunc as Function).call(self, call); - default: - return originalResult; - } - } - const spanName = `grpc.${name.replace('/', '')}`; - const spanOptions: SpanOptions = { - kind: SpanKind.SERVER, - }; - - plugin._logger.debug( - 'patch func: %s', - JSON.stringify(spanOptions) - ); - - context.with( - propagation.extract(context.active(), call.metadata, { - get: (metadata, key) => metadata.get(key).map(String), - keys: metadata => Object.keys(metadata.getMap()), - }), - () => { - const span = plugin.tracer - .startSpan(spanName, spanOptions) - .setAttributes({ - [RpcAttribute.GRPC_KIND]: spanOptions.kind, - }); - - context.with(setSpan(context.active(), span), () => { - switch (type) { - case 'unary': - case 'client_stream': - return plugin._clientStreamAndUnaryHandler( - plugin, - span, - call, - callback, - originalFunc, - self - ); - case 'server_stream': - case 'bidi': - return plugin._serverStreamAndBidiHandler( - plugin, - span, - call, - originalFunc, - self - ); - default: - break; - } - }); - } - ); - }; - } - ); - - return originalResult; - }; - }; - } - - /** - * Returns true if the server call should not be traced. - */ - private _shouldNotTraceServerCall( - call: ServerCallWithMeta, - name: string - ): boolean { - const parsedName = name.split('/'); - return _methodIsIgnored( - parsedName[parsedName.length - 1] || name, - this._config.ignoreGrpcMethods - ); - } - - private _clientStreamAndUnaryHandler( - plugin: GrpcInstrumentation, - span: Span, - call: ServerCallWithMeta, - callback: SendUnaryDataCallback, - original: - | grpcTypes.handleCall - | grpcTypes.ClientReadableStream, - self: {} - ) { - function patchedCallback( - err: grpcTypes.ServiceError, - value: any, - trailer: grpcTypes.Metadata, - flags: grpcTypes.writeFlags - ) { - if (err) { - if (err.code) { - span.setStatus({ - code: _grpcStatusCodeToOpenTelemetryStatusCode(err.code), - message: err.message, - }); - span.setAttribute(RpcAttribute.GRPC_STATUS_CODE, err.code.toString()); - } - span.setAttributes({ - [RpcAttribute.GRPC_ERROR_NAME]: err.name, - [RpcAttribute.GRPC_ERROR_MESSAGE]: err.message, - }); - } else { - span.setStatus({ code: SpanStatusCode.UNSET }); - span.setAttribute( - RpcAttribute.GRPC_STATUS_CODE, - grpcClient.status.OK.toString() - ); - } - span.addEvent('received'); - - // end the span - span.end(); - return callback(err, value, trailer, flags); - } - - context.bind(call); - return (original as Function).call(self, call, patchedCallback); - } - - private _serverStreamAndBidiHandler( - plugin: GrpcInstrumentation, - span: Span, - call: ServerCallWithMeta, - original: grpcTypes.handleCall, - self: {} - ) { - let spanEnded = false; - const endSpan = () => { - if (!spanEnded) { - spanEnded = true; - span.end(); - } - }; - - context.bind(call); - call.on('finish', () => { - span.setStatus(_grpcStatusCodeToSpanStatus(call.status.code)); - span.setAttribute( - RpcAttribute.GRPC_STATUS_CODE, - call.status.code.toString() - ); - - // if there is an error, span will be ended on error event, otherwise end it here - if (call.status.code === 0) { - span.addEvent('finished'); - endSpan(); - } - }); - - call.on('error', (err: grpcTypes.ServiceError) => { - span.setStatus({ - code: _grpcStatusCodeToOpenTelemetryStatusCode(err.code), - message: err.message, - }); - span.addEvent('finished with error'); - span.setAttributes({ - [RpcAttribute.GRPC_ERROR_NAME]: err.name, - [RpcAttribute.GRPC_ERROR_MESSAGE]: err.message, - }); - endSpan(); - }); - - return (original as any).call(self, call); - } - - private _patchClient() { - const plugin = this; - return (original: typeof grpcTypes.makeGenericClientConstructor): never => { - plugin._logger.debug('patching client'); - return function makeClientConstructor( - this: typeof grpcTypes.Client, - methods: { [key: string]: { originalName?: string } }, - _serviceName: string, - _options: grpcTypes.GenericClientOptions - ) { - const client = original.apply(this, arguments as any); - plugin._massWrap( - client.prototype as never, - plugin._getMethodsToWrap(client, methods) as never[], - plugin._getPatchedClientMethods() as any - ); - return client; - } as never; - }; - } - - private _getMethodsToWrap( - client: typeof grpcTypes.Client, - methods: { [key: string]: { originalName?: string } } - ): string[] { - const methodList: string[] = []; - - // For a method defined in .proto as "UnaryMethod" - Object.entries(methods).forEach(([name, { originalName }]) => { - if (!_methodIsIgnored(name, this._config.ignoreGrpcMethods)) { - methodList.push(name); // adds camel case method name: "unaryMethod" - if ( - originalName && - // eslint-disable-next-line no-prototype-builtins - client.prototype.hasOwnProperty(originalName) && - name !== originalName // do not add duplicates - ) { - // adds original method name: "UnaryMethod", - methodList.push(originalName); - } - } - }); - return methodList; - } - - private _getPatchedClientMethods() { - const plugin = this; - return (original: GrpcClientFunc) => { - plugin._logger.debug('patch all client methods'); - return function clientMethodTrace(this: grpcTypes.Client) { - const name = `grpc.${original.path.replace('/', '')}`; - const args = Array.prototype.slice.call(arguments); - const metadata = plugin._getMetadata(original, args); - const span = plugin.tracer.startSpan(name, { - kind: SpanKind.CLIENT, - }); - return context.with(setSpan(context.active(), span), () => - plugin._makeGrpcClientRemoteCall(original, args, metadata, this)(span) - ); - }; - }; - } - - /** - * This method handles the client remote call - */ - private _makeGrpcClientRemoteCall( - original: GrpcClientFunc, - args: any[], - metadata: grpcTypes.Metadata, - self: grpcTypes.Client - ) { - /** - * Patches a callback so that the current span for this trace is also ended - * when the callback is invoked. - */ - function patchedCallback( - span: Span, - callback: SendUnaryDataCallback, - _metadata: grpcTypes.Metadata - ) { - const wrappedFn = (err: grpcTypes.ServiceError, res: any) => { - if (err) { - if (err.code) { - span.setStatus(_grpcStatusCodeToSpanStatus(err.code)); - span.setAttribute( - RpcAttribute.GRPC_STATUS_CODE, - err.code.toString() - ); - } - span.setAttributes({ - [RpcAttribute.GRPC_ERROR_NAME]: err.name, - [RpcAttribute.GRPC_ERROR_MESSAGE]: err.message, - }); - } else { - span.setStatus({ code: SpanStatusCode.UNSET }); - span.setAttribute( - RpcAttribute.GRPC_STATUS_CODE, - grpcClient.status.OK.toString() - ); - } - - span.end(); - callback(err, res); - }; - return context.bind(wrappedFn); - } - - return (span: Span) => { - if (!span) { - return original.apply(self, args); - } - - // if unary or clientStream - if (!original.responseStream) { - const callbackFuncIndex = findIndex(args, arg => { - return typeof arg === 'function'; - }); - if (callbackFuncIndex !== -1) { - args[callbackFuncIndex] = patchedCallback( - span, - args[callbackFuncIndex], - metadata - ); - } - } - - span.addEvent('sent'); - span.setAttributes({ - [RpcAttribute.GRPC_METHOD]: original.path, - [RpcAttribute.GRPC_KIND]: SpanKind.CLIENT, - }); - - this._setSpanContext(metadata); - const call = original.apply(self, args); - - // if server stream or bidi - if (original.responseStream) { - // Both error and status events can be emitted - // the first one emitted set spanEnded to true - let spanEnded = false; - const endSpan = () => { - if (!spanEnded) { - span.end(); - spanEnded = true; - } - }; - context.bind(call); - ((call as unknown) as events.EventEmitter).on( - 'error', - (err: grpcTypes.ServiceError) => { - span.setStatus({ - code: _grpcStatusCodeToOpenTelemetryStatusCode(err.code), - message: err.message, - }); - span.setAttributes({ - [RpcAttribute.GRPC_ERROR_NAME]: err.name, - [RpcAttribute.GRPC_ERROR_MESSAGE]: err.message, - }); - endSpan(); - } - ); - - ((call as unknown) as events.EventEmitter).on( - 'status', - (status: SpanStatus) => { - span.setStatus({ code: SpanStatusCode.UNSET }); - span.setAttribute( - RpcAttribute.GRPC_STATUS_CODE, - status.code.toString() - ); - endSpan(); - } - ); - } - return call; - }; - } - - private _getMetadata( - original: GrpcClientFunc, - args: any[] - ): grpcTypes.Metadata { - let metadata: grpcTypes.Metadata; - - // This finds an instance of Metadata among the arguments. - // A possible issue that could occur is if the 'options' parameter from - // the user contains an '_internal_repr' as well as a 'getMap' function, - // but this is an extremely rare case. - let metadataIndex = findIndex(args, (arg: any) => { - return ( - arg && - typeof arg === 'object' && - arg._internal_repr && - typeof arg.getMap === 'function' - ); - }); - if (metadataIndex === -1) { - metadata = new grpcClient.Metadata(); - if (!original.requestStream) { - // unary or server stream - if (args.length === 0) { - // No argument (for the gRPC call) was provided, so we will have to - // provide one, since metadata cannot be the first argument. - // The internal representation of argument defaults to undefined - // in its non-presence. - // Note that we can't pass null instead of undefined because the - // serializer within gRPC doesn't accept it. - args.push(undefined); - } - metadataIndex = 1; - } else { - // client stream or bidi - metadataIndex = 0; - } - args.splice(metadataIndex, 0, metadata); - } else { - metadata = args[metadataIndex]; - } - return metadata; - } -} diff --git a/packages/opentelemetry-instrumentation-grpc/src/grpc/clientUtils.ts b/packages/opentelemetry-instrumentation-grpc/src/grpc/clientUtils.ts new file mode 100644 index 0000000000..fda7b4c7ed --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/src/grpc/clientUtils.ts @@ -0,0 +1,197 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import type * as grpcTypes from 'grpc'; +import type * as events from 'events'; +import { SendUnaryDataCallback, GrpcClientFunc } from './types'; +import { RpcAttribute } from '@opentelemetry/semantic-conventions'; +import { + context, + Span, + SpanStatusCode, + SpanKind, + SpanStatus, + propagation, +} from '@opentelemetry/api'; +import { + _grpcStatusCodeToSpanStatus, + _grpcStatusCodeToOpenTelemetryStatusCode, + findIndex, +} from '../utils'; + +/** + * This method handles the client remote call + */ +export const makeGrpcClientRemoteCall = function ( + grpcClient: typeof grpcTypes, + original: GrpcClientFunc, + args: any[], + metadata: grpcTypes.Metadata, + self: grpcTypes.Client +) { + /** + * Patches a callback so that the current span for this trace is also ended + * when the callback is invoked. + */ + function patchedCallback( + span: Span, + callback: SendUnaryDataCallback, + _metadata: grpcTypes.Metadata + ) { + const wrappedFn = (err: grpcTypes.ServiceError, res: any) => { + if (err) { + if (err.code) { + span.setStatus(_grpcStatusCodeToSpanStatus(err.code)); + span.setAttribute(RpcAttribute.GRPC_STATUS_CODE, err.code.toString()); + } + span.setAttributes({ + [RpcAttribute.GRPC_ERROR_NAME]: err.name, + [RpcAttribute.GRPC_ERROR_MESSAGE]: err.message, + }); + } else { + span.setStatus({ code: SpanStatusCode.UNSET }); + span.setAttribute( + RpcAttribute.GRPC_STATUS_CODE, + grpcClient.status.OK.toString() + ); + } + + span.end(); + callback(err, res); + }; + return context.bind(wrappedFn); + } + + return (span: Span) => { + if (!span) { + return original.apply(self, args); + } + + // if unary or clientStream + if (!original.responseStream) { + const callbackFuncIndex = findIndex(args, arg => { + return typeof arg === 'function'; + }); + if (callbackFuncIndex !== -1) { + args[callbackFuncIndex] = patchedCallback( + span, + args[callbackFuncIndex], + metadata + ); + } + } + + span.addEvent('sent'); + span.setAttributes({ + [RpcAttribute.GRPC_METHOD]: original.path, + [RpcAttribute.GRPC_KIND]: SpanKind.CLIENT, + }); + + setSpanContext(metadata); + const call = original.apply(self, args); + + // if server stream or bidi + if (original.responseStream) { + // Both error and status events can be emitted + // the first one emitted set spanEnded to true + let spanEnded = false; + const endSpan = () => { + if (!spanEnded) { + span.end(); + spanEnded = true; + } + }; + context.bind(call); + ((call as unknown) as events.EventEmitter).on( + 'error', + (err: grpcTypes.ServiceError) => { + span.setStatus({ + code: _grpcStatusCodeToOpenTelemetryStatusCode(err.code), + message: err.message, + }); + span.setAttributes({ + [RpcAttribute.GRPC_ERROR_NAME]: err.name, + [RpcAttribute.GRPC_ERROR_MESSAGE]: err.message, + }); + endSpan(); + } + ); + + ((call as unknown) as events.EventEmitter).on( + 'status', + (status: SpanStatus) => { + span.setStatus({ code: SpanStatusCode.UNSET }); + span.setAttribute( + RpcAttribute.GRPC_STATUS_CODE, + status.code.toString() + ); + endSpan(); + } + ); + } + return call; + }; +}; + +export const getMetadata = function ( + grpcClient: typeof grpcTypes, + original: GrpcClientFunc, + args: any[] +): grpcTypes.Metadata { + let metadata: grpcTypes.Metadata; + + // This finds an instance of Metadata among the arguments. + // A possible issue that could occur is if the 'options' parameter from + // the user contains an '_internal_repr' as well as a 'getMap' function, + // but this is an extremely rare case. + let metadataIndex = findIndex(args, (arg: any) => { + return ( + arg && + typeof arg === 'object' && + arg._internal_repr && + typeof arg.getMap === 'function' + ); + }); + if (metadataIndex === -1) { + metadata = new grpcClient.Metadata(); + if (!original.requestStream) { + // unary or server stream + if (args.length === 0) { + // No argument (for the gRPC call) was provided, so we will have to + // provide one, since metadata cannot be the first argument. + // The internal representation of argument defaults to undefined + // in its non-presence. + // Note that we can't pass null instead of undefined because the + // serializer within gRPC doesn't accept it. + args.push(undefined); + } + metadataIndex = 1; + } else { + // client stream or bidi + metadataIndex = 0; + } + args.splice(metadataIndex, 0, metadata); + } else { + metadata = args[metadataIndex]; + } + return metadata; +}; + +const setSpanContext = function (metadata: grpcTypes.Metadata): void { + propagation.inject(context.active(), metadata, { + set: (metadata, k, v) => metadata.set(k, v as grpcTypes.MetadataValue), + }); +}; diff --git a/packages/opentelemetry-instrumentation-grpc/src/grpc/index.ts b/packages/opentelemetry-instrumentation-grpc/src/grpc/index.ts new file mode 100644 index 0000000000..1963173945 --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/src/grpc/index.ts @@ -0,0 +1,316 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import type * as grpcTypes from 'grpc'; +import { + InstrumentationNodeModuleDefinition, + InstrumentationNodeModuleFile, + InstrumentationBase, + InstrumentationConfig, + isWrapped, +} from '@opentelemetry/instrumentation'; +import { + GrpcInternalClientTypes, + ServerCallWithMeta, + SendUnaryDataCallback, + GrpcClientFunc, +} from './types'; +import { GrpcInstrumentationConfig } from '../types'; +import { RpcAttribute } from '@opentelemetry/semantic-conventions'; +import { + context, + propagation, + SpanOptions, + SpanKind, + setSpan, +} from '@opentelemetry/api'; +import { + clientStreamAndUnaryHandler, + shouldNotTraceServerCall, + serverStreamAndBidiHandler, +} from './serverUtils'; +import { makeGrpcClientRemoteCall, getMetadata } from './clientUtils'; +import { _methodIsIgnored } from '../utils'; + +/** + * Holding reference to grpc module here to access constant of grpc modules + * instead of just requiring it avoid directly depending on grpc itself. + */ +let grpcClient: typeof grpcTypes; + +export class GrpcNativeInstrumentation extends InstrumentationBase< + typeof grpcTypes +> { + constructor( + protected _config: GrpcInstrumentationConfig & InstrumentationConfig = {}, + name: string, + version: string + ) { + super(name, version, _config); + } + + public setConfig( + config: GrpcInstrumentationConfig & InstrumentationConfig = {} + ) { + this._config = Object.assign({}, config); + } + + init() { + return [ + new InstrumentationNodeModuleDefinition( + 'grpc', + ['1.*'], + (moduleExports, version) => { + this._logger.debug(`Applying patch for grpc@${version}`); + if (isWrapped(moduleExports.Server.prototype.register)) { + this._unwrap(moduleExports.Server.prototype, 'register'); + } + grpcClient = moduleExports; + this._wrap( + moduleExports.Server.prototype, + 'register', + this._patchServer(moduleExports) as any + ); + // Wrap the externally exported client constructor + if (isWrapped(moduleExports.makeGenericClientConstructor)) { + this._unwrap(moduleExports, 'makeGenericClientConstructor'); + } + this._wrap( + moduleExports, + 'makeGenericClientConstructor', + this._patchClient(moduleExports) + ); + return moduleExports; + }, + (moduleExports, version) => { + if (moduleExports === undefined) return; + this._logger.debug(`Removing patch for grpc@${version}`); + + this._unwrap(moduleExports.Server.prototype, 'register'); + }, + this._getInternalPatchs() + ), + ]; + } + + private _getInternalPatchs() { + const onPatch = ( + moduleExports: GrpcInternalClientTypes, + version?: string + ) => { + this._logger.debug(`Applying internal patch for grpc@${version}`); + if (isWrapped(moduleExports.makeClientConstructor)) { + this._unwrap(moduleExports, 'makeClientConstructor'); + } + this._wrap( + moduleExports, + 'makeClientConstructor', + this._patchClient(grpcClient) + ); + return moduleExports; + }; + const onUnPatch = ( + moduleExports?: GrpcInternalClientTypes, + version?: string + ) => { + if (moduleExports === undefined) return; + this._logger.debug(`Removing internal patch for grpc@${version}`); + this._unwrap(moduleExports, 'makeClientConstructor'); + }; + return [ + new InstrumentationNodeModuleFile( + 'grpc/src/node/src/client.js', + ['0.13 - 1.6'], + onPatch, + onUnPatch + ), + new InstrumentationNodeModuleFile( + 'grpc/src/client.js', + ['^1.7'], + onPatch, + onUnPatch + ), + ]; + } + + private _patchServer(grpcModule: typeof grpcTypes) { + const instrumentation = this; + return (originalRegister: typeof grpcTypes.Server.prototype.register) => { + instrumentation._logger.debug('patched gRPC server'); + + return function register( + this: grpcTypes.Server & { handlers: any }, + name: string, + handler: grpcTypes.handleCall, + serialize: grpcTypes.serialize, + deserialize: grpcTypes.deserialize, + type: string + ) { + const originalResult = originalRegister.apply(this, arguments as any); + const handlerSet = this.handlers[name]; + + instrumentation._wrap( + handlerSet, + 'func', + (originalFunc: grpcTypes.handleCall) => { + return function func( + this: typeof handlerSet, + call: ServerCallWithMeta, + callback: SendUnaryDataCallback + ) { + const self = this; + if (shouldNotTraceServerCall.call(instrumentation, call, name)) { + switch (type) { + case 'unary': + case 'client_stream': + return (originalFunc as Function).call( + self, + call, + callback + ); + case 'server_stream': + case 'bidi': + return (originalFunc as Function).call(self, call); + default: + return originalResult; + } + } + const spanName = `grpc.${name.replace('/', '')}`; + const spanOptions: SpanOptions = { + kind: SpanKind.SERVER, + }; + + instrumentation._logger.debug( + 'patch func: %s', + JSON.stringify(spanOptions) + ); + + context.with( + propagation.extract(context.active(), call.metadata, { + get: (metadata, key) => metadata.get(key).map(String), + keys: metadata => Object.keys(metadata.getMap()), + }), + () => { + const span = instrumentation.tracer + .startSpan(spanName, spanOptions) + .setAttributes({ + [RpcAttribute.GRPC_KIND]: spanOptions.kind, + }); + + context.with(setSpan(context.active(), span), () => { + switch (type) { + case 'unary': + case 'client_stream': + return clientStreamAndUnaryHandler( + grpcModule, + span, + call, + callback, + originalFunc, + self + ); + case 'server_stream': + case 'bidi': + return serverStreamAndBidiHandler( + span, + call, + originalFunc, + self + ); + default: + break; + } + }); + } + ); + }; + } + ); + + return originalResult; + }; + }; + } + + private _patchClient(grpcClient: typeof grpcTypes) { + const instrumentation = this; + return (original: typeof grpcTypes.makeGenericClientConstructor): never => { + instrumentation._logger.debug('patching client'); + return function makeClientConstructor( + this: typeof grpcTypes.Client, + methods: { [key: string]: { originalName?: string } }, + _serviceName: string, + _options: grpcTypes.GenericClientOptions + ) { + const client = original.apply(this, arguments as any); + instrumentation._massWrap( + client.prototype as never, + instrumentation._getMethodsToWrap(client, methods) as never[], + instrumentation._getPatchedClientMethods(grpcClient) as any + ); + return client; + } as never; + }; + } + + private _getMethodsToWrap( + client: typeof grpcTypes.Client, + methods: { [key: string]: { originalName?: string } } + ): string[] { + const methodList: string[] = []; + + // For a method defined in .proto as "UnaryMethod" + Object.entries(methods).forEach(([name, { originalName }]) => { + if (!_methodIsIgnored(name, this._config.ignoreGrpcMethods)) { + methodList.push(name); // adds camel case method name: "unaryMethod" + if ( + originalName && + // eslint-disable-next-line no-prototype-builtins + client.prototype.hasOwnProperty(originalName) && + name !== originalName // do not add duplicates + ) { + // adds original method name: "UnaryMethod", + methodList.push(originalName); + } + } + }); + return methodList; + } + + private _getPatchedClientMethods(grpcClient: typeof grpcTypes) { + const instrumentation = this; + return (original: GrpcClientFunc) => { + instrumentation._logger.debug('patch all client methods'); + return function clientMethodTrace(this: grpcTypes.Client) { + const name = `grpc.${original.path.replace('/', '')}`; + const args = Array.prototype.slice.call(arguments); + const metadata = getMetadata(grpcClient, original, args); + const span = instrumentation.tracer.startSpan(name, { + kind: SpanKind.CLIENT, + }); + return context.with(setSpan(context.active(), span), () => + makeGrpcClientRemoteCall( + grpcClient, + original, + args, + metadata, + this + )(span) + ); + }; + }; + } +} diff --git a/packages/opentelemetry-instrumentation-grpc/src/grpc/serverUtils.ts b/packages/opentelemetry-instrumentation-grpc/src/grpc/serverUtils.ts new file mode 100644 index 0000000000..cd928a4665 --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/src/grpc/serverUtils.ts @@ -0,0 +1,132 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import type * as grpcTypes from 'grpc'; +import { SendUnaryDataCallback, ServerCallWithMeta } from './types'; +import { GrpcNativeInstrumentation } from './'; +import { RpcAttribute } from '@opentelemetry/semantic-conventions'; +import { context, Span, SpanStatusCode } from '@opentelemetry/api'; +import { + _grpcStatusCodeToOpenTelemetryStatusCode, + _grpcStatusCodeToSpanStatus, + _methodIsIgnored, +} from '../utils'; + +export const clientStreamAndUnaryHandler = function ( + grpcClient: typeof grpcTypes, + span: Span, + call: ServerCallWithMeta, + callback: SendUnaryDataCallback, + original: + | grpcTypes.handleCall + | grpcTypes.ClientReadableStream, + self: {} +) { + function patchedCallback( + err: grpcTypes.ServiceError, + value: any, + trailer: grpcTypes.Metadata, + flags: grpcTypes.writeFlags + ) { + if (err) { + if (err.code) { + span.setStatus({ + code: _grpcStatusCodeToOpenTelemetryStatusCode(err.code), + message: err.message, + }); + span.setAttribute(RpcAttribute.GRPC_STATUS_CODE, err.code.toString()); + } + span.setAttributes({ + [RpcAttribute.GRPC_ERROR_NAME]: err.name, + [RpcAttribute.GRPC_ERROR_MESSAGE]: err.message, + }); + } else { + span.setStatus({ code: SpanStatusCode.UNSET }); + span.setAttribute( + RpcAttribute.GRPC_STATUS_CODE, + grpcClient.status.OK.toString() + ); + } + span.addEvent('received'); + + // end the span + span.end(); + return callback(err, value, trailer, flags); + } + + context.bind(call); + return (original as Function).call(self, call, patchedCallback); +}; + +export const serverStreamAndBidiHandler = function ( + span: Span, + call: ServerCallWithMeta, + original: grpcTypes.handleCall, + self: {} +) { + let spanEnded = false; + const endSpan = () => { + if (!spanEnded) { + spanEnded = true; + span.end(); + } + }; + + context.bind(call); + call.on('finish', () => { + span.setStatus(_grpcStatusCodeToSpanStatus(call.status.code)); + span.setAttribute( + RpcAttribute.GRPC_STATUS_CODE, + call.status.code.toString() + ); + + // if there is an error, span will be ended on error event, otherwise end it here + if (call.status.code === 0) { + span.addEvent('finished'); + endSpan(); + } + }); + + call.on('error', (err: grpcTypes.ServiceError) => { + span.setStatus({ + code: _grpcStatusCodeToOpenTelemetryStatusCode(err.code), + message: err.message, + }); + span.addEvent('finished with error'); + span.setAttributes({ + [RpcAttribute.GRPC_ERROR_NAME]: err.name, + [RpcAttribute.GRPC_ERROR_MESSAGE]: err.message, + }); + endSpan(); + }); + + return (original as any).call(self, call); +}; + +/** + * Returns true if the server call should not be traced. + */ +export const shouldNotTraceServerCall = function ( + this: GrpcNativeInstrumentation, + call: ServerCallWithMeta, + name: string +): boolean { + const parsedName = name.split('/'); + return _methodIsIgnored( + parsedName[parsedName.length - 1] || name, + this._config.ignoreGrpcMethods + ); +}; diff --git a/packages/opentelemetry-instrumentation-grpc/src/grpc/types.ts b/packages/opentelemetry-instrumentation-grpc/src/grpc/types.ts new file mode 100644 index 0000000000..ef3e4ef9bb --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/src/grpc/types.ts @@ -0,0 +1,53 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import type * as grpcTypes from 'grpc'; +import * as events from 'events'; + +export type SendUnaryDataCallback = ( + error: grpcTypes.ServiceError | null, + value?: any, + trailer?: grpcTypes.Metadata, + flags?: grpcTypes.writeFlags +) => void; + +interface GrpcStatus { + code: number; + details: string; + metadata: grpcTypes.Metadata; +} + +export type ServerCall = + | typeof grpcTypes.ServerUnaryCall + | typeof grpcTypes.ServerReadableStream + | typeof grpcTypes.ServerWritableStream + | typeof grpcTypes.ServerDuplexStream; + +export type ServerCallWithMeta = ServerCall & { + metadata: grpcTypes.Metadata; + status: GrpcStatus; + request?: unknown; +} & events.EventEmitter; + +export type GrpcClientFunc = typeof Function & { + path: string; + requestStream: boolean; + responseStream: boolean; +}; + +export type GrpcInternalClientTypes = { + makeClientConstructor: typeof grpcTypes.makeGenericClientConstructor; +}; diff --git a/packages/opentelemetry-instrumentation-grpc/src/index.ts b/packages/opentelemetry-instrumentation-grpc/src/index.ts index 4ffcf69671..24c76056a1 100644 --- a/packages/opentelemetry-instrumentation-grpc/src/index.ts +++ b/packages/opentelemetry-instrumentation-grpc/src/index.ts @@ -14,4 +14,4 @@ * limitations under the License. */ -export * from './grpc'; +export * from './instrumentation'; diff --git a/packages/opentelemetry-instrumentation-grpc/src/instrumentation.ts b/packages/opentelemetry-instrumentation-grpc/src/instrumentation.ts new file mode 100644 index 0000000000..0245f367d2 --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/src/instrumentation.ts @@ -0,0 +1,100 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { InstrumentationConfig } from '@opentelemetry/instrumentation'; +import { GrpcInstrumentationConfig } from './types'; +import { VERSION } from './version'; +import { GrpcNativeInstrumentation } from './grpc'; +import { GrpcJsInstrumentation } from './grpc-js'; +import * as api from '@opentelemetry/api'; +import { MeterProvider } from '@opentelemetry/api-metrics'; + +/** The metadata key under which span context is stored as a binary value. */ +export const GRPC_TRACE_KEY = 'grpc-trace-bin'; + +export class GrpcInstrumentation { + private _grpcNativeInstrumentation: GrpcNativeInstrumentation; + private _grpcJsInstrumentation: GrpcJsInstrumentation; + + public readonly instrumentationName: string = + '@opentelemetry/instrumentation-grpc'; + public readonly instrumentationVersion: string = VERSION; + + constructor( + protected _config: GrpcInstrumentationConfig & InstrumentationConfig = {} + ) { + this._grpcJsInstrumentation = new GrpcJsInstrumentation( + _config, + this.instrumentationName, + this.instrumentationVersion + ); + this._grpcNativeInstrumentation = new GrpcNativeInstrumentation( + _config, + this.instrumentationName, + this.instrumentationVersion + ); + } + + public setConfig( + config: GrpcInstrumentationConfig & InstrumentationConfig = {} + ) { + this._config = Object.assign({}, config); + this._grpcJsInstrumentation.setConfig(this._config); + this._grpcNativeInstrumentation.setConfig(this._config); + } + + /** + * @internal + * Public reference to the protected BaseInstrumentation `_config` instance to be used by this + * plugin's external helper functions + */ + public getConfig() { + return this._config; + } + + init() { + // sub instrumentations will already be init when constructing them + return; + } + + enable() { + this._grpcJsInstrumentation.enable(); + this._grpcNativeInstrumentation.enable(); + } + + disable() { + this._grpcJsInstrumentation.disable(); + this._grpcNativeInstrumentation.disable(); + } + + /** + * Sets MeterProvider to this plugin + * @param meterProvider + */ + public setMeterProvider(meterProvider: MeterProvider) { + this._grpcJsInstrumentation.setMeterProvider(meterProvider); + this._grpcNativeInstrumentation.setMeterProvider(meterProvider); + } + + /** + * Sets TraceProvider to this plugin + * @param tracerProvider + */ + public setTracerProvider(tracerProvider: api.TracerProvider) { + this._grpcJsInstrumentation.setTracerProvider(tracerProvider); + this._grpcNativeInstrumentation.setTracerProvider(tracerProvider); + } +} diff --git a/packages/opentelemetry-instrumentation-grpc/src/types.ts b/packages/opentelemetry-instrumentation-grpc/src/types.ts index 14b605d263..b5ac3e537b 100644 --- a/packages/opentelemetry-instrumentation-grpc/src/types.ts +++ b/packages/opentelemetry-instrumentation-grpc/src/types.ts @@ -14,50 +14,13 @@ * limitations under the License. */ -import type * as grpcTypes from 'grpc'; -import * as events from 'events'; import { InstrumentationConfig } from '@opentelemetry/instrumentation'; export type IgnoreMatcher = string | RegExp | ((str: string) => boolean); -export type SendUnaryDataCallback = ( - error: grpcTypes.ServiceError | null, - value?: any, - trailer?: grpcTypes.Metadata, - flags?: grpcTypes.writeFlags -) => void; - export interface GrpcInstrumentationConfig extends InstrumentationConfig { /* Omits tracing on any gRPC methods that match any of * the IgnoreMatchers in the ignoreGrpcMethods list */ ignoreGrpcMethods?: IgnoreMatcher[]; } - -interface GrpcStatus { - code: number; - details: string; - metadata: grpcTypes.Metadata; -} - -export type ServerCall = - | typeof grpcTypes.ServerUnaryCall - | typeof grpcTypes.ServerReadableStream - | typeof grpcTypes.ServerWritableStream - | typeof grpcTypes.ServerDuplexStream; - -export type ServerCallWithMeta = ServerCall & { - metadata: grpcTypes.Metadata; - status: GrpcStatus; - request?: unknown; -} & events.EventEmitter; - -export type GrpcClientFunc = typeof Function & { - path: string; - requestStream: boolean; - responseStream: boolean; -}; - -export type GrpcInternalClientTypes = { - makeClientConstructor: typeof grpcTypes.makeGenericClientConstructor; -}; diff --git a/packages/opentelemetry-instrumentation-grpc/src/utils.ts b/packages/opentelemetry-instrumentation-grpc/src/utils.ts index 22318cecfa..14e0a86eb9 100644 --- a/packages/opentelemetry-instrumentation-grpc/src/utils.ts +++ b/packages/opentelemetry-instrumentation-grpc/src/utils.ts @@ -16,6 +16,7 @@ import { SpanStatusCode, SpanStatus } from '@opentelemetry/api'; import type * as grpcTypes from 'grpc'; +import type * as grpcJsTypes from '@grpc/grpc-js'; import { IgnoreMatcher } from './types'; // Equivalent to lodash _.findIndex @@ -38,7 +39,7 @@ export const findIndex: (args: T[], fn: (arg: T) => boolean) => number = ( * @param status */ export const _grpcStatusCodeToOpenTelemetryStatusCode = ( - status?: grpcTypes.status + status?: grpcTypes.status | grpcJsTypes.status ): SpanStatusCode => { if (status !== undefined && status === 0) { return SpanStatusCode.UNSET; diff --git a/packages/opentelemetry-instrumentation-grpc/test/grpc-js.test.ts b/packages/opentelemetry-instrumentation-grpc/test/grpc-js.test.ts new file mode 100644 index 0000000000..624d267a21 --- /dev/null +++ b/packages/opentelemetry-instrumentation-grpc/test/grpc-js.test.ts @@ -0,0 +1,28 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { runTests } from './helper'; +import { GrpcInstrumentation } from '../src/instrumentation'; + +const instrumentation = new GrpcInstrumentation(); +instrumentation.enable(); +instrumentation.disable(); + +import * as grpcJs from '@grpc/grpc-js'; + +describe('#grpc-js', () => { + runTests(instrumentation, 'grpc', grpcJs, 12346); +}); diff --git a/packages/opentelemetry-instrumentation-grpc/test/grpc.test.ts b/packages/opentelemetry-instrumentation-grpc/test/grpc.test.ts index 1636c7f4b8..453c6d6ef9 100644 --- a/packages/opentelemetry-instrumentation-grpc/test/grpc.test.ts +++ b/packages/opentelemetry-instrumentation-grpc/test/grpc.test.ts @@ -15,7 +15,7 @@ */ import { runTests } from './helper'; -import { GrpcInstrumentation } from '../src/grpc'; +import { GrpcInstrumentation } from '../src/instrumentation'; const instrumentation = new GrpcInstrumentation(); instrumentation.enable(); diff --git a/packages/opentelemetry-instrumentation-grpc/tsconfig.json b/packages/opentelemetry-instrumentation-grpc/tsconfig.json index ea143a7be3..d133c18257 100644 --- a/packages/opentelemetry-instrumentation-grpc/tsconfig.json +++ b/packages/opentelemetry-instrumentation-grpc/tsconfig.json @@ -12,6 +12,9 @@ { "path": "../opentelemetry-api" }, + { + "path": "../opentelemetry-api-metrics" + }, { "path": "../opentelemetry-context-async-hooks" },