Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement client interceptors for grpc-native-core #59

Merged
merged 4 commits into from
Feb 28, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Pass cancel and getPeer to underlying call
  • Loading branch information
drobertduke committed Feb 28, 2018
commit 6bfb5de33718b014a269a5e410b395dbb3b412ff
126 changes: 75 additions & 51 deletions packages/grpc-native-core/src/client_interceptors.js
Original file line number Diff line number Diff line change
Expand Up @@ -707,59 +707,65 @@ function _getUnaryInterceptor(method_definition, channel, emitter, callback) {
return function (options) {
var call = common.getCall(channel, method_definition.path, options);
var first_listener;
return new InterceptingCall(null, {
start: function (metadata, listener) {
var batch = {
[grpc.opType.SEND_INITIAL_METADATA]:
metadata._getCoreRepresentation(),
};
first_listener = listener;
call.startBatch(batch, function () {});
},
sendMessage: function(message) {
var batch = {
[grpc.opType.SEND_MESSAGE]: serialize(message),
};
call.startBatch(batch, function () {});
},
halfClose: function() {
var batch = {
[grpc.opType.SEND_CLOSE_FROM_CLIENT]: true,
[grpc.opType.RECV_INITIAL_METADATA]: true,
[grpc.opType.RECV_MESSAGE]: true,
[grpc.opType.RECV_STATUS_ON_CLIENT]: true
};
call.startBatch(batch, function (err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
var status = response.status;
var deserialized;
if (status.code === constants.status.OK) {
if (err) {
// Got a batch error, but OK status. Something went wrong
callback(err);
return;
} else {
try {
deserialized = deserialize(response.read);
} catch (e) {
/* Change status to indicate bad server response. This
* will result in passing an error to the callback */
status = {
code: constants.status.INTERNAL,
details: 'Failed to parse server response'
};
}
var final_requester = {};
final_requester.start = function (metadata, listener) {
var batch = {
[grpc.opType.SEND_INITIAL_METADATA]:
metadata._getCoreRepresentation(),
};
first_listener = listener;
call.startBatch(batch, function () { });
};
final_requester.sendMessage = function (message) {
var batch = {
[grpc.opType.SEND_MESSAGE]: serialize(message),
};
call.startBatch(batch, function () { });
};
final_requester.halfClose = function () {
var batch = {
[grpc.opType.SEND_CLOSE_FROM_CLIENT]: true,
[grpc.opType.RECV_INITIAL_METADATA]: true,
[grpc.opType.RECV_MESSAGE]: true,
[grpc.opType.RECV_STATUS_ON_CLIENT]: true
};
call.startBatch(batch, function (err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
var status = response.status;
var deserialized;
if (status.code === constants.status.OK) {
if (err) {
// Got a batch error, but OK status. Something went wrong
callback(err);
return;
} else {
try {
deserialized = deserialize(response.read);
} catch (e) {
/* Change status to indicate bad server response. This
* will result in passing an error to the callback */
status = {
code: constants.status.INTERNAL,
details: 'Failed to parse server response'
};
}
}
response.metadata =
Metadata._fromCoreRepresentation(response.metadata);
first_listener.onReceiveMetadata(response.metadata);
first_listener.onReceiveMessage(deserialized);
first_listener.onReceiveStatus(status);
});
}
});
}
response.metadata =
Metadata._fromCoreRepresentation(response.metadata);
first_listener.onReceiveMetadata(response.metadata);
first_listener.onReceiveMessage(deserialized);
first_listener.onReceiveStatus(status);
});
};
final_requester.cancel = function () {
call.cancel();
};
final_requester.getPeer = function () {
return call.getPeer();
};
return new InterceptingCall(null, final_requester);
};
}

Expand Down Expand Up @@ -865,6 +871,12 @@ function _getClientStreamingInterceptor(method_definition, channel, emitter,
};
call.startBatch(batch, function () { });
};
final_requester.cancel = function () {
call.cancel();
};
final_requester.getPeer = function() {
return call.getPeer();
};
return new InterceptingCall(null, final_requester);
};
}
Expand Down Expand Up @@ -944,6 +956,12 @@ function _getServerStreamingInterceptor(method_definition, channel, emitter) {
call.startBatch(recv_batch, _getStreamReadCallback(emitter, call,
first_listener, deserialize));
};
final_requester.cancel = function() {
call.cancel();
};
final_requester.getPeer = function() {
return call.getPeer();
};
return new InterceptingCall(null, final_requester);
};
}
Expand Down Expand Up @@ -1041,6 +1059,12 @@ function _getBidiStreamingInterceptor(method_definition, channel, emitter) {
call.startBatch(recv_batch, _getStreamReadCallback(emitter, call,
first_listener, deserialize));
};
final_requester.cancel = function() {
call.cancel();
};
final_requester.getPeer = function() {
return call.getPeer();
};
return new InterceptingCall(null, final_requester);
};
}
Expand Down
8 changes: 8 additions & 0 deletions packages/grpc-native-core/src/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,12 @@ exports.getCall = function(channel, path, options) {
* @param {function} next Calls the next interceptor.
*/

/**
* @function GetPeerRequester
* @param {function} next Calls the next interceptor.
* @return {string}
*/

/**
* @typedef {object} grpc~Requester
* @param {MetadataRequester=} start A function triggered when the call begins.
Expand All @@ -298,6 +304,8 @@ exports.getCall = function(channel, path, options) {
* closes the call.
* @param {CancelRequester=} cancel A function triggered when the call is
* cancelled.
* @param {GetPeerRequester=} getPeer A function triggered when the endpoint is
* requested.
*/

/**
Expand Down