diff --git a/packages/grpc-native-core/index.js b/packages/grpc-native-core/index.js index c994c35d5..4e4289fa1 100644 --- a/packages/grpc-native-core/index.js +++ b/packages/grpc-native-core/index.js @@ -223,6 +223,8 @@ exports.writeFlags = constants.writeFlags; exports.logVerbosity = constants.logVerbosity; +exports.methodTypes = constants.methodTypes; + exports.credentials = require('./src/credentials.js'); /** @@ -266,6 +268,11 @@ exports.getClientChannel = client.getClientChannel; exports.waitForClientReady = client.waitForClientReady; +exports.StatusBuilder = client.StatusBuilder; +exports.ListenerBuilder = client.ListenerBuilder; +exports.RequesterBuilder = client.RequesterBuilder; +exports.InterceptingCall = client.InterceptingCall; + /** * @memberof grpc * @alias grpc.closeClient diff --git a/packages/grpc-native-core/src/client.js b/packages/grpc-native-core/src/client.js index a917cbc9a..79868df46 100644 --- a/packages/grpc-native-core/src/client.js +++ b/packages/grpc-native-core/src/client.js @@ -34,6 +34,7 @@ var _ = require('lodash'); +var client_interceptors = require('./client_interceptors'); var grpc = require('./grpc_extension'); var common = require('./common'); @@ -49,25 +50,10 @@ var stream = require('stream'); var Readable = stream.Readable; var Writable = stream.Writable; var Duplex = stream.Duplex; +var methodTypes = constants.methodTypes; var util = require('util'); var version = require('../package.json').version; -/** - * Create an Error object from a status object - * @private - * @param {grpc~StatusObject} status The status object - * @return {Error} The resulting Error - */ -function createStatusError(status) { - let statusName = _.invert(constants.status)[status.code]; - let message = `${status.code} ${statusName}: ${status.details}`; - let error = new Error(message); - error.code = status.code; - error.metadata = status.metadata; - error.details = status.details; - return error; -} - /** * Initial response metadata sent by the server when it starts processing the * call @@ -107,18 +93,15 @@ util.inherits(ClientWritableStream, Writable); * grpc~ClientWritableStream#metadata * @borrows grpc~ClientUnaryCall#event:status as * grpc~ClientWritableStream#status - * @param {grpc.internal~Call} call The call object to send data with - * @param {grpc~serialize=} [serialize=identity] Serialization - * function for writes. + * @param {InterceptingCall} call Exposes gRPC request operations, processed by + * an interceptor stack. */ -function ClientWritableStream(call, serialize) { +function ClientWritableStream(call) { Writable.call(this, {objectMode: true}); this.call = call; - this.serialize = common.wrapIgnoreNull(serialize); + var self = this; this.on('finish', function() { - var batch = {}; - batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; - call.startBatch(batch, function() {}); + self.call.halfClose(); }); } @@ -145,8 +128,6 @@ function ClientWritableStream(call, serialize) { */ function _write(chunk, encoding, callback) { /* jshint validthis: true */ - var batch = {}; - var message; var self = this; if (this.writeFailed) { /* Once a write fails, just call the callback immediately to let the caller @@ -154,26 +135,7 @@ function _write(chunk, encoding, callback) { setImmediate(callback); return; } - try { - message = this.serialize(chunk); - } catch (e) { - /* Sending this error to the server and emitting it immediately on the - client may put the call in a slightly weird state on the client side, - but passing an object that causes a serialization failure is a misuse - of the API anyway, so that's OK. The primary purpose here is to give the - programmer a useful error and to stop the stream properly */ - this.call.cancelWithStatus(constants.status.INTERNAL, - 'Serialization failure'); - callback(e); - return; - } - if (_.isFinite(encoding)) { - /* Attach the encoding if it is a finite number. This is the closest we - * can get to checking that it is valid flags */ - message.grpcWriteFlags = encoding; - } - batch[grpc.opType.SEND_MESSAGE] = message; - this.call.startBatch(batch, function(err, event) { + var outerCallback = function(err, event) { if (err) { /* Assume that the call is complete and that writing failed because a status was received. In that case, set a flag to discard all future @@ -181,7 +143,12 @@ function _write(chunk, encoding, callback) { self.writeFailed = true; } callback(); - }); + }; + var context = { + encoding: encoding, + callback: outerCallback + }; + this.call.sendMessageWithContext(context, chunk); } ClientWritableStream.prototype._write = _write; @@ -199,16 +166,14 @@ util.inherits(ClientReadableStream, Readable); * grpc~ClientReadableStream#metadata * @borrows grpc~ClientUnaryCall#event:status as * grpc~ClientReadableStream#status - * @param {grpc.internal~Call} call The call object to read data with - * @param {grpc~deserialize=} [deserialize=identity] - * Deserialization function for reads + * @param {InterceptingCall} call Exposes gRPC request operations, processed by + * an interceptor stack. */ -function ClientReadableStream(call, deserialize) { +function ClientReadableStream(call) { Readable.call(this, {objectMode: true}); this.call = call; this.finished = false; this.reading = false; - this.deserialize = common.wrapIgnoreNull(deserialize); /* Status generated from reading messages from the server. Overrides the * status from the server if not OK */ this.read_status = null; @@ -267,7 +232,7 @@ function _emitStatusIfDone() { if (status.code === constants.status.OK) { this.push(null); } else { - var error = createStatusError(status); + var error = common.createStatusError(status); this.emit('error', error); } this.emit('status', status); @@ -283,48 +248,15 @@ ClientReadableStream.prototype._emitStatusIfDone = _emitStatusIfDone; */ function _read(size) { /* jshint validthis: true */ - var self = this; - /** - * Callback to be called when a READ event is received. Pushes the data onto - * the read queue and starts reading again if applicable - * @param {grpc.Event} event READ event object - */ - function readCallback(err, event) { - if (err) { - // Something has gone wrong. Stop reading and wait for status - self.finished = true; - self._readsDone(); - return; - } - var data = event.read; - var deserialized; - try { - deserialized = self.deserialize(data); - } catch (e) { - self._readsDone({code: constants.status.INTERNAL, - details: 'Failed to parse server response'}); - return; - } - if (data === null) { - self._readsDone(); - return; - } - if (self.push(deserialized) && data !== null) { - var read_batch = {}; - read_batch[grpc.opType.RECV_MESSAGE] = true; - self.call.startBatch(read_batch, readCallback); - } else { - self.reading = false; - } - } - if (self.finished) { - self.push(null); + if (this.finished) { + this.push(null); } else { - if (!self.reading) { - self.reading = true; - var read_batch = {}; - read_batch[grpc.opType.RECV_MESSAGE] = true; - self.call.startBatch(read_batch, readCallback); + if (!this.reading) { + this.reading = true; + var context = { + stream: this + }; + this.call.recvMessageWithContext(context); } } } @@ -345,26 +277,20 @@ util.inherits(ClientDuplexStream, Duplex); * grpc~ClientDuplexStream#metadata * @borrows grpc~ClientUnaryCall#event:status as * grpc~ClientDuplexStream#status - * @param {grpc.internal~Call} call Call object to proxy - * @param {grpc~serialize=} [serialize=identity] Serialization - * function for requests - * @param {grpc~deserialize=} [deserialize=identity] - * Deserialization function for responses + * @param {InterceptingCall} call Exposes gRPC request operations, processed by + * an interceptor stack. */ -function ClientDuplexStream(call, serialize, deserialize) { +function ClientDuplexStream(call) { Duplex.call(this, {objectMode: true}); - this.serialize = common.wrapIgnoreNull(serialize); - this.deserialize = common.wrapIgnoreNull(deserialize); this.call = call; /* Status generated from reading messages from the server. Overrides the * status from the server if not OK */ this.read_status = null; /* Status received from the server. */ this.received_status = null; + var self = this; this.on('finish', function() { - var batch = {}; - batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; - call.startBatch(batch, function() {}); + self.call.halfClose(); }); } @@ -429,45 +355,17 @@ ClientDuplexStream.prototype.getPeer = getPeer; * should be used to make this particular call. */ -/** - * Get a call object built with the provided options. - * @access private - * @param {grpc.Client~CallOptions=} options Options object. - */ -function getCall(channel, method, options) { - var deadline; - var host; - var parent; - var propagate_flags; - var credentials; - if (options) { - deadline = options.deadline; - host = options.host; - parent = _.get(options, 'parent.call'); - propagate_flags = options.propagate_flags; - credentials = options.credentials; - } - if (deadline === undefined) { - deadline = Infinity; - } - var call = new grpc.Call(channel, method, deadline, host, - parent, propagate_flags); - if (credentials) { - call.setCredentials(credentials); - } - return call; -} - /** * A generic gRPC client. Primarily useful as a base class for generated clients * @memberof grpc * @constructor * @param {string} address Server address to connect to - * @param {grpc.credentials~ChannelCredentials} credentials Credentials to use to connect to - * the server + * @param {grpc.credentials~ChannelCredentials} credentials Credentials to use + * to connect to the server * @param {Object} options Options to apply to channel creation */ function Client(address, credentials, options) { + var self = this; if (!options) { options = {}; } @@ -480,9 +378,27 @@ function Client(address, credentials, options) { options['grpc.primary_user_agent'] = ''; } options['grpc.primary_user_agent'] += 'grpc-node/' + version; + + // Resolve interceptor options and assign interceptors to each method + var interceptor_providers = options.interceptor_providers || []; + var interceptors = options.interceptors || []; + if (interceptor_providers.length && interceptors.length) { + throw new client_interceptors.InterceptorConfigurationError( + 'Both interceptors and interceptor_providers were passed as options ' + + 'to the client constructor. Only one of these is allowed.'); + } + _.each(self.$method_definitions, function(method_definition, method_name) { + self[method_name].interceptors = client_interceptors + .resolveInterceptorProviders(interceptor_providers, method_definition) + .concat(interceptors); + }); + + // Exclude interceptor options which have already been consumed + var channel_options = _.omit(options, + ['interceptors', 'interceptor_providers']); /* Private fields use $ as a prefix instead of _ because it is an invalid * prefix of a method name */ - this.$channel = new grpc.Channel(address, credentials, options); + this.$channel = new grpc.Channel(address, credentials, channel_options); } exports.Client = Client; @@ -497,7 +413,7 @@ exports.Client = Client; /** * Make a unary request to the given method, using the given serialize * and deserialize functions, with the given argument. - * @param {string} method The name of the method to request + * @param {string} path The path of the method to request * @param {grpc~serialize} serialize The serialization function for * inputs * @param {grpc~deserialize} deserialize The deserialization @@ -506,11 +422,11 @@ exports.Client = Client; * serialize * @param {grpc.Metadata=} metadata Metadata to add to the call * @param {grpc.Client~CallOptions=} options Options map - * @param {grpc.Client~requestCallback} callback The callback to + * @param {grpc.Client~requestCallback} callback The callback * for when the response is received * @return {grpc~ClientUnaryCall} An event emitter for stream related events */ -Client.prototype.makeUnaryRequest = function(method, serialize, deserialize, +Client.prototype.makeUnaryRequest = function(path, serialize, deserialize, argument, metadata, options, callback) { if (options instanceof Function) { @@ -533,67 +449,50 @@ Client.prototype.makeUnaryRequest = function(method, serialize, deserialize, options = {}; } if (!((metadata instanceof Metadata) && - (options instanceof Object) && - (callback instanceof Function))) { - throw new Error("Argument mismatch in makeUnaryRequest"); + (options instanceof Object) && + (callback instanceof Function))) { + throw new Error('Argument mismatch in makeUnaryRequest'); } - var call = getCall(this.$channel, method, options); - var emitter = new ClientUnaryCall(call); + + var method_name = this.$method_names[path]; + var constructor_interceptors = this[method_name] ? + this[method_name].interceptors : + null; + var method_definition = options.method_definition = { + path: path, + requestStream: false, + responseStream: false, + requestSerialize: serialize, + responseDeserialize: deserialize + }; + metadata = metadata.clone(); - var client_batch = {}; - var message = serialize(argument); - if (options) { - message.grpcWriteFlags = options.flags; - } - client_batch[grpc.opType.SEND_INITIAL_METADATA] = - metadata._getCoreRepresentation(); - client_batch[grpc.opType.SEND_MESSAGE] = message; - client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; - client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; - client_batch[grpc.opType.RECV_MESSAGE] = true; - client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; - call.startBatch(client_batch, function(err, response) { - response.status.metadata = Metadata._fromCoreRepresentation( - response.status.metadata); - var status = response.status; - var error; - var deserialized; - emitter.emit('metadata', Metadata._fromCoreRepresentation( - response.metadata)); - if (status.code === constants.status.OK) { - if (err) { - // Got a batch error, but OK status. Something went wrong - callback(err); - return; - } else { - try { - deserialized = deserialize(response.read); - } catch (e) { - /* Change status to indicate bad server response. This will result - * in passing an error to the callback */ - status = { - code: constants.status.INTERNAL, - details: 'Failed to parse server response' - }; - } - } - } - if (status.code !== constants.status.OK) { - error = new createStatusError(status); - callback(error); - } else { - callback(null, deserialized); - } - emitter.emit('status', status); - }); + var intercepting_call = client_interceptors.getInterceptingCall( + method_definition, + options, + constructor_interceptors, + this.$channel, + callback + ); + var emitter = new ClientUnaryCall(intercepting_call); + var last_listener = client_interceptors.getLastListener( + method_definition, + emitter, + callback + ); + + intercepting_call.start(metadata, last_listener); + intercepting_call.sendMessage(argument); + intercepting_call.halfClose(); + return emitter; }; /** * Make a client stream request to the given method, using the given serialize * and deserialize functions, with the given argument. - * @param {string} method The name of the method to request + * @param {string} path The path of the method to request * @param {grpc~serialize} serialize The serialization function for * inputs * @param {grpc~deserialize} deserialize The deserialization @@ -601,14 +500,14 @@ Client.prototype.makeUnaryRequest = function(method, serialize, deserialize, * @param {grpc.Metadata=} metadata Array of metadata key/value pairs to add to * the call * @param {grpc.Client~CallOptions=} options Options map - * @param {grpc.Client~requestCallback} callback The callback to for when the + * @param {grpc.Client~requestCallback} callback The callback for when the * response is received * @return {grpc~ClientWritableStream} An event emitter for stream related * events */ -Client.prototype.makeClientStreamRequest = function(method, serialize, - deserialize, metadata, - options, callback) { +Client.prototype.makeClientStreamRequest = function(path, serialize, + deserialize, metadata, + options, callback) { if (options instanceof Function) { callback = options; if (metadata instanceof Metadata) { @@ -629,68 +528,48 @@ Client.prototype.makeClientStreamRequest = function(method, serialize, options = {}; } if (!((metadata instanceof Metadata) && - (options instanceof Object) && - (callback instanceof Function))) { - throw new Error("Argument mismatch in makeClientStreamRequest"); + (options instanceof Object) && + (callback instanceof Function))) { + throw new Error('Argument mismatch in makeClientStreamRequest'); } - var call = getCall(this.$channel, method, options); + + var method_name = this.$method_names[path]; + var constructor_interceptors = this[method_name] ? + this[method_name].interceptors : + null; + var method_definition = options.method_definition = { + path: path, + requestStream: true, + responseStream: false, + requestSerialize: serialize, + responseDeserialize: deserialize + }; + metadata = metadata.clone(); - var stream = new ClientWritableStream(call, serialize); - var metadata_batch = {}; - metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = - metadata._getCoreRepresentation(); - metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true; - call.startBatch(metadata_batch, function(err, response) { - if (err) { - // The call has stopped for some reason. A non-OK status will arrive - // in the other batch. - return; - } - stream.emit('metadata', Metadata._fromCoreRepresentation( - response.metadata)); - }); - var client_batch = {}; - client_batch[grpc.opType.RECV_MESSAGE] = true; - client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; - call.startBatch(client_batch, function(err, response) { - response.status.metadata = Metadata._fromCoreRepresentation( - response.status.metadata); - var status = response.status; - var error; - var deserialized; - if (status.code === constants.status.OK) { - if (err) { - // Got a batch error, but OK status. Something went wrong - callback(err); - return; - } else { - try { - deserialized = deserialize(response.read); - } catch (e) { - /* Change status to indicate bad server response. This will result - * in passing an error to the callback */ - status = { - code: constants.status.INTERNAL, - details: 'Failed to parse server response' - }; - } - } - } - if (status.code !== constants.status.OK) { - error = createStatusError(status); - callback(error); - } else { - callback(null, deserialized); - } - stream.emit('status', status); - }); - return stream; + + var intercepting_call = client_interceptors.getInterceptingCall( + method_definition, + options, + constructor_interceptors, + this.$channel, + callback + ); + var emitter = new ClientWritableStream(intercepting_call); + var last_listener = client_interceptors.getLastListener( + method_definition, + emitter, + callback + ); + + intercepting_call.start(metadata, last_listener); + + return emitter; }; /** * Make a server stream request to the given method, with the given serialize * and deserialize function, using the given argument - * @param {string} method The name of the method to request + * @param {string} path The path of the method to request * @param {grpc~serialize} serialize The serialization function for inputs * @param {grpc~deserialize} deserialize The deserialization * function for outputs @@ -702,7 +581,7 @@ Client.prototype.makeClientStreamRequest = function(method, serialize, * @return {grpc~ClientReadableStream} An event emitter for stream related * events */ -Client.prototype.makeServerStreamRequest = function(method, serialize, +Client.prototype.makeServerStreamRequest = function(path, serialize, deserialize, argument, metadata, options) { if (!(metadata instanceof Metadata)) { @@ -713,48 +592,47 @@ Client.prototype.makeServerStreamRequest = function(method, serialize, options = {}; } if (!((metadata instanceof Metadata) && (options instanceof Object))) { - throw new Error("Argument mismatch in makeServerStreamRequest"); + throw new Error('Argument mismatch in makeServerStreamRequest'); } - var call = getCall(this.$channel, method, options); + + var method_name = this.$method_names[path]; + var constructor_interceptors = this[method_name] ? + this[method_name].interceptors : + null; + var method_definition = options.method_definition = { + path: path, + requestStream: false, + responseStream: true, + requestSerialize: serialize, + responseDeserialize: deserialize + }; + metadata = metadata.clone(); - var stream = new ClientReadableStream(call, deserialize); - var start_batch = {}; - var message = serialize(argument); - if (options) { - message.grpcWriteFlags = options.flags; - } - start_batch[grpc.opType.SEND_INITIAL_METADATA] = - metadata._getCoreRepresentation(); - start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; - start_batch[grpc.opType.SEND_MESSAGE] = message; - start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; - call.startBatch(start_batch, function(err, response) { - if (err) { - // The call has stopped for some reason. A non-OK status will arrive - // in the other batch. - return; - } - stream.emit('metadata', Metadata._fromCoreRepresentation( - response.metadata)); - }); - var status_batch = {}; - status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; - call.startBatch(status_batch, function(err, response) { - if (err) { - stream.emit('error', err); - return; - } - response.status.metadata = Metadata._fromCoreRepresentation( - response.status.metadata); - stream._receiveStatus(response.status); - }); - return stream; -}; + var emitter = new ClientReadableStream(); + var intercepting_call = client_interceptors.getInterceptingCall( + method_definition, + options, + constructor_interceptors, + this.$channel, + emitter + ); + emitter.call = intercepting_call; + var last_listener = client_interceptors.getLastListener( + method_definition, + emitter + ); + + intercepting_call.start(metadata, last_listener); + intercepting_call.sendMessage(argument); + intercepting_call.halfClose(); + + return emitter; +}; /** * Make a bidirectional stream request with this method on the given channel. - * @param {string} method The name of the method to request + * @param {string} path The path of the method to request * @param {grpc~serialize} serialize The serialization function for inputs * @param {grpc~deserialize} deserialize The deserialization * function for outputs @@ -763,7 +641,7 @@ Client.prototype.makeServerStreamRequest = function(method, serialize, * @param {grpc.Client~CallOptions=} options Options map * @return {grpc~ClientDuplexStream} An event emitter for stream related events */ -Client.prototype.makeBidiStreamRequest = function(method, serialize, +Client.prototype.makeBidiStreamRequest = function(path, serialize, deserialize, metadata, options) { if (!(metadata instanceof Metadata)) { @@ -774,36 +652,40 @@ Client.prototype.makeBidiStreamRequest = function(method, serialize, options = {}; } if (!((metadata instanceof Metadata) && (options instanceof Object))) { - throw new Error("Argument mismatch in makeBidiStreamRequest"); + throw new Error('Argument mismatch in makeBidiStreamRequest'); } - var call = getCall(this.$channel, method, options); + + var method_name = this.$method_names[path]; + var constructor_interceptors = this[method_name] ? + this[method_name].interceptors : + null; + var method_definition = options.method_definition = { + path: path, + requestStream: true, + responseStream: true, + requestSerialize: serialize, + responseDeserialize: deserialize + }; + metadata = metadata.clone(); - var stream = new ClientDuplexStream(call, serialize, deserialize); - var start_batch = {}; - start_batch[grpc.opType.SEND_INITIAL_METADATA] = - metadata._getCoreRepresentation(); - start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; - call.startBatch(start_batch, function(err, response) { - if (err) { - // The call has stopped for some reason. A non-OK status will arrive - // in the other batch. - return; - } - stream.emit('metadata', Metadata._fromCoreRepresentation( - response.metadata)); - }); - var status_batch = {}; - status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; - call.startBatch(status_batch, function(err, response) { - if (err) { - stream.emit('error', err); - return; - } - response.status.metadata = Metadata._fromCoreRepresentation( - response.status.metadata); - stream._receiveStatus(response.status); - }); - return stream; + + var emitter = new ClientDuplexStream(); + var intercepting_call = client_interceptors.getInterceptingCall( + method_definition, + options, + constructor_interceptors, + this.$channel, + emitter + ); + emitter.call = intercepting_call; + var last_listener = client_interceptors.getLastListener( + method_definition, + emitter + ); + + intercepting_call.start(metadata, last_listener); + + return emitter; }; /** @@ -859,10 +741,10 @@ Client.prototype.waitForReady = function(deadline, callback) { * @private */ var requester_funcs = { - unary: Client.prototype.makeUnaryRequest, - server_stream: Client.prototype.makeServerStreamRequest, - client_stream: Client.prototype.makeClientStreamRequest, - bidi: Client.prototype.makeBidiStreamRequest + [methodTypes.UNARY]: Client.prototype.makeUnaryRequest, + [methodTypes.CLIENT_STREAMING]: Client.prototype.makeClientStreamRequest, + [methodTypes.SERVER_STREAMING]: Client.prototype.makeServerStreamRequest, + [methodTypes.BIDI_STREAMING]: Client.prototype.makeBidiStreamRequest }; function getDefaultValues(metadata, options) { @@ -878,7 +760,7 @@ function getDefaultValues(metadata, options) { * @access private */ var deprecated_request_wrap = { - unary: function(makeUnaryRequest) { + [methodTypes.UNARY]: function(makeUnaryRequest) { return function makeWrappedUnaryRequest(argument, callback, metadata, options) { /* jshint validthis: true */ @@ -887,7 +769,7 @@ var deprecated_request_wrap = { opt_args.options, callback); }; }, - client_stream: function(makeServerStreamRequest) { + [methodTypes.CLIENT_STREAMING]: function(makeServerStreamRequest) { return function makeWrappedClientStreamRequest(callback, metadata, options) { /* jshint validthis: true */ @@ -896,8 +778,8 @@ var deprecated_request_wrap = { opt_args.options, callback); }; }, - server_stream: _.identity, - bidi: _.identity + [methodTypes.SERVER_STREAMING]: _.identity, + [methodTypes.BIDI_STREAMING]: _.identity }; /** @@ -932,38 +814,29 @@ exports.makeClientConstructor = function(methods, serviceName, } util.inherits(ServiceClient, Client); + ServiceClient.prototype.$method_definitions = methods; + ServiceClient.prototype.$method_names = {}; _.each(methods, function(attrs, name) { - var method_type; if (_.startsWith(name, '$')) { throw new Error('Method names cannot start with $'); } - if (attrs.requestStream) { - if (attrs.responseStream) { - method_type = 'bidi'; - } else { - method_type = 'client_stream'; - } - } else { - if (attrs.responseStream) { - method_type = 'server_stream'; - } else { - method_type = 'unary'; - } - } - var serialize = attrs.requestSerialize; - var deserialize = attrs.responseDeserialize; + var method_type = common.getMethodType(attrs); var method_func = _.partial(requester_funcs[method_type], attrs.path, - serialize, deserialize); + attrs.requestSerialize, + attrs.responseDeserialize); if (class_options.deprecatedArgumentOrder) { - ServiceClient.prototype[name] = deprecated_request_wrap(method_func); + ServiceClient.prototype[name] = + deprecated_request_wrap[method_type](method_func); } else { ServiceClient.prototype[name] = method_func; } + ServiceClient.prototype.$method_names[attrs.path] = name; // Associate all provided attributes with the method _.assign(ServiceClient.prototype[name], attrs); if (attrs.originalName) { - ServiceClient.prototype[attrs.originalName] = ServiceClient.prototype[name]; + ServiceClient.prototype[attrs.originalName] = + ServiceClient.prototype[name]; } }); @@ -984,6 +857,17 @@ exports.getClientChannel = function(client) { return Client.prototype.getChannel.call(client); }; +/** + * Gets a map of client method names to interceptor stacks. + * @param {grpc.Client} client + * @returns {Object.} + */ +exports.getClientInterceptors = function(client) { + return _.mapValues(client.$method_definitions, function(def, name) { + return client[name].interceptors; + }); +}; + /** * Wait for the client to be ready. The callback will be called when the * client has successfully connected to the server, and it will be called @@ -1002,3 +886,8 @@ exports.getClientChannel = function(client) { exports.waitForClientReady = function(client, deadline, callback) { Client.prototype.waitForReady.call(client, deadline, callback); }; + +exports.StatusBuilder = client_interceptors.StatusBuilder; +exports.ListenerBuilder = client_interceptors.ListenerBuilder; +exports.RequesterBuilder = client_interceptors.RequesterBuilder; +exports.InterceptingCall = client_interceptors.InterceptingCall; diff --git a/packages/grpc-native-core/src/client_interceptors.js b/packages/grpc-native-core/src/client_interceptors.js new file mode 100644 index 000000000..01023a52b --- /dev/null +++ b/packages/grpc-native-core/src/client_interceptors.js @@ -0,0 +1,1489 @@ +/** + * @license + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** + * Client Interceptors + * + * This module describes the interceptor framework for clients. + * An interceptor is a function which takes an options object and a nextCall + * function and returns an InterceptingCall: + * + * ``` + * var interceptor = function(options, nextCall) { + * return new InterceptingCall(nextCall(options)); + * } + * ``` + * + * The interceptor function must return an InterceptingCall object. Returning + * `new InterceptingCall(nextCall(options))` will satisfy the contract (but + * provide no interceptor functionality). `nextCall` is a function which will + * generate the next interceptor in the chain. + * + * To implement interceptor functionality, create a requester and pass it to + * the InterceptingCall constructor: + * + * `return new InterceptingCall(nextCall(options), requester);` + * + * A requester is a POJO with zero or more of the following methods: + * + * `start(metadata, listener, next)` + * * To continue, call next(metadata, listener). Listeners are described + * * below. + * + * `sendMessage(message, next)` + * * To continue, call next(message). + * + * `halfClose(next)` + * * To continue, call next(). + * + * `cancel(message, next)` + * * To continue, call next(). + * + * A listener is a POJO with one or more of the following methods: + * + * `onReceiveMetadata(metadata, next)` + * * To continue, call next(metadata) + * + * `onReceiveMessage(message, next)` + * * To continue, call next(message) + * + * `onReceiveStatus(status, next)` + * * To continue, call next(status) + * + * A listener is provided by the requester's `start` method. The provided + * listener implements all the inbound interceptor methods, which can be called + * to short-circuit the gRPC call. + * + * Three usage patterns are supported for listeners: + * 1) Pass the listener along without modification: `next(metadata, listener)`. + * In this case the interceptor declines to intercept any inbound operations. + * 2) Create a new listener with one or more inbound interceptor methods and + * pass it to `next`. In this case the interceptor will fire on the inbound + * operations implemented in the new listener. + * 3) Make direct inbound calls to the provided listener's methods. This + * short-circuits the interceptor stack. + * + * Do not modify the listener passed in. Either pass it along unmodified, + * ignore it, or call methods on it to short-circuit the call. + * + * To intercept errors, implement the `onReceiveStatus` method and test for + * `status.code !== grpc.status.OK`. + * + * To intercept trailers, examine `status.metadata` in the `onReceiveStatus` + * method. + * + * This is a trivial implementation of all interceptor methods: + * var interceptor = function(options, nextCall) { + * return new InterceptingCall(nextCall(options), { + * start: function(metadata, listener, next) { + * next(metadata, { + * onReceiveMetadata: function (metadata, next) { + * next(metadata); + * }, + * onReceiveMessage: function (message, next) { + * next(message); + * }, + * onReceiveStatus: function (status, next) { + * next(status); + * }, + * }); + * }, + * sendMessage: function(message, next) { + * next(message); + * }, + * halfClose: function(next) { + * next(); + * }, + * cancel: function(message, next) { + * next(); + * } + * }); + * }; + * + * This is an interceptor with a single method: + * var interceptor = function(options, nextCall) { + * return new InterceptingCall(nextCall(options), { + * sendMessage: function(message, next) { + * next(message); + * } + * }); + * }; + * + * Builders are provided for convenience: StatusBuilder, ListenerBuilder, + * and RequesterBuilder + * + * gRPC client operations use this mapping to interceptor methods: + * + * grpc.opType.SEND_INITIAL_METADATA -> start + * grpc.opType.SEND_MESSAGE -> sendMessage + * grpc.opType.SEND_CLOSE_FROM_CLIENT -> halfClose + * grpc.opType.RECV_INITIAL_METADATA -> onReceiveMetadata + * grpc.opType.RECV_MESSAGE -> onReceiveMessage + * grpc.opType.RECV_STATUS_ON_CLIENT -> onReceiveStatus + * + * @module + */ + +'use strict'; + +var _ = require('lodash'); +var grpc = require('./grpc_extension'); +var Metadata = require('./metadata'); +var constants = require('./constants'); +var common = require('./common'); +var methodTypes = constants.methodTypes; +var EventEmitter = require('events').EventEmitter; + +/** + * A custom error thrown when interceptor configuration fails. + * @param {string} message The error message + * @param {object=} extra + * @constructor + */ +var InterceptorConfigurationError = + function InterceptorConfigurationError(message, extra) { + Error.captureStackTrace(this, this.constructor); + this.name = this.constructor.name; + this.message = message; + this.extra = extra; + }; + +require('util').inherits(InterceptorConfigurationError, Error); + +/** + * A builder for gRPC status objects. + * @constructor + */ +function StatusBuilder() { + this.code = null; + this.details = null; + this.metadata = null; +} + +/** + * Adds a status code to the builder. + * @param {number} code The status code. + * @return {StatusBuilder} + */ +StatusBuilder.prototype.withCode = function(code) { + this.code = code; + return this; +}; + +/** + * Adds details to the builder. + * @param {string} details A status message. + * @return {StatusBuilder} + */ +StatusBuilder.prototype.withDetails = function(details) { + this.details = details; + return this; +}; + +/** + * Adds metadata to the builder. + * @param {Metadata} metadata The gRPC status metadata. + * @return {StatusBuilder} + */ +StatusBuilder.prototype.withMetadata = function(metadata) { + this.metadata = metadata; + return this; +}; + +/** + * Builds the status object. + * @return {grpc~StatusObject} A gRPC status. + */ +StatusBuilder.prototype.build = function() { + var status = {}; + if (this.code !== undefined) { + status.code = this.code; + } + if (this.details) { + status.details = this.details; + } + if (this.metadata) { + status.metadata = this.metadata; + } + return status; +}; + +/** + * A builder for listener interceptors. + * @constructor + */ +function ListenerBuilder() { + this.metadata = null; + this.message = null; + this.status = null; +} + +/** + * Adds an onReceiveMetadata method to the builder. + * @param {MetadataListener} on_receive_metadata A listener method for + * receiving metadata. + * @return {ListenerBuilder} + */ +ListenerBuilder.prototype.withOnReceiveMetadata = + function(on_receive_metadata) { + this.metadata = on_receive_metadata; + return this; + }; + +/** + * Adds an onReceiveMessage method to the builder. + * @param {MessageListener} on_receive_message A listener method for receiving + * messages. + * @return {ListenerBuilder} + */ +ListenerBuilder.prototype.withOnReceiveMessage = function(on_receive_message) { + this.message = on_receive_message; + return this; +}; + +/** + * Adds an onReceiveStatus method to the builder. + * @param {StatusListener} on_receive_status A listener method for receiving + * status. + * @return {ListenerBuilder} + */ +ListenerBuilder.prototype.withOnReceiveStatus = function(on_receive_status) { + this.status = on_receive_status; + return this; +}; + +/** + * Builds the call listener. + * @return {grpc~Listener} + */ +ListenerBuilder.prototype.build = function() { + var self = this; + var listener = {}; + listener.onReceiveMetadata = self.metadata; + listener.onReceiveMessage = self.message; + listener.onReceiveStatus = self.status; + return listener; +}; + +/** + * A builder for the outbound methods of an interceptor. + * @constructor + */ +function RequesterBuilder() { + this.start = null; + this.message = null; + this.half_close = null; + this.cancel = null; +} + +/** + * Add a metadata requester to the builder. + * @param {MetadataRequester} start A requester method for handling metadata. + * @return {RequesterBuilder} + */ +RequesterBuilder.prototype.withStart = function(start) { + this.start = start; + return this; +}; + +/** + * Add a message requester to the builder. + * @param {MessageRequester} send_message A requester method for handling + * messages. + * @return {RequesterBuilder} + */ +RequesterBuilder.prototype.withSendMessage = function(send_message) { + this.message = send_message; + return this; +}; + +/** + * Add a close requester to the builder. + * @param {CloseRequester} half_close A requester method for handling client + * close. + * @return {RequesterBuilder} + */ +RequesterBuilder.prototype.withHalfClose = function(half_close) { + this.half_close = half_close; + return this; +}; + +/** + * Add a cancel requester to the builder. + * @param {CancelRequester} cancel A requester method for handling `cancel` + * @return {RequesterBuilder} + */ +RequesterBuilder.prototype.withCancel = function(cancel) { + this.cancel = cancel; + return this; +}; + +/** + * Builds the requester's interceptor methods. + * @return {grpc~Requester} + */ +RequesterBuilder.prototype.build = function() { + var requester = {}; + requester.start = this.start; + requester.sendMessage = this.message; + requester.halfClose = this.half_close; + requester.cancel = this.cancel; + return requester; +}; + +/** + * Transforms a list of interceptor providers into interceptors. + * @param {InterceptorProvider[]} providers + * @param {grpc~MethodDefinition} method_definition + * @return {null|Interceptor[]} + */ +var resolveInterceptorProviders = function(providers, method_definition) { + if (!_.isArray(providers)) { + return null; + } + var interceptors = []; + for (var i = 0; i < providers.length; i++) { + var provider = providers[i]; + var interceptor = provider(method_definition); + if (interceptor) { + interceptors.push(interceptor); + } + } + return interceptors; +}; + +/** + * Resolves interceptor options at call invocation time + * @param {grpc.Client~CallOptions} options The call options passed to a gRPC + * call. + * @param {Interceptor[]} [options.interceptors] + * @param {InterceptorProvider[]} [options.interceptor_providers] + * @param {grpc~MethodDefinition} method_definition + * @return {null|function[]} + */ +var resolveInterceptorOptions = function(options, method_definition) { + var provided = resolveInterceptorProviders(options.interceptor_providers, + method_definition); + if (_.isArray(options.interceptors) && _.isArray(provided)) { + throw new InterceptorConfigurationError( + 'Both interceptors and interceptor_providers were passed as options ' + + 'to the call invocation. Only one of these is allowed.'); + } + if (_.isArray(options.interceptors)) { + return options.interceptors; + } + if (_.isArray(provided)) { + return provided; + } + return null; +}; + +/** + * A chainable gRPC call proxy which will delegate to an optional requester + * object. By default, interceptor methods will chain to next_call. If a + * requester is provided which implements an interceptor method, that + * requester method will be executed as part of the chain. + * @param {InterceptingCall|null} next_call The next call in the chain + * @param {grpc~Requester=} requester Interceptor methods to handle request + * operations. + * @constructor + */ +function InterceptingCall(next_call, requester) { + this.next_call = next_call; + this.requester = requester; +} + +/** + * Get the next method in the chain or a no-op function if we are at the end + * of the chain + * @param {string} method_name + * @return {function} The next method in the chain + * @private + */ +InterceptingCall.prototype._getNextCall = function(method_name) { + return this.next_call ? + this.next_call[method_name].bind(this.next_call) : + function(){}; +}; + +/** + * Call the next method in the chain. This will either be on the next + * InterceptingCall (next_call), or the requester if the requester + * implements the method. + * @param {string} method_name The name of the interceptor method + * @param {array=} args Payload arguments for the operation + * @param {function=} next The next InterceptingCall's method + * @return {null} + * @private + */ +InterceptingCall.prototype._callNext = function(method_name, args, next) { + var args_array = args || []; + var next_call = next ? next : this._getNextCall(method_name); + if (this.requester && this.requester[method_name]) { + // Avoid using expensive `apply` calls + var num_args = args_array.length; + switch (num_args) { + case 0: + return this.requester[method_name](next_call); + case 1: + return this.requester[method_name](args_array[0], next_call); + case 2: + return this.requester[method_name](args_array[0], args_array[1], + next_call); + } + } else { + return next_call(args_array[0], args_array[1]); + } +}; + +/** + * Starts a call through the outbound interceptor chain and adds an element to + * the reciprocal inbound listener chain. + * @param {grpc.Metadata} metadata The outgoing metadata. + * @param {grpc~Listener} listener An intercepting listener for inbound + * operations. + */ +InterceptingCall.prototype.start = function(metadata, listener) { + var self = this; + + // If the listener provided is an InterceptingListener, use it. Otherwise, we + // must be at the end of the listener chain, and any listener operations + // should be terminated in an EndListener. + var next_listener = _getInterceptingListener(listener, new EndListener()); + + // Build the next method in the interceptor chain + var next = function(metadata, current_listener) { + // If there is a next call in the chain, run it. Otherwise do nothing. + if (self.next_call) { + // Wire together any listener provided with the next listener + var listener = _getInterceptingListener(current_listener, next_listener); + self.next_call.start(metadata, listener); + } + }; + this._callNext('start', [metadata, next_listener], next); +}; + +/** + * Pass a message through the interceptor chain. + * @param {jspb.Message} message + */ +InterceptingCall.prototype.sendMessage = function(message) { + this._callNext('sendMessage', [message]); +}; + +/** + * Run a close operation through the interceptor chain + */ +InterceptingCall.prototype.halfClose = function() { + this._callNext('halfClose'); +}; + +/** + * Run a cancel operation through the interceptor chain + */ +InterceptingCall.prototype.cancel = function() { + this._callNext('cancel'); +}; + +/** + * Run a cancelWithStatus operation through the interceptor chain. + * @param {grpc~StatusObject} status + * @param {string} message + */ +InterceptingCall.prototype.cancelWithStatus = function(status, message) { + this._callNext('cancelWithStatus', [status, message]); +}; + +/** + * Pass a getPeer call down to the base gRPC call (should not be intercepted) + * @return {object} + */ +InterceptingCall.prototype.getPeer = function() { + return this._callNext('getPeer'); +}; + +/** + * For streaming calls, we need to transparently pass the stream's context + * through the interceptor chain. Passes the context between InterceptingCalls + * but hides it from any requester implementations. + * @param {object} context Carries objects needed for streaming operations. + * @param {jspb.Message} message The message to send. + */ +InterceptingCall.prototype.sendMessageWithContext = function(context, message) { + var next = this.next_call ? + this.next_call.sendMessageWithContext.bind(this.next_call, context) : + context; + this._callNext('sendMessage', [message], next); +}; + +/** + * For receiving streaming messages, we need to seed the base interceptor with + * the streaming context to create a RECV_MESSAGE batch. + * @param {object} context Carries objects needed for streaming operations + */ +InterceptingCall.prototype.recvMessageWithContext = function(context) { + this._callNext('recvMessageWithContext', [context]); +}; + +/** + * A chain-able listener object which will delegate to a custom listener when + * appropriate. + * @param {InterceptingListener|null} next_listener The next + * InterceptingListener in the chain + * @param {grpc~Listener=} delegate A custom listener object which may implement + * specific operations + * @constructor + */ +function InterceptingListener(next_listener, delegate) { + this.delegate = delegate || {}; + this.next_listener = next_listener; +} + +/** + * Get the next method in the chain or a no-op function if we are at the end + * of the chain. + * @param {string} method_name The name of the listener method. + * @return {function} The next method in the chain + * @private + */ +InterceptingListener.prototype._getNextListener = function(method_name) { + return this.next_listener ? + this.next_listener[method_name].bind(this.next_listener) : + function(){}; +}; + +/** + * Call the next method in the chain. This will either be on the next + * InterceptingListener (next_listener), or the requester if the requester + * implements the method. + * @param {string} method_name The name of the interceptor method + * @param {array=} args Payload arguments for the operation + * @param {function=} next The next InterceptingListener's method + * @return {null} + * @private + */ +InterceptingListener.prototype._callNext = function(method_name, args, next) { + var args_array = args || []; + var next_listener = next ? next : this._getNextListener(method_name); + if (this.delegate && this.delegate[method_name]) { + // Avoid using expensive `apply` calls + var num_args = args_array.length; + switch (num_args) { + case 0: + return this.delegate[method_name](next_listener); + case 1: + return this.delegate[method_name](args_array[0], next_listener); + case 2: + return this.delegate[method_name](args_array[0], args_array[1], + next_listener); + } + } else { + return next_listener(args_array[0], args_array[1]); + } +}; +/** + * Inbound metadata receiver. + * @param {Metadata} metadata + */ +InterceptingListener.prototype.onReceiveMetadata = function(metadata) { + this._callNext('onReceiveMetadata', [metadata]); +}; + +/** + * Inbound message receiver. + * @param {jspb.Message} message + */ +InterceptingListener.prototype.onReceiveMessage = function(message) { + this._callNext('onReceiveMessage', [message]); +}; + +/** + * When intercepting streaming message, we need to pass the streaming context + * transparently along the chain. Hides the context from the delegate listener + * methods. + * @param {object} context Carries objects needed for streaming operations. + * @param {jspb.Message} message The message received. + */ +InterceptingListener.prototype.recvMessageWithContext = function(context, + message) { + var fallback = this.next_listener.recvMessageWithContext; + var next_method = this.next_listener ? + fallback.bind(this.next_listener, context) : + context; + if (this.delegate.onReceiveMessage) { + this.delegate.onReceiveMessage(message, next_method, context); + } else { + next_method(message); + } +}; + +/** + * Inbound status receiver. + * @param {grpc~StatusObject} status + */ +InterceptingListener.prototype.onReceiveStatus = function(status) { + this._callNext('onReceiveStatus', [status]); +}; + +/** + * A dead-end listener used to terminate a call chain. Used when an interceptor + * creates a branch chain, when the branch returns the listener chain will + * terminate here. + * @constructor + */ +function EndListener() {} +EndListener.prototype.onReceiveMetadata = function(){}; +EndListener.prototype.onReceiveMessage = function(){}; +EndListener.prototype.onReceiveStatus = function(){}; +EndListener.prototype.recvMessageWithContext = function(){}; + +/** + * Get a call object built with the provided options. + * @param {grpc.Channel} channel + * @param {string} path + * @param {grpc.Client~CallOptions=} options Options object. + */ +function getCall(channel, path, options) { + var deadline; + var host; + var parent; + var propagate_flags; + var credentials; + if (options) { + deadline = options.deadline; + host = options.host; + parent = _.get(options, 'parent.call'); + propagate_flags = options.propagate_flags; + credentials = options.credentials; + } + if (deadline === undefined) { + deadline = Infinity; + } + var call = new grpc.Call(channel, path, deadline, host, + parent, propagate_flags); + if (credentials) { + call.setCredentials(credentials); + } + return call; +} + +var OP_DEPENDENCIES = { + [grpc.opType.SEND_MESSAGE]: [grpc.opType.SEND_INITIAL_METADATA], + [grpc.opType.SEND_CLOSE_FROM_CLIENT]: [grpc.opType.SEND_MESSAGE], + [grpc.opType.RECV_MESSAGE]: [grpc.opType.SEND_INITIAL_METADATA] +}; + +/** + * Produces a callback triggered by streaming response messages. + * @private + * @param {EventEmitter} emitter + * @param {grpc.internal~Call} call + * @param {function} get_listener Returns a grpc~Listener. + * @param {grpc~deserialize} deserialize + * @return {Function} + */ +function _getStreamReadCallback(emitter, call, get_listener, deserialize) { + return function (err, response) { + if (err) { + // Something has gone wrong. Stop reading and wait for status + emitter.finished = true; + emitter._readsDone(); + return; + } + var data = response.read; + var deserialized; + try { + deserialized = deserialize(data); + } catch (e) { + emitter._readsDone({ + code: constants.status.INTERNAL, + details: 'Failed to parse server response' + }); + return; + } + if (data === null) { + emitter._readsDone(); + return; + } + var listener = get_listener(); + var context = { + call: call, + listener: listener + }; + listener.recvMessageWithContext(context, deserialized); + }; +} + +/** + * Tests whether a batch can be started. + * @private + * @param {number[]} batch_ops The operations in the batch we are checking. + * @param {number[]} completed_ops Previously completed operations. + * @return {boolean} + */ +function _areBatchRequirementsMet(batch_ops, completed_ops) { + var dependencies = _.flatMap(batch_ops, function(op) { + return OP_DEPENDENCIES[op] || []; + }); + var dependencies_met = _.intersection(dependencies, + batch_ops.concat(completed_ops)); + return _.isEqual(dependencies_met.sort(), dependencies.sort()); +} + +/** + * Enforces the order of operations for synchronous requests. If a batch's + * operations cannot be started because required operations have not started + * yet, the batch is deferred until requirements are met. + * @private + * @param {grpc.Client~Call} call + * @param {object} batch + * @param {object} batch_state + * @param {number[]} [batch_state.completed_ops] The ops already sent. + * @param {object} [batch_state.deferred_batches] Batches to be sent after + * their dependencies are fulfilled. + * @param {function} callback + * @return {object} + */ +function _startBatchIfReady(call, batch, batch_state, callback) { + var completed_ops = batch_state.completed_ops; + var deferred_batches = batch_state.deferred_batches; + var batch_ops = _.map(_.keys(batch), Number); + if (_areBatchRequirementsMet(batch_ops, completed_ops)) { + // Dependencies are met, start the batch and any deferred batches whose + // dependencies are met as a result. + call.startBatch(batch, callback); + completed_ops = _.union(completed_ops, batch_ops); + deferred_batches = _.flatMap(deferred_batches, function(deferred_batch) { + var deferred_batch_ops = _.map(_.keys(deferred_batch), Number); + if (_areBatchRequirementsMet(deferred_batch_ops, completed_ops)) { + call.startBatch(deferred_batch.batch, deferred_batch.callback); + return []; + } + return [deferred_batch]; + }); + } else { + // Dependencies are not met, defer the batch + deferred_batches = deferred_batches.concat({ + batch: batch, + callback: callback + }); + } + return { + completed_ops: completed_ops, + deferred_batches: deferred_batches + }; +} + +/** + * Produces an interceptor which will start gRPC batches for unary calls. + * @private + * @param {grpc~MethodDefinition} method_definition + * @param {grpc.Channel} channel + * @param {EventEmitter} emitter + * @param {function} callback + * @return {Interceptor} + */ +function _getUnaryInterceptor(method_definition, channel, emitter, callback) { + var serialize = method_definition.requestSerialize; + var deserialize = method_definition.responseDeserialize; + return function (options) { + var call = getCall(channel, method_definition.path, options); + var first_listener; + var final_requester = {}; + var batch_state = { + completed_ops: [], + deferred_batches: [] + }; + final_requester.start = function (metadata, listener) { + var batch = { + [grpc.opType.SEND_INITIAL_METADATA]: + metadata._getCoreRepresentation(), + }; + first_listener = listener; + batch_state = _startBatchIfReady(call, batch, batch_state, + function() {}); + }; + final_requester.sendMessage = function (message) { + var batch = { + [grpc.opType.SEND_MESSAGE]: serialize(message), + }; + batch_state = _startBatchIfReady(call, batch, batch_state, + function() {}); + }; + final_requester.halfClose = function () { + var batch = { + [grpc.opType.SEND_CLOSE_FROM_CLIENT]: true, + [grpc.opType.RECV_INITIAL_METADATA]: true, + [grpc.opType.RECV_MESSAGE]: true, + [grpc.opType.RECV_STATUS_ON_CLIENT]: true + }; + var callback = function (err, response) { + response.status.metadata = Metadata._fromCoreRepresentation( + response.status.metadata); + var status = response.status; + var deserialized; + if (status.code === constants.status.OK) { + if (err) { + // Got a batch error, but OK status. Something went wrong + callback(err); + return; + } else { + try { + deserialized = deserialize(response.read); + } catch (e) { + /* Change status to indicate bad server response. This + * will result in passing an error to the callback */ + status = { + code: constants.status.INTERNAL, + details: 'Failed to parse server response' + }; + } + } + } + response.metadata = + Metadata._fromCoreRepresentation(response.metadata); + first_listener.onReceiveMetadata(response.metadata); + first_listener.onReceiveMessage(deserialized); + first_listener.onReceiveStatus(status); + }; + batch_state = _startBatchIfReady(call, batch, batch_state, callback); + }; + final_requester.cancel = function () { + call.cancel(); + }; + final_requester.getPeer = function () { + return call.getPeer(); + }; + return new InterceptingCall(null, final_requester); + }; +} + +/** + * Produces an interceptor which will start gRPC batches for client streaming + * calls. + * @private + * @param {grpc~MethodDefinition} method_definition + * @param {grpc.Channel} channel + * @param {EventEmitter} emitter + * @param {function} callback + * @return {Interceptor} + */ +function _getClientStreamingInterceptor(method_definition, channel, emitter, + callback) { + var serialize = common.wrapIgnoreNull(method_definition.requestSerialize); + var deserialize = method_definition.responseDeserialize; + return function (options) { + var first_listener; + var call = getCall(channel, method_definition.path, options); + var final_requester = {}; + final_requester.start = function (metadata, listener) { + var metadata_batch = { + [grpc.opType.SEND_INITIAL_METADATA]: metadata._getCoreRepresentation(), + [grpc.opType.RECV_INITIAL_METADATA]: true + }; + first_listener = listener; + call.startBatch(metadata_batch, function (err, response) { + if (err) { + // The call has stopped for some reason. A non-OK status will arrive + // in the other batch. + return; + } + response.metadata = Metadata._fromCoreRepresentation(response.metadata); + listener.onReceiveMetadata(response.metadata); + }); + var recv_batch = {}; + recv_batch[grpc.opType.RECV_MESSAGE] = true; + recv_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; + call.startBatch(recv_batch, function (err, response) { + response.status.metadata = Metadata._fromCoreRepresentation( + response.status.metadata); + var status = response.status; + var deserialized; + if (status.code === constants.status.OK) { + if (err) { + // Got a batch error, but OK status. Something went wrong + callback(err); + return; + } else { + try { + deserialized = deserialize(response.read); + } catch (e) { + /* Change status to indicate bad server response. This will result + * in passing an error to the callback */ + status = { + code: constants.status.INTERNAL, + details: 'Failed to parse server response' + }; + } + } + } + listener.onReceiveMessage(deserialized); + listener.onReceiveStatus(status); + }); + }; + final_requester.sendMessage = function (chunk, context) { + var message; + var callback = (context && context.callback) ? + context.callback : + function () { }; + var encoding = (context && context.encoding) ? + context.encoding : + ''; + try { + message = serialize(chunk); + } catch (e) { + /* Sending this error to the server and emitting it immediately on the + client may put the call in a slightly weird state on the client side, + but passing an object that causes a serialization failure is a misuse + of the API anyway, so that's OK. The primary purpose here is to give + the programmer a useful error and to stop the stream properly */ + call.cancelWithStatus(constants.status.INTERNAL, + 'Serialization failure'); + callback(e); + return; + } + if (_.isFinite(encoding)) { + /* Attach the encoding if it is a finite number. This is the closest we + * can get to checking that it is valid flags */ + message.grpcWriteFlags = encoding; + } + var batch = { + [grpc.opType.SEND_MESSAGE]: message + }; + call.startBatch(batch, function (err, event) { + callback(err, event); + }); + }; + final_requester.halfClose = function () { + var batch = { + [grpc.opType.SEND_CLOSE_FROM_CLIENT]: true + }; + call.startBatch(batch, function () { }); + }; + final_requester.cancel = function () { + call.cancel(); + }; + final_requester.getPeer = function() { + return call.getPeer(); + }; + return new InterceptingCall(null, final_requester); + }; +} + +/** + * Produces an interceptor which will start gRPC batches for server streaming + * calls. + * @private + * @param {grpc~MethodDefinition} method_definition + * @param {grpc.Channel} channel + * @param {EventEmitter} emitter + * @return {Interceptor} + */ +function _getServerStreamingInterceptor(method_definition, channel, emitter) { + var deserialize = common.wrapIgnoreNull( + method_definition.responseDeserialize); + var serialize = method_definition.requestSerialize; + return function (options) { + var batch_state = { + completed_ops: [], + deferred_batches: [] + }; + var call = getCall(channel, method_definition.path, options); + var final_requester = {}; + var first_listener; + var get_listener = function() { + return first_listener; + }; + final_requester.start = function(metadata, listener) { + first_listener = listener; + metadata = metadata.clone(); + var metadata_batch = { + [grpc.opType.SEND_INITIAL_METADATA]: metadata._getCoreRepresentation(), + [grpc.opType.RECV_INITIAL_METADATA]: true + }; + var callback = function(err, response) { + if (err) { + // The call has stopped for some reason. A non-OK status will arrive + // in the other batch. + return; + } + first_listener.onReceiveMetadata( + Metadata._fromCoreRepresentation(response.metadata)); + }; + batch_state = _startBatchIfReady(call, metadata_batch, batch_state, + callback); + var status_batch = { + [grpc.opType.RECV_STATUS_ON_CLIENT]: true + }; + call.startBatch(status_batch, function(err, response) { + if (err) { + emitter.emit('error', err); + return; + } + response.status.metadata = Metadata._fromCoreRepresentation( + response.status.metadata); + first_listener.onReceiveStatus(response.status); + }); + }; + final_requester.sendMessage = function(argument) { + var message = serialize(argument); + if (options) { + message.grpcWriteFlags = options.flags; + } + var send_batch = { + [grpc.opType.SEND_MESSAGE]: message + }; + var callback = function(err, response) { + if (err) { + // The call has stopped for some reason. A non-OK status will arrive + // in the other batch. + return; + } + }; + batch_state = _startBatchIfReady(call, send_batch, batch_state, callback); + }; + final_requester.halfClose = function() { + var batch = { + [grpc.opType.SEND_CLOSE_FROM_CLIENT]: true + }; + batch_state = _startBatchIfReady(call, batch, batch_state, function() {}); + }; + final_requester.recvMessageWithContext = function(context) { + var recv_batch = { + [grpc.opType.RECV_MESSAGE]: true + }; + var callback = _getStreamReadCallback(emitter, call, + get_listener, deserialize); + batch_state = _startBatchIfReady(call, recv_batch, batch_state, callback); + }; + final_requester.cancel = function() { + call.cancel(); + }; + final_requester.getPeer = function() { + return call.getPeer(); + }; + return new InterceptingCall(null, final_requester); + }; +} + +/** + * Produces an interceptor which will start gRPC batches for bi-directional + * calls. + * @private + * @param {grpc~MethodDefinition} method_definition + * @param {grpc.Channel} channel + * @param {EventEmitter} emitter + * @return {Interceptor} + */ +function _getBidiStreamingInterceptor(method_definition, channel, emitter) { + var serialize = common.wrapIgnoreNull(method_definition.requestSerialize); + var deserialize = common.wrapIgnoreNull( + method_definition.responseDeserialize); + return function (options) { + var first_listener; + var get_listener = function() { + return first_listener; + }; + var call = getCall(channel, method_definition.path, options); + var final_requester = {}; + final_requester.start = function (metadata, listener) { + var metadata_batch = { + [grpc.opType.SEND_INITIAL_METADATA]: metadata._getCoreRepresentation(), + [grpc.opType.RECV_INITIAL_METADATA]: true + }; + first_listener = listener; + call.startBatch(metadata_batch, function (err, response) { + if (err) { + // The call has stopped for some reason. A non-OK status will arrive + // in the other batch. + return; + } + response.metadata = Metadata._fromCoreRepresentation(response.metadata); + listener.onReceiveMetadata(response.metadata); + }); + var recv_batch = {}; + recv_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; + call.startBatch(recv_batch, function (err, response) { + var status = response.status; + if (status.code === constants.status.OK) { + if (err) { + emitter.emit('error', err); + return; + } + } + response.status.metadata = Metadata._fromCoreRepresentation( + response.status.metadata); + listener.onReceiveStatus(status); + }); + }; + final_requester.sendMessage = function (chunk, context) { + var message; + var callback = (context && context.callback) ? + context.callback : + function() {}; + var encoding = (context && context.encoding) ? + context.encoding : + ''; + try { + message = serialize(chunk); + } catch (e) { + /* Sending this error to the server and emitting it immediately on the + client may put the call in a slightly weird state on the client side, + but passing an object that causes a serialization failure is a misuse + of the API anyway, so that's OK. The primary purpose here is to give + the programmer a useful error and to stop the stream properly */ + call.cancelWithStatus(constants.status.INTERNAL, + 'Serialization failure'); + callback(e); + return; + } + if (_.isFinite(encoding)) { + /* Attach the encoding if it is a finite number. This is the closest we + * can get to checking that it is valid flags */ + message.grpcWriteFlags = encoding; + } + var batch = { + [grpc.opType.SEND_MESSAGE]: message + }; + call.startBatch(batch, function (err, event) { + callback(err, event); + }); + }; + final_requester.halfClose = function () { + var batch = { + [grpc.opType.SEND_CLOSE_FROM_CLIENT]: true + }; + call.startBatch(batch, function () { }); + }; + final_requester.recvMessageWithContext = function(context) { + var recv_batch = { + [grpc.opType.RECV_MESSAGE]: true + }; + call.startBatch(recv_batch, _getStreamReadCallback(emitter, call, + get_listener, deserialize)); + }; + final_requester.cancel = function() { + call.cancel(); + }; + final_requester.getPeer = function() { + return call.getPeer(); + }; + return new InterceptingCall(null, final_requester); + }; +} + +/** + * Produces a listener for responding to callers of unary RPCs. + * @private + * @param {grpc~MethodDefinition} method_definition + * @param {EventEmitter} emitter + * @param {function} callback + * @return {grpc~Listener} + */ +function _getUnaryListener(method_definition, emitter, callback) { + var resultMessage; + return { + onReceiveMetadata: function (metadata) { + emitter.emit('metadata', metadata); + }, + onReceiveMessage: function (message) { + resultMessage = message; + }, + onReceiveStatus: function (status) { + if (status.code !== constants.status.OK) { + var error = common.createStatusError(status); + callback(error); + } else { + callback(null, resultMessage); + } + emitter.emit('status', status); + } + }; +} + +/** + * Produces a listener for responding to callers of client streaming RPCs. + * @private + * @param {grpc~MethodDefinition} method_definition + * @param {EventEmitter} emitter + * @param {function} callback + * @return {grpc~Listener} + */ +function _getClientStreamingListener(method_definition, emitter, callback) { + var resultMessage; + return { + onReceiveMetadata: function (metadata) { + emitter.emit('metadata', metadata); + }, + onReceiveMessage: function (message) { + resultMessage = message; + }, + onReceiveStatus: function (status) { + if (status.code !== constants.status.OK) { + var error = common.createStatusError(status); + callback(error); + } else { + callback(null, resultMessage); + } + emitter.emit('status', status); + } + }; +} + +/** + * Produces a listener for responding to callers of server streaming RPCs. + * @private + * @param {grpc~MethodDefinition} method_definition + * @param {EventEmitter} emitter + * @return {grpc~Listener} + */ +function _getServerStreamingListener(method_definition, emitter) { + var deserialize = common.wrapIgnoreNull( + method_definition.responseDeserialize); + return { + onReceiveMetadata: function (metadata) { + emitter.emit('metadata', metadata); + }, + onReceiveMessage: function(message, next, context) { + if (emitter.push(message) && message !== null) { + var call = context.call; + var get_listener = function() { + return context.listener; + }; + var read_batch = {}; + read_batch[grpc.opType.RECV_MESSAGE] = true; + call.startBatch(read_batch, _getStreamReadCallback(emitter, call, + get_listener, deserialize)); + } else { + emitter.reading = false; + } + }, + onReceiveStatus: function (status) { + emitter._receiveStatus(status); + } + }; +} + +/** + * Produces a listener for responding to callers of bi-directional RPCs. + * @private + * @param {grpc~MethodDefinition} method_definition + * @param {EventEmitter} emitter + * @return {grpc~Listener} + */ +function _getBidiStreamingListener(method_definition, emitter) { + var deserialize = common.wrapIgnoreNull( + method_definition.responseDeserialize); + return { + onReceiveMetadata: function (metadata) { + emitter.emit('metadata', metadata); + }, + onReceiveMessage: function(message, next, context) { + if (emitter.push(message) && message !== null) { + var call = context.call; + var get_listener = function() { + return context.listener; + }; + var read_batch = {}; + read_batch[grpc.opType.RECV_MESSAGE] = true; + call.startBatch(read_batch, _getStreamReadCallback(emitter, call, + get_listener, deserialize)); + } else { + emitter.reading = false; + } + }, + onReceiveStatus: function (status) { + emitter._receiveStatus(status); + } + }; +} + +var interceptorGenerators = { + [methodTypes.UNARY]: _getUnaryInterceptor, + [methodTypes.CLIENT_STREAMING]: _getClientStreamingInterceptor, + [methodTypes.SERVER_STREAMING]: _getServerStreamingInterceptor, + [methodTypes.BIDI_STREAMING]: _getBidiStreamingInterceptor +}; + +var listenerGenerators = { + [methodTypes.UNARY]: _getUnaryListener, + [methodTypes.CLIENT_STREAMING]: _getClientStreamingListener, + [methodTypes.SERVER_STREAMING]: _getServerStreamingListener, + [methodTypes.BIDI_STREAMING]: _getBidiStreamingListener +}; + +/** + * Creates the last listener in an interceptor stack. + * @param {grpc~MethodDefinition} method_definition + * @param {EventEmitter} emitter + * @param {function=} callback + * @return {grpc~Listener} + */ +function getLastListener(method_definition, emitter, callback) { + if (emitter instanceof Function) { + callback = emitter; + callback = function() {}; + } + if (!(callback instanceof Function)) { + callback = function() {}; + } + if (!((emitter instanceof EventEmitter) && + (callback instanceof Function))) { + throw new Error('Argument mismatch in getLastListener'); + } + var method_type = common.getMethodType(method_definition); + var generator = listenerGenerators[method_type]; + return generator(method_definition, emitter, callback); +} + +/** + * + * @param {grpc~MethodDefinition} method_definition + * @param {grpc.Client~CallOptions} options + * @param {Interceptor[]} constructor_interceptors + * @param {grpc.Channel} channel + * @param {function|EventEmitter} responder + */ +function getInterceptingCall(method_definition, options, + constructor_interceptors, channel, responder) { + var interceptors = _processInterceptorLayers( + options, + constructor_interceptors, + method_definition + ); + var last_interceptor = _getLastInterceptor(method_definition, channel, + responder); + var all_interceptors = interceptors.concat(last_interceptor); + return _buildChain(all_interceptors, options); +} + +/** + * Creates the last interceptor in an interceptor stack. + * @private + * @param {grpc~MethodDefinition} method_definition + * @param {grpc.Channel} channel + * @param {function|EventEmitter} responder + * @return {Interceptor} + */ +function _getLastInterceptor(method_definition, channel, responder) { + var callback = (responder instanceof Function) ? responder : function() {}; + var emitter = (responder instanceof EventEmitter) ? responder : + new EventEmitter(); + var method_type = common.getMethodType(method_definition); + var generator = interceptorGenerators[method_type]; + return generator(method_definition, channel, emitter, callback); +} + +/** + * Chain a list of interceptors together and return the first InterceptingCall. + * @private + * @param {Interceptor[]} interceptors An interceptor stack. + * @param {grpc.Client~CallOptions} options Call options. + * @return {InterceptingCall} + */ +function _buildChain(interceptors, options) { + var next = function(interceptors) { + if (interceptors.length === 0) { + return function (options) {}; + } + var head_interceptor = interceptors[0]; + var rest_interceptors = interceptors.slice(1); + return function (options) { + return head_interceptor(options, next(rest_interceptors)); + }; + }; + var chain = next(interceptors)(options); + return new InterceptingCall(chain); +} + +/** + * Process call options and the interceptor override layers to get the final set + * of interceptors. + * @private + * @param {grpc.Client~CallOptions} call_options The options passed to the gRPC + * call. + * @param {Interceptor[]} constructor_interceptors Interceptors passed to the + * client constructor. + * @param {grpc~MethodDefinition} method_definition Details of the RPC method. + * @return {Interceptor[]|null} The final set of interceptors. + */ +function _processInterceptorLayers(call_options, + constructor_interceptors, + method_definition) { + var calltime_interceptors = resolveInterceptorOptions(call_options, + method_definition); + var interceptor_overrides = [ + calltime_interceptors, + constructor_interceptors + ]; + return _resolveInterceptorOverrides(interceptor_overrides); +} + +/** + * Wraps a plain listener object in an InterceptingListener if it isn't an + * InterceptingListener already. + * @param {InterceptingListener|object|null} current_listener + * @param {InterceptingListener|EndListener} next_listener + * @return {InterceptingListener|null} + * @private + */ +function _getInterceptingListener(current_listener, next_listener) { + if (!_isInterceptingListener(current_listener)) { + return new InterceptingListener(next_listener, current_listener); + } + return current_listener; +} + +/** + * Test if the listener exists and is an InterceptingListener. + * @param listener + * @return {boolean} + * @private + */ +function _isInterceptingListener(listener) { + return listener && listener.constructor.name === 'InterceptingListener'; +} + +/** + * Chooses the first valid array of interceptors or returns null. + * @param {Interceptor[][]} interceptor_lists A list of interceptor lists in + * descending override priority order. + * @return {Interceptor[]|null} The resulting interceptors + * @private + */ +function _resolveInterceptorOverrides(interceptor_lists) { + for (var i = 0; i < interceptor_lists.length; i++) { + var interceptor_list = interceptor_lists[i]; + if (_.isArray(interceptor_list)) { + return interceptor_list; + } + } + return null; +} + +exports.resolveInterceptorProviders = resolveInterceptorProviders; + +exports.InterceptingCall = InterceptingCall; +exports.ListenerBuilder = ListenerBuilder; +exports.RequesterBuilder = RequesterBuilder; +exports.StatusBuilder = StatusBuilder; + +exports.InterceptorConfigurationError = InterceptorConfigurationError; + +exports.getInterceptingCall = getInterceptingCall; +exports.getLastListener = getLastListener; diff --git a/packages/grpc-native-core/src/common.js b/packages/grpc-native-core/src/common.js index b9c5ee3fc..0c35296a8 100644 --- a/packages/grpc-native-core/src/common.js +++ b/packages/grpc-native-core/src/common.js @@ -19,6 +19,7 @@ 'use strict'; var _ = require('lodash'); +var constants = require('./constants'); /** * Wrap a function to pass null-like values through without calling it. If no @@ -75,6 +76,42 @@ exports.defaultGrpcOptions = { deprecatedArgumentOrder: false }; +/** + * Create an Error object from a status object + * @param {grpc~StatusObject} status The status object + * @return {Error} The resulting Error + */ +exports.createStatusError = function(status) { + let statusName = _.invert(constants.status)[status.code]; + let message = `${status.code} ${statusName}: ${status.details}`; + let error = new Error(message); + error.code = status.code; + error.metadata = status.metadata; + error.details = status.details; + return error; +}; + +/** + * Get a method's type from its definition + * @param {grpc~MethodDefinition} method_definition + * @return {number} + */ +exports.getMethodType = function(method_definition) { + if (method_definition.requestStream) { + if (method_definition.responseStream) { + return constants.methodTypes.BIDI_STREAMING; + } else { + return constants.methodTypes.CLIENT_STREAMING; + } + } else { + if (method_definition.responseStream) { + return constants.methodTypes.SERVER_STREAMING; + } else { + return constants.methodTypes.UNARY; + } + } +}; + // JSDoc definitions that are used in multiple other modules /** @@ -166,6 +203,79 @@ exports.defaultGrpcOptions = { * function for repsonse data */ +/** + * @function MetadataListener + * @param {grpc.Metadata} metadata The response metadata. + * @param {function} next Passes metadata to the next interceptor. + */ + +/** + * @function MessageListener + * @param {jspb.Message} message The response message. + * @param {function} next Passes a message to the next interceptor. + */ + +/** + * @function StatusListener + * @param {grpc~StatusObject} status The response status. + * @param {function} next Passes a status to the next interceptor. + */ + +/** + * A set of interceptor functions triggered by responses + * @typedef {object} grpc~Listener + * @property {MetadataListener=} onReceiveMetadata A function triggered by + * response metadata. + * @property {MessageListener=} onReceiveMessage A function triggered by a + * response message. + * @property {StatusListener=} onReceiveStatus A function triggered by a + * response status. + */ + +/** + * @function MetadataRequester + * @param {grpc.Metadata} metadata The request metadata. + * @param {grpc~Listener} listener A listener wired to the previous layers + * in the interceptor stack. + * @param {function} next Passes metadata and a listener to the next + * interceptor. + */ + +/** + * @function MessageRequester + * @param {jspb.Message} message The request message. + * @param {function} next Passes a message to the next interceptor. + */ + +/** + * @function CloseRequester + * @param {function} next Calls the next interceptor. + */ + +/** + * @function CancelRequester + * @param {function} next Calls the next interceptor. + */ + +/** + * @function GetPeerRequester + * @param {function} next Calls the next interceptor. + * @return {string} + */ + +/** + * @typedef {object} grpc~Requester + * @param {MetadataRequester=} start A function triggered when the call begins. + * @param {MessageRequester=} sendMessage A function triggered by the request + * message. + * @param {CloseRequester=} halfClose A function triggered when the client + * closes the call. + * @param {CancelRequester=} cancel A function triggered when the call is + * cancelled. + * @param {GetPeerRequester=} getPeer A function triggered when the endpoint is + * requested. + */ + /** * An object that completely defines a service. * @typedef {Object.} grpc~ServiceDefinition @@ -175,3 +285,27 @@ exports.defaultGrpcOptions = { * An object that defines a package hierarchy with multiple services * @typedef {Object.} grpc~PackageDefinition */ + +/** + * A function for dynamically assigning an interceptor to a call. + * @function InterceptorProvider + * @param {grpc~MethodDefinition} method_definition The method to provide + * an interceptor for. + * @return {Interceptor|null} The interceptor to provide or nothing + */ + +/** + * A function which can modify call options and produce methods to intercept + * RPC operations. + * @function Interceptor + * @param {object} options The grpc call options + * @param {NextCall} nextCall + * @return {InterceptingCall} + */ + +/** + * A function which produces the next InterceptingCall. + * @function NextCall + * @param {object} options The grpc call options + * @return {InterceptingCall|null} + */ diff --git a/packages/grpc-native-core/src/constants.js b/packages/grpc-native-core/src/constants.js index b91d4214a..e9df9a318 100644 --- a/packages/grpc-native-core/src/constants.js +++ b/packages/grpc-native-core/src/constants.js @@ -235,3 +235,17 @@ exports.logVerbosity = { INFO: 1, ERROR: 2 }; + +/** + * Method types: the supported RPC types + * @memberof grpc + * @alias grpc.methodTypes + * @readonly + * @enum {number} + */ +exports.methodTypes = { + UNARY: 0, + CLIENT_STREAMING: 1, + SERVER_STREAMING: 2, + BIDI_STREAMING: 3 +}; diff --git a/packages/grpc-native-core/test/client_interceptors_test.js b/packages/grpc-native-core/test/client_interceptors_test.js new file mode 100644 index 000000000..b14ec1569 --- /dev/null +++ b/packages/grpc-native-core/test/client_interceptors_test.js @@ -0,0 +1,1795 @@ +/** + * @license + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +'use strict'; + +var _ = require('lodash'); +var assert = require('assert'); +var grpc = require('..'); +var grpc_client = require('../src/client.js'); +var Metadata = require('../src/metadata'); + +var insecureCreds = grpc.credentials.createInsecure(); + +var echo_proto = grpc.load(__dirname + '/echo_service.proto'); +var echo_service = echo_proto.EchoService.service; + +var StatusBuilder = grpc_client.StatusBuilder; +var ListenerBuilder = grpc_client.ListenerBuilder; +var InterceptingCall = grpc_client.InterceptingCall; +var RequesterBuilder = grpc_client.RequesterBuilder; + +var CallRegistry = function(done, expectation, is_ordered, is_verbose) { + this.call_map = {}; + this.call_array = []; + this.done = done; + this.expectation = expectation; + this.expectation_is_array = _.isArray(this.expectation); + this.is_ordered = is_ordered; + this.is_verbose = is_verbose; + if (is_verbose) { + console.log('Expectation: ', expectation); + } +}; + +CallRegistry.prototype.addCall = function(call_name) { + if (this.expectation_is_array) { + this.call_array.push(call_name); + if (this.is_verbose) { + console.log(this.call_array); + } + } else { + if (!this.call_map[call_name]) { + this.call_map[call_name] = 0; + } + this.call_map[call_name]++; + if (this.is_verbose) { + console.log(this.call_map); + } + } + this.maybeCallDone(); +}; + +CallRegistry.prototype.maybeCallDone = function() { + if (this.expectation_is_array) { + if (this.is_ordered) { + if (this.expectation && _.isEqual(this.expectation, this.call_array)) { + this.done(); + } + } else { + var intersection = _.intersectionWith(this.expectation, this.call_array, + _.isEqual); + if (intersection.length === this.expectation.length) { + this.done(); + } + } + } else if (this.expectation && _.isEqual(this.expectation, this.call_map)) { + this.done(); + } +}; + +describe('Client interceptors', function() { + var echo_server; + var echo_port; + var client; + + function startServer() { + echo_server = new grpc.Server(); + echo_server.addService(echo_service, { + echo: function(call, callback) { + call.sendMetadata(call.metadata); + if (call.request.value === 'error') { + var status = { + code: 2, + message: 'test status message' + }; + status.metadata = call.metadata; + callback(status, null); + return; + } + callback(null, call.request); + }, + echoClientStream: function(call, callback){ + call.sendMetadata(call.metadata); + var payload; + var err = null; + call.on('data', function(data) { + if (data.value === 'error') { + err = { + code: 2, + message: 'test status message' + }; + err.metadata = call.metadata; + return; + } + payload = data; + }); + call.on('end', function() { + callback(err, payload, call.metadata); + }); + }, + echoServerStream: function(call) { + call.sendMetadata(call.metadata); + if (call.request.value === 'error') { + var status = { + code: 2, + message: 'test status message' + }; + status.metadata = call.metadata; + call.emit('error', status); + return; + } + call.write(call.request); + call.end(call.metadata); + }, + echoBidiStream: function(call) { + call.sendMetadata(call.metadata); + call.on('data', function(data) { + if (data.value === 'error') { + var status = { + code: 2, + message: 'test status message' + }; + call.emit('error', status); + return; + } + call.write(data); + }); + call.on('end', function() { + call.end(call.metadata); + }); + } + }); + var server_credentials = grpc.ServerCredentials.createInsecure(); + echo_port = echo_server.bind('localhost:0', server_credentials); + echo_server.start(); + } + + function stopServer() { + echo_server.forceShutdown(); + } + + function resetClient() { + var EchoClient = grpc_client.makeClientConstructor(echo_service); + client = new EchoClient('localhost:' + echo_port, insecureCreds); + } + + before(function() { + startServer(); + }); + beforeEach(function() { + resetClient(); + }); + after(function() { + stopServer(); + }); + describe('pass calls through when no interceptors provided', function() { + it('with unary call', function(done) { + var expected_value = 'foo'; + var message = {value: expected_value}; + client.echo(message, function(err, response) { + assert.strictEqual(response.value, expected_value); + done(); + }); + assert(_.isEqual(grpc_client.getClientInterceptors(client), { + echo: [], + echoClientStream: [], + echoServerStream: [], + echoBidiStream: [] + })); + }); + }); + + describe('execute downstream interceptors when a new call is made outbound', + function() { + var registry; + var options; + before(function() { + var stored_listener; + var stored_metadata; + var interceptor_a = function (options, nextCall) { + options.call_number = 1; + registry.addCall('construct a ' + options.call_number); + return new InterceptingCall(nextCall(options), { + start: function (metadata, listener, next) { + registry.addCall('start a ' + options.call_number); + stored_listener = listener; + stored_metadata = metadata; + next(metadata, listener); + }, + sendMessage: function (message, next) { + registry.addCall('send a ' + options.call_number); + var options2 = _.clone(options); + options2.call_number = 2; + var second_call = nextCall(options2); + second_call.start(stored_metadata); + second_call.sendMessage(message); + second_call.halfClose(); + next(message); + }, + halfClose: function (next) { + registry.addCall('close a ' + options.call_number); + next(); + } + }); + }; + + var interceptor_b = function (options, nextCall) { + registry.addCall('construct b ' + options.call_number); + return new InterceptingCall(nextCall(options), { + start: function (metadata, listener, next) { + registry.addCall('start b ' + options.call_number); + next(metadata, listener); + }, + sendMessage: function (message, next) { + registry.addCall('send b ' + options.call_number); + next(message); + }, + halfClose: function (next) { + registry.addCall('close b ' + options.call_number); + next(); + } + }); + }; + options = { + interceptors: [interceptor_a, interceptor_b] + }; + }); + var expected_calls = [ + 'construct a 1', + 'construct b 1', + 'start a 1', + 'start b 1', + 'send a 1', + 'construct b 2', + 'start b 2', + 'send b 2', + 'close b 2', + 'send b 1', + 'close a 1', + 'close b 1', + 'response' + ]; + + it('with unary call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {}; + message.value = 'foo'; + client.echo(message, options, function(err, response){ + if (!err) { + registry.addCall('response'); + } + }); + }); + it('with client streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, false); + var message = {}; + message.value = 'foo'; + var stream = client.echoClientStream(options, function(err, response) { + if (!err) { + registry.addCall('response'); + } + }); + stream.write(message); + stream.end(); + }); + it('with server streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {}; + message.value = 'foo'; + var stream = client.echoServerStream(message, options); + stream.on('data', function(data) { + registry.addCall('response'); + }); + }); + it('with bidi streaming call', function(done) { + registry = new CallRegistry( done, expected_calls, true); + var message = {}; + message.value = 'foo'; + var stream = client.echoBidiStream(options); + stream.on('data', function(data) { + registry.addCall('response'); + }); + stream.write(message); + stream.end(); + }); + }); + + + describe('execute downstream interceptors when a new call is made inbound', + function() { + var registry; + var options; + before(function() { + var interceptor_a = function (options, nextCall) { + return new InterceptingCall(nextCall(options), { + start: function (metadata, listener, next) { + next(metadata, { + onReceiveMetadata: function () { }, + onReceiveMessage: function (message, next) { + registry.addCall('interceptor_a'); + var second_call = nextCall(options); + second_call.start(metadata, listener); + second_call.sendMessage(message); + second_call.halfClose(); + }, + onReceiveStatus: function () { } + }); + } + }); + }; + + var interceptor_b = function (options, nextCall) { + return new InterceptingCall(nextCall(options), { + start: function (metadata, listener, next) { + next(metadata, { + onReceiveMessage: function (message, next) { + registry.addCall('interceptor_b'); + next(message); + } + }); + } + }); + }; + + options = { + interceptors: [interceptor_a, interceptor_b] + }; + + }); + var expected_calls = ['interceptor_b', 'interceptor_a', + 'interceptor_b', 'response']; + it('with unary call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {}; + message.value = 'foo'; + client.echo(message, options, function(err) { + if (!err) { + registry.addCall('response'); + } + }); + }); + it('with client streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {}; + message.value = 'foo'; + var stream = client.echoClientStream(options, function(err, response) { + if (!err) { + registry.addCall('response'); + } + }); + stream.write(message); + stream.end(); + }); + }); + + it('will delay operations and short circuit unary requests', function(done) { + var registry = new CallRegistry(done, ['foo_miss', 'foo_hit', 'bar_miss', + 'foo_hit_done', 'foo_miss_done', 'bar_miss_done']); + var cache = {}; + var _getCachedResponse = function(value) { + return cache[value]; + }; + var _store = function(key, value) { + cache[key] = value; + }; + + var interceptor = function(options, nextCall) { + var savedMetadata; + var startNext; + var storedListener; + var storedMessage; + var messageNext; + var requester = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + savedMetadata = metadata; + storedListener = listener; + startNext = next; + }) + .withSendMessage(function(message, next) { + storedMessage = message; + messageNext = next; + }) + .withHalfClose(function(next) { + var cachedValue = _getCachedResponse(storedMessage.value); + if (cachedValue) { + var cachedMessage = {}; + cachedMessage.value = cachedValue; + registry.addCall(storedMessage.value + '_hit'); + storedListener.onReceiveMetadata(new Metadata()); + storedListener.onReceiveMessage(cachedMessage); + storedListener.onReceiveStatus( + (new StatusBuilder()).withCode(grpc.status.OK).build()); + } else { + registry.addCall(storedMessage.value + '_miss'); + var newListener = (new ListenerBuilder()).withOnReceiveMessage( + function(message, next) { + _store(storedMessage.value, message.value); + next(message); + }).build(); + startNext(savedMetadata, newListener); + messageNext(storedMessage); + next(); + } + }) + .withCancel(function(message, next) { + next(); + }).build(); + + return new InterceptingCall(nextCall(options), requester); + }; + + var options = { + interceptors: [interceptor] + }; + + var foo_message = {}; + foo_message.value = 'foo'; + client.echo(foo_message, options, function(err, response){ + assert.equal(response.value, 'foo'); + registry.addCall('foo_miss_done'); + client.echo(foo_message, options, function(err, response){ + assert.equal(response.value, 'foo'); + registry.addCall('foo_hit_done'); + }); + }); + + var bar_message = {}; + bar_message.value = 'bar'; + client.echo(bar_message, options, function(err, response) { + assert.equal(response.value, 'bar'); + registry.addCall('bar_miss_done'); + }); + }); + + it('can retry failed messages and handle eventual success', function(done) { + var registry = new CallRegistry(done, + ['retry_foo_1', 'retry_foo_2', 'retry_foo_3', 'foo_result', + 'retry_bar_1', 'bar_result']); + var maxRetries = 3; + var retry_interceptor = function(options, nextCall) { + var savedMetadata; + var savedSendMessage; + var savedReceiveMessage; + var savedMessageNext; + var requester = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + savedMetadata = metadata; + var new_listener = (new ListenerBuilder()) + .withOnReceiveMessage(function(message, next) { + savedReceiveMessage = message; + savedMessageNext = next; + }) + .withOnReceiveStatus(function(status, next) { + var retries = 0; + var retry = function(message, metadata) { + retries++; + var newCall = nextCall(options); + var receivedMessage; + newCall.start(metadata, { + onReceiveMessage: function(message) { + receivedMessage = message; + }, + onReceiveStatus: function(status) { + registry.addCall('retry_' + savedMetadata.get('name') + + '_' + retries); + if (status.code !== grpc.status.OK) { + if (retries <= maxRetries) { + retry(message, metadata); + } else { + savedMessageNext(receivedMessage); + next(status); + } + } else { + registry.addCall('success_call'); + var new_status = (new StatusBuilder()) + .withCode(grpc.status.OK).build(); + savedMessageNext(receivedMessage); + next(new_status); + } + } + }); + newCall.sendMessage(message); + newCall.halfClose(); + }; + if (status.code !== grpc.status.OK) { + // Change the message we're sending only for test purposes + // so the server will respond without error + var newMessage = (savedMetadata.get('name')[0] === 'bar') ? + {value: 'bar'} : savedSendMessage; + retry(newMessage, savedMetadata); + } else { + savedMessageNext(savedReceiveMessage); + next(status); + } + } + ).build(); + next(metadata, new_listener); + }) + .withSendMessage(function(message, next) { + savedSendMessage = message; + next(message); + }).build(); + return new InterceptingCall(nextCall(options), requester); + }; + + var options = { + interceptors: [retry_interceptor] + }; + + // Make a call which the server will return a non-OK status for + var foo_message = {value: 'error'}; + var foo_metadata = new Metadata(); + foo_metadata.set('name', 'foo'); + client.echo(foo_message, foo_metadata, options, function(err, response) { + assert.strictEqual(err.code, 2); + registry.addCall('foo_result'); + }); + + // Make a call which will fail the first time and succeed on the first + // retry + var bar_message = {value: 'error'}; + var bar_metadata = new Metadata(); + bar_metadata.set('name', 'bar'); + client.echo(bar_message, bar_metadata, options, function(err, response) { + assert.strictEqual(response.value, 'bar'); + registry.addCall('bar_result'); + }); + }); + + it('can retry and preserve interceptor order on success', function(done) { + var registry = new CallRegistry(done, + ['interceptor_c', 'retry_interceptor', 'fail_call', 'interceptor_c', + 'success_call', 'interceptor_a', 'result'], true); + var interceptor_a = function(options, nextCall) { + var requester = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + var new_listener = (new ListenerBuilder()) + .withOnReceiveMessage(function(message, next) { + registry.addCall('interceptor_a'); + next(message); + }).build(); + next(metadata, new_listener); + }).build(); + return new InterceptingCall(nextCall(options), requester); + }; + + var retry_interceptor = function(options, nextCall) { + var savedMetadata; + var savedMessage; + var savedMessageNext; + var sendMessageNext; + var originalMessage; + var startNext; + var originalListener; + var requester = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + startNext = next; + savedMetadata = metadata; + originalListener = listener; + var new_listener = (new ListenerBuilder()) + .withOnReceiveMessage(function(message, next) { + savedMessage = message; + savedMessageNext = next; + }) + .withOnReceiveStatus(function(status, next) { + var retries = 0; + var maxRetries = 1; + var receivedMessage; + var retry = function(message, metadata) { + retries++; + var new_call = nextCall(options); + new_call.start(metadata, { + onReceiveMessage: function(message) { + receivedMessage = message; + }, + onReceiveStatus: function(status) { + if (status.code !== grpc.status.OK) { + if (retries <= maxRetries) { + retry(message, metadata); + } else { + savedMessageNext(receivedMessage); + next(status); + } + } else { + registry.addCall('success_call'); + var new_status = (new StatusBuilder()) + .withCode(grpc.status.OK).build(); + savedMessageNext(receivedMessage); + next(new_status); + } + } + }); + new_call.sendMessage(message); + new_call.halfClose(); + }; + registry.addCall('retry_interceptor'); + if (status.code !== grpc.status.OK) { + registry.addCall('fail_call'); + var newMessage = {value: 'foo'}; + retry(newMessage, savedMetadata); + } else { + savedMessageNext(savedMessage); + next(status); + } + }).build(); + next(metadata, new_listener); + }) + .withSendMessage(function(message, next) { + sendMessageNext = next; + originalMessage = message; + next(message); + }) + .build(); + return new InterceptingCall(nextCall(options), requester); + }; + + var interceptor_c = function(options, nextCall) { + var requester = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + var new_listener = (new ListenerBuilder()) + .withOnReceiveMessage(function(message, next) { + registry.addCall('interceptor_c'); + next(message); + }).build(); + next(metadata, new_listener); + }).build(); + return new InterceptingCall(nextCall(options), requester); + }; + + var options = { + interceptors: [interceptor_a, retry_interceptor, interceptor_c] + }; + + var message = {value: 'error'}; + client.echo(message, options, function(err, response) { + assert.strictEqual(response.value, 'foo'); + registry.addCall('result'); + }); + }); + + describe('handle interceptor errors', function (doneOuter) { + var options; + before(function () { + var foo_interceptor = function (options, nextCall) { + var savedListener; + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + savedListener = listener; + next(metadata, listener); + }) + .withSendMessage(function (message, next) { + savedListener.onReceiveMetadata(new Metadata()); + savedListener.onReceiveMessage({ value: 'failed' }); + var error_status = (new StatusBuilder()) + .withCode(16) + .withDetails('Error in foo interceptor') + .build(); + savedListener.onReceiveStatus(error_status); + }).build(); + return new grpc_client.InterceptingCall(nextCall(options), outbound); + }; + options = { interceptors: [foo_interceptor] }; + }); + it('with unary call', function(done) { + var message = {}; + client.echo(message, options, function(err, response) { + assert.strictEqual(err.code, 16); + assert.strictEqual(err.message, + '16 UNAUTHENTICATED: Error in foo interceptor'); + done(); + doneOuter(); + }); + }); + }); + + describe('implement fallbacks for streaming RPCs', function() { + + var options; + before(function () { + var fallback_response = { value: 'fallback' }; + var savedMessage; + var savedMessageNext; + var interceptor = function (options, nextCall) { + var requester = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + var new_listener = (new ListenerBuilder()) + .withOnReceiveMessage(function (message, next) { + savedMessage = message; + savedMessageNext = next; + }) + .withOnReceiveStatus(function (status, next) { + if (status.code !== grpc.status.OK) { + savedMessageNext(fallback_response); + next((new StatusBuilder()).withCode(grpc.status.OK)); + } else { + savedMessageNext(savedMessage); + next(status); + } + }).build(); + next(metadata, new_listener); + }).build(); + return new InterceptingCall(nextCall(options), requester); + }; + options = { + interceptors: [interceptor] + }; + }); + it('with client streaming call', function (done) { + var registry = new CallRegistry(done, ['foo_result', 'fallback_result']); + var stream = client.echoClientStream(options, function (err, response) { + assert.strictEqual(response.value, 'foo'); + registry.addCall('foo_result'); + }); + stream.write({ value: 'foo' }); + stream.end(); + + stream = client.echoClientStream(options, function(err, response) { + assert.strictEqual(response.value, 'fallback'); + registry.addCall('fallback_result'); + }); + stream.write({value: 'error'}); + stream.end(); + }); + }); + + describe('allows the call options to be modified for downstream interceptors', + function() { + var done; + var options; + var method_name; + var method_path_last; + before(function() { + var interceptor_a = function (options, nextCall) { + options.deadline = 10; + return new InterceptingCall(nextCall(options)); + }; + var interceptor_b = function (options, nextCall) { + assert.equal(options.method_definition.path, '/EchoService/' + + method_path_last); + assert.equal(options.deadline, 10); + done(); + return new InterceptingCall(nextCall(options)); + }; + + options = { + interceptors: [interceptor_a, interceptor_b], + deadline: 100 + }; + }); + + it('with unary call', function(cb) { + done = cb; + var metadata = new Metadata(); + var message = {}; + method_name = 'echo'; + method_path_last = 'Echo'; + + client.echo(message, metadata, options, function(){}); + }); + + it('with client streaming call', function(cb) { + done = cb; + var metadata = new Metadata(); + method_name = 'echoClientStream'; + method_path_last = 'EchoClientStream'; + + client.echoClientStream(metadata, options, function() {}); + }); + + it('with server streaming call', function(cb) { + done = cb; + var metadata = new Metadata(); + var message = {}; + method_name = 'echoServerStream'; + method_path_last = 'EchoServerStream'; + + client.echoServerStream(message, metadata, options); + }); + + it('with bidi streaming call', function(cb) { + done = cb; + var metadata = new Metadata(); + method_name = 'echoBidiStream'; + method_path_last = 'EchoBidiStream'; + + client.echoBidiStream(metadata, options); + }); + }); + + describe('pass accurate MethodDefinitions', function() { + var registry; + var initial_value = 'broken'; + var expected_value = 'working'; + var options; + before(function() { + var interceptor = function (options, nextCall) { + registry.addCall({ + path: options.method_definition.path, + requestStream: options.method_definition.requestStream, + responseStream: options.method_definition.responseStream + }); + var outbound = (new RequesterBuilder()) + .withSendMessage(function (message, next) { + message.value = expected_value; + next(message); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + options = { interceptors: [interceptor] }; + }); + + it('with unary call', function(done) { + var unary_definition = { + path: '/EchoService/Echo', + requestStream: false, + responseStream: false + }; + registry = new CallRegistry(done, [ + unary_definition, + 'result_unary' + ]); + + var metadata = new Metadata(); + + var message = {value: initial_value}; + + client.echo(message, metadata, options, function(err, response){ + assert.equal(response.value, expected_value); + registry.addCall('result_unary'); + }); + + }); + it('with client streaming call', function(done) { + + var client_stream_definition = { + path: '/EchoService/EchoClientStream', + requestStream: true, + responseStream: false + }; + registry = new CallRegistry(done, [ + client_stream_definition, + 'result_client_stream' + ], false, true); + var metadata = new Metadata(); + var message = {value: initial_value}; + var client_stream = client.echoClientStream(metadata, options, + function(err, response) { + assert.strictEqual(response.value, expected_value); + registry.addCall('result_client_stream'); + }); + client_stream.write(message); + client_stream.end(); + + }); + it('with server streaming call', function(done) { + var server_stream_definition = { + path: '/EchoService/EchoServerStream', + responseStream: true, + requestStream: false, + }; + registry = new CallRegistry(done, [ + server_stream_definition, + 'result_server_stream' + ]); + + var metadata = new Metadata(); + var message = {value: initial_value}; + var server_stream = client.echoServerStream(message, metadata, options); + server_stream.on('data', function(data) { + assert.strictEqual(data.value, expected_value); + registry.addCall('result_server_stream'); + }); + + }); + it('with bidi streaming call', function(done) { + var bidi_stream_definition = { + path: '/EchoService/EchoBidiStream', + requestStream: true, + responseStream: true + }; + registry = new CallRegistry(done, [ + bidi_stream_definition, + 'result_bidi_stream' + ]); + + var metadata = new Metadata(); + var message = {value: initial_value}; + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('data', function(data) { + assert.strictEqual(data.value, expected_value); + registry.addCall('result_bidi_stream'); + }); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + it('uses interceptors passed to the client constructor', function(done) { + var registry = new CallRegistry(done, { + 'constructor_interceptor_a_echo': 1, + 'constructor_interceptor_b_echoServerStream': 1, + 'invocation_interceptor': 1, + 'result_unary': 1, + 'result_stream': 1, + 'result_invocation': 1 + }); + + var constructor_interceptor_a = function(options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + registry.addCall('constructor_interceptor_a_' + + client.$method_names[options.method_definition.path]); + next(metadata, listener); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + var constructor_interceptor_b = function(options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + registry.addCall('constructor_interceptor_b_' + + client.$method_names[options.method_definition.path]); + next(metadata, listener); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + var invocation_interceptor = function(options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function(metadata, listener, next) { + registry.addCall('invocation_interceptor'); + next(metadata, listener); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + + var interceptor_providers = [ + function(method_definition) { + if (!method_definition.requestStream && + !method_definition.responseStream) { + return constructor_interceptor_a; + } + }, + function(method_definition) { + if (!method_definition.requestStream && + method_definition.responseStream) { + return constructor_interceptor_b; + } + } + ]; + var constructor_options = { + interceptor_providers: interceptor_providers + }; + var IntClient = grpc_client.makeClientConstructor(echo_service); + var int_client = new IntClient('localhost:' + echo_port, insecureCreds, + constructor_options); + var message = {}; + int_client.echo(message, function() { + registry.addCall('result_unary'); + }); + var stream = int_client.echoServerStream(message); + stream.on('data', function() { + registry.addCall('result_stream'); + }); + + var options = { interceptors: [invocation_interceptor] }; + int_client.echo(message, options, function() { + registry.addCall('result_invocation'); + }); + + assert(_.isEqual(grpc_client.getClientInterceptors(int_client), { + echo: [constructor_interceptor_a], + echoClientStream: [], + echoServerStream: [constructor_interceptor_b], + echoBidiStream: [] + })); + }); + + it('will reject conflicting interceptor options at invocation', + function(done) { + try { + client.echo('message', { + interceptors: [], + interceptor_providers: [] + }, function () {}); + } catch (e) { + assert.equal(e.name, 'InterceptorConfigurationError'); + done(); + } + }); + + it('will resolve interceptor providers at invocation', function(done) { + var constructor_interceptor = function(options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function() { + assert(false); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + var invocation_interceptor = function(options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function() { + done(); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + var constructor_interceptor_providers = [ + function() { + return constructor_interceptor; + } + ]; + var invocation_interceptor_providers = [ + function() { + return invocation_interceptor; + } + ]; + var constructor_options = { + interceptor_providers: constructor_interceptor_providers + }; + var IntClient = grpc_client.makeClientConstructor(echo_service); + var int_client = new IntClient('localhost:' + echo_port, insecureCreds, + constructor_options); + var message = {}; + var options = { interceptor_providers: invocation_interceptor_providers }; + int_client.echo(message, options, function() {}); + }); + + describe('trigger a stack of interceptors in nested order', function() { + var registry; + var expected_calls = ['constructA', 'constructB', 'outboundA', 'outboundB', + 'inboundB', 'inboundA']; + var options; + before(function() { + var interceptor_a = function (options, nextCall) { + registry.addCall('constructA'); + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + registry.addCall('outboundA'); + var new_listener = (new ListenerBuilder()).withOnReceiveMessage( + function (message, next) { + registry.addCall('inboundA'); + next(message); + }).build(); + next(metadata, new_listener); + }).build(); + return new grpc_client.InterceptingCall(nextCall(options), + outbound); + }; + var interceptor_b = function (options, nextCall) { + registry.addCall('constructB'); + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + registry.addCall('outboundB'); + var new_listener = (new ListenerBuilder()).withOnReceiveMessage( + function (message, next) { + registry.addCall('inboundB'); + next(message); + }).build(); + next(metadata, new_listener); + }).build(); + return new grpc_client.InterceptingCall(nextCall(options), + outbound); + }; + options = { interceptors: [interceptor_a, interceptor_b] }; + }); + var metadata = new Metadata(); + var message = {}; + + it('with unary call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + client.echo(message, metadata, options, function(){}); + }); + + it('with client streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var client_stream = client.echoClientStream(metadata, options, + function() {}); + client_stream.write(message); + client_stream.end(); + }); + + it('with server streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var stream = client.echoServerStream(message, metadata, options); + stream.on('data', function() {}); + }); + + it('with bidi streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('data', function(){}); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + describe('trigger interceptors horizontally', function() { + var expected_calls = [ + 'interceptor_a_start', + 'interceptor_b_start', + 'interceptor_a_send', + 'interceptor_b_send' + ]; + var registry; + var options; + var metadata = new Metadata(); + var message = {}; + + before(function() { + var interceptor_a = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + registry.addCall('interceptor_a_start'); + next(metadata, listener); + }) + .withSendMessage(function (message, next) { + registry.addCall('interceptor_a_send'); + next(message); + }).build(); + return new grpc_client.InterceptingCall(nextCall(options), + outbound); + }; + var interceptor_b = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + registry.addCall('interceptor_b_start'); + next(metadata, listener); + }) + .withSendMessage(function (message, next) { + registry.addCall('interceptor_b_send'); + next(message); + }).build(); + return new grpc_client.InterceptingCall(nextCall(options), + outbound); + }; + options = { interceptors: [interceptor_a, interceptor_b] }; + }); + + it('with unary call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + client.echo(message, metadata, options, function(){}); + }); + + it('with client streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var client_stream = client.echoClientStream(metadata, options, + function() {}); + client_stream.write(message); + client_stream.end(); + }); + + it('with server streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var stream = client.echoServerStream(message, metadata, options); + stream.on('data', function() {}); + }); + + it('with bidi streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('data', function(){}); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + describe('trigger when sending metadata', function() { + var registry; + + var message = {}; + var key_names = ['original', 'foo', 'bar']; + var keys = { + original: 'originalkey', + foo: 'fookey', + bar: 'barkey' + }; + var values = { + original: 'originalvalue', + foo: 'foovalue', + bar: 'barvalue' + }; + var expected_calls = ['foo', 'bar', 'response']; + var options; + before(function () { + var foo_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + metadata.add(keys.foo, values.foo); + registry.addCall('foo'); + next(metadata, listener); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + var bar_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + metadata.add(keys.bar, values.bar); + registry.addCall('bar'); + next(metadata, listener); + }).build(); + return new InterceptingCall(nextCall(options), outbound); + }; + options = { interceptors: [foo_interceptor, bar_interceptor] }; + }); + + it('with unary call', function (done) { + registry = new CallRegistry(done, expected_calls, true); + var metadata = new Metadata(); + metadata.add(keys.original, values.original); + + var unary_call = client.echo(message, metadata, options, function () {}); + unary_call.on('metadata', function (metadata) { + var has_expected_values = _.every(key_names, function (key_name) { + return _.isEqual(metadata.get(keys[key_name]), [values[key_name]]); + }); + assert(has_expected_values); + registry.addCall('response'); + }); + }); + it('with client streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var metadata = new Metadata(); + metadata.add(keys.original, values.original); + + var client_stream = client.echoClientStream(metadata, options, + function () { + }); + client_stream.write(message); + client_stream.on('metadata', function (metadata) { + var has_expected_values = _.every(key_names, function (key_name) { + return _.isEqual(metadata.get(keys[key_name]), [values[key_name]]); + }); + assert(has_expected_values); + registry.addCall('response'); + }); + client_stream.end(); + }); + it('with server streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var metadata = new Metadata(); + metadata.add(keys.original, values.original); + var server_stream = client.echoServerStream(message, metadata, options); + server_stream.on('metadata', function (metadata) { + var has_expected_values = _.every(key_names, function (key_name) { + return _.isEqual(metadata.get(keys[key_name]), [values[key_name]]); + }); + assert(has_expected_values); + registry.addCall('response'); + }); + server_stream.on('data', function() { }); + }); + it('with bidi streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var metadata = new Metadata(); + metadata.add(keys.original, values.original); + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('metadata', function(metadata) { + var has_expected_values = _.every(key_names, function(key_name) { + return _.isEqual(metadata.get(keys[key_name]),[values[key_name]]); + }); + assert(has_expected_values); + bidi_stream.end(); + registry.addCall('response'); + }); + bidi_stream.on('data', function() { }); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + describe('trigger when sending messages', function() { + var registry; + var originalValue = 'foo'; + var expectedValue = 'bar'; + var options; + var metadata = new Metadata(); + var expected_calls = ['messageIntercepted', 'response']; + + before(function() { + var foo_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withSendMessage(function (message, next) { + assert.strictEqual(message.value, originalValue); + registry.addCall('messageIntercepted'); + next({ value: expectedValue }); + }).build(); + return new grpc_client.InterceptingCall(nextCall(options), + outbound); + }; + options = { interceptors: [foo_interceptor] }; + }); + + it('with unary call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {value: originalValue}; + + client.echo(message, metadata, options, function (err, response) { + assert.strictEqual(response.value, expectedValue); + registry.addCall('response'); + }); + }); + it('with client streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {value: originalValue}; + var client_stream = client.echoClientStream(metadata, options, + function (err, response) { + assert.strictEqual(response.value, expectedValue); + registry.addCall('response'); + }); + client_stream.write(message); + client_stream.end(); + }); + it('with server streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {value: originalValue}; + var server_stream = client.echoServerStream(message, metadata, options); + server_stream.on('data', function (data) { + assert.strictEqual(data.value, expectedValue); + registry.addCall('response'); + }); + }); + it('with bidi streaming call', function(done) { + registry = new CallRegistry(done, expected_calls, true); + var message = {value: originalValue}; + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('data', function(data) { + assert.strictEqual(data.value, expectedValue); + registry.addCall('response'); + }); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + describe('trigger when client closes the call', function() { + var registry; + var expected_calls = [ + 'response', 'halfClose' + ]; + var message = {}; + var options; + before(function() { + var foo_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withHalfClose(function (next) { + registry.addCall('halfClose'); + next(); + }).build(); + return new grpc_client.InterceptingCall(nextCall(options), + outbound); + }; + options = { interceptors: [foo_interceptor] }; + }); + it('with unary call', function (done) { + registry = new CallRegistry(done, expected_calls); + client.echo(message, options, function (err, response) { + if (!err) { + registry.addCall('response'); + } + }); + }); + it('with client streaming call', function (done) { + registry = new CallRegistry(done, expected_calls); + var client_stream = client.echoClientStream(options, + function (err, response) { }); + client_stream.write(message, function (err) { + if (!err) { + registry.addCall('response'); + } + }); + client_stream.end(); + }); + it('with server streaming call', function (done) { + registry = new CallRegistry(done, expected_calls); + var server_stream = client.echoServerStream(message, options); + server_stream.on('data', function (data) { + registry.addCall('response'); + }); + }); + it('with bidi streaming call', function (done) { + registry = new CallRegistry(done, expected_calls); + var bidi_stream = client.echoBidiStream(options); + bidi_stream.on('data', function (data) { + registry.addCall('response'); + }); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + describe('trigger when the stream is canceled', function() { + var done; + var message = {}; + var options; + before(function() { + var foo_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withCancel(function (next) { + done(); + next(); + }).build(); + return new grpc_client.InterceptingCall(nextCall(options), + outbound); + }; + options = { interceptors: [foo_interceptor] }; + }); + + it('with unary call', function(cb) { + done = cb; + var stream = client.echo(message, options, function() {}); + stream.cancel(); + }); + + it('with client streaming call', function(cb) { + done = cb; + var stream = client.echoClientStream(options, function() {}); + stream.cancel(); + }); + it('with server streaming call', function(cb) { + done = cb; + var stream = client.echoServerStream(message, options); + stream.cancel(); + }); + it('with bidi streaming call', function(cb) { + done = cb; + var stream = client.echoBidiStream(options); + stream.cancel(); + }); + }); + + describe('trigger when receiving metadata', function() { + var message = {}; + var expectedKey = 'foo'; + var expectedValue = 'bar'; + var options; + before(function() { + var foo_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + var new_listener = (new ListenerBuilder()).withOnReceiveMetadata( + function (metadata, next) { + metadata.add(expectedKey, expectedValue); + next(metadata); + }).build(); + next(metadata, new_listener); + }).build(); + return new grpc_client.InterceptingCall(nextCall(options), outbound); + }; + options = { interceptors: [foo_interceptor] }; + }); + + it('with unary call', function(done) { + var metadata = new Metadata(); + var unary_call = client.echo(message, metadata, options, function () {}); + unary_call.on('metadata', function (metadata) { + assert.strictEqual(metadata.get(expectedKey)[0], expectedValue); + done(); + }); + }); + it('with client streaming call', function(done) { + var metadata = new Metadata(); + var client_stream = client.echoClientStream(metadata, options, + function () {}); + client_stream.write(message); + client_stream.on('metadata', function (metadata) { + assert.strictEqual(metadata.get(expectedKey)[0], expectedValue); + done(); + }); + client_stream.end(); + }); + it('with server streaming call', function(done) { + var metadata = new Metadata(); + var server_stream = client.echoServerStream(message, metadata, options); + server_stream.on('metadata', function (metadata) { + assert.strictEqual(metadata.get(expectedKey)[0], expectedValue); + done(); + }); + server_stream.on('data', function() { }); + }); + it('with bidi streaming call', function(done) { + var metadata = new Metadata(); + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('metadata', function(metadata) { + assert.strictEqual(metadata.get(expectedKey)[0], expectedValue); + bidi_stream.end(); + done(); + }); + bidi_stream.on('data', function() { }); + bidi_stream.write(message); + }); + }); + + describe('trigger when sending messages', function() { + var originalValue = 'foo'; + var expectedValue = 'bar'; + var options; + var metadata = new Metadata(); + before(function() { + var foo_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + var new_listener = (new ListenerBuilder()).withOnReceiveMessage( + function (message, next) { + if (!message) { + next(message); + return; + } + assert.strictEqual(message.value, originalValue); + message.value = expectedValue; + next(message); + }).build(); + next(metadata, new_listener); + }).build(); + return new grpc_client.InterceptingCall(nextCall(options), + outbound); + }; + options = { interceptors: [foo_interceptor] }; + }); + + it('with unary call', function (done) { + var message = { value: originalValue }; + client.echo(message, metadata, options, function (err, response) { + assert.strictEqual(response.value, expectedValue); + done(); + }); + }); + it('with client streaming call', function (done) { + var message = { value: originalValue }; + var client_stream = client.echoClientStream(metadata, options, + function (err, response) { + assert.strictEqual(response.value, expectedValue); + done(); + }); + client_stream.write(message); + client_stream.end(); + }); + it('with server streaming call', function (done) { + var message = { value: originalValue }; + var server_stream = client.echoServerStream(message, metadata, options); + server_stream.on('data', function (data) { + assert.strictEqual(data.value, expectedValue); + done(); + }); + }); + it('with bidi streaming call', function (done) { + var message = { value: originalValue }; + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('data', function (data) { + assert.strictEqual(data.value, expectedValue); + done(); + }); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + describe('trigger when receiving status', function() { + var expectedStatus = 'foo'; + var options; + var metadata = new Metadata(); + before(function() { + var foo_interceptor = function (options, nextCall) { + var outbound = (new RequesterBuilder()) + .withStart(function (metadata, listener, next) { + var new_listener = (new ListenerBuilder()).withOnReceiveStatus( + function (status, next) { + assert.strictEqual(status.code, 2); + assert.strictEqual(status.details, 'test status message'); + var new_status = { + code: 1, + details: expectedStatus, + metadata: {} + }; + next(new_status); + }).build(); + next(metadata, new_listener); + }).build(); + return new grpc_client.InterceptingCall(nextCall(options), + outbound); + }; + options = { interceptors: [foo_interceptor] }; + }); + it('with unary call', function (done) { + var message = { value: 'error' }; + var unary_call = client.echo(message, metadata, options, function () { + }); + unary_call.on('status', function (status) { + assert.strictEqual(status.code, 1); + assert.strictEqual(status.details, expectedStatus); + done(); + }); + }); + it('with client streaming call', function (done) { + var message = { value: 'error' }; + var client_stream = client.echoClientStream(metadata, options, + function () { + }); + client_stream.on('status', function (status) { + assert.strictEqual(status.code, 1); + assert.strictEqual(status.details, expectedStatus); + done(); + }); + client_stream.write(message); + client_stream.end(); + }); + + it('with server streaming call', function(done) { + var message = {value: 'error'}; + var server_stream = client.echoServerStream(message, metadata, options); + server_stream.on('error', function (err) { + }); + server_stream.on('data', function (data) { + }); + server_stream.on('status', function (status) { + assert.strictEqual(status.code, 1); + assert.strictEqual(status.details, expectedStatus); + done(); + }); + }); + + it('with bidi streaming call', function(done) { + var message = {value: 'error'}; + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('error', function(err) {}); + bidi_stream.on('data', function(data) {}); + bidi_stream.on('status', function(status) { + assert.strictEqual(status.code, 1); + assert.strictEqual(status.details, expectedStatus); + done(); + }); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + describe('delay streaming headers', function() { + var options; + var metadata = new Metadata(); + before(function() { + var foo_interceptor = function (options, nextCall) { + var startNext; + var startListener; + var startMetadata; + var methods = { + start: function (metadata, listener, next) { + startNext = next; + startListener = listener; + startMetadata = metadata; + }, + sendMessage: function (message, next) { + startMetadata.set('fromMessage', message.value); + startNext(startMetadata, startListener); + next(message); + } + }; + return new grpc_client.InterceptingCall(nextCall(options), methods); + }; + options = { interceptors: [foo_interceptor] }; + }); + + it('with client streaming call', function (done) { + var message = { value: 'foo' }; + var client_stream = client.echoClientStream(metadata, options, + function () { }); + client_stream.on('metadata', function (metadata) { + assert.equal(metadata.get('fromMessage'), 'foo'); + done(); + }); + client_stream.write(message); + client_stream.end(); + }); + it('with bidi streaming call', function (done) { + var message = { value: 'foo' }; + var bidi_stream = client.echoBidiStream(metadata, options); + bidi_stream.on('metadata', function (metadata) { + assert.equal(metadata.get('fromMessage'), 'foo'); + done(); + }); + bidi_stream.write(message); + bidi_stream.end(); + }); + }); + + describe('order of operations enforced for async interceptors', function() { + it('with unary call', function(done) { + var expected_calls = [ + 'close_b', + 'message_b', + 'start_b', + 'done' + ]; + var registry = new CallRegistry(done, expected_calls, true); + var message = {value: 'foo'}; + var interceptor_a = function(options, nextCall) { + return new InterceptingCall(nextCall(options), { + start: function(metadata, listener, next) { + setTimeout(function() { next(metadata, listener); }, 50); + }, + sendMessage: function(message, next) { + setTimeout(function () { next(message); }, 10); + } + }); + }; + var interceptor_b = function(options, nextCall) { + return new InterceptingCall(nextCall(options), { + start: function(metadata, listener, next) { + registry.addCall('start_b'); + next(metadata, listener); + }, + sendMessage: function(message, next) { + registry.addCall('message_b'); + next(message); + }, + halfClose: function(next) { + registry.addCall('close_b'); + next(); + } + }); + }; + var options = { + interceptors: [interceptor_a, interceptor_b] + }; + client.echo(message, options, function(err, response) { + assert.strictEqual(err, null); + registry.addCall('done'); + }); + }); + it('with serverStreaming call', function(done) { + var expected_calls = [ + 'close_b', + 'message_b', + 'start_b', + 'done' + ]; + var registry = new CallRegistry(done, expected_calls, true); + var message = {value: 'foo'}; + var interceptor_a = function(options, nextCall) { + return new InterceptingCall(nextCall(options), { + start: function(metadata, listener, next) { + setTimeout(function() { next(metadata, listener); }, 50); + }, + sendMessage: function(message, next) { + setTimeout(function () { next(message); }, 10); + } + }); + }; + var interceptor_b = function(options, nextCall) { + return new InterceptingCall(nextCall(options), { + start: function(metadata, listener, next) { + registry.addCall('start_b'); + next(metadata, listener); + }, + sendMessage: function(message, next) { + registry.addCall('message_b'); + next(message); + }, + halfClose: function(next) { + registry.addCall('close_b'); + next(); + } + }); + }; + var options = { + interceptors: [interceptor_a, interceptor_b] + }; + var stream = client.echoServerStream(message, options); + stream.on('data', function(response) { + assert.strictEqual(response.value, 'foo'); + registry.addCall('done'); + }); + }); + }); +}); diff --git a/packages/grpc-native-core/test/echo_service.proto b/packages/grpc-native-core/test/echo_service.proto new file mode 100644 index 000000000..414b421ec --- /dev/null +++ b/packages/grpc-native-core/test/echo_service.proto @@ -0,0 +1,34 @@ +/** + * @license + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +syntax = "proto3"; + +message EchoMessage { + string value = 1; + int32 value2 = 2; +} + +service EchoService { + rpc Echo (EchoMessage) returns (EchoMessage); + + rpc EchoClientStream (stream EchoMessage) returns (EchoMessage); + + rpc EchoServerStream (EchoMessage) returns (stream EchoMessage); + + rpc EchoBidiStream (stream EchoMessage) returns (stream EchoMessage); +}