Skip to content

Commit

Permalink
Update the message allocator API
Browse files Browse the repository at this point in the history
  • Loading branch information
yang-g committed May 16, 2019
1 parent 7de22db commit 51210ba
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 139 deletions.
47 changes: 29 additions & 18 deletions include/grpcpp/impl/codegen/message_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,42 @@
namespace grpc {
namespace experimental {

// This is per rpc struct for the allocator. We can potentially put the grpc
// call arena in here in the future.
// NOTE: This is an API for advanced users who need custom allocators.
// Per rpc struct for the allocator. This is the interface to return to user.
class RpcAllocatorState {
public:
virtual ~RpcAllocatorState() = default;
// Optionally deallocate request early to reduce the size of working set.
// A custom MessageAllocator needs to be registered to make use of this.
// This is not abstract because implementing it is optional.
virtual void FreeRequest() {}
};

// This is the interface returned by the allocator.
// grpc library will call the methods to get request/response pointers and to
// release the object when it is done.
template <typename RequestT, typename ResponseT>
struct RpcAllocatorInfo {
RequestT* request;
ResponseT* response;
// per rpc allocator internal state. MessageAllocator can set it when
// AllocateMessages is called and use it later.
void* allocator_state;
class MessageHolder : public RpcAllocatorState {
public:
virtual void Release() { delete this; }
RequestT* request() { return request_; }
ResponseT* response() { return response_; }

protected:
// NOTE: subclasses should set these pointers.
RequestT* request_;
ResponseT* response_;
};

// Implementations need to be thread-safe
// A custom allocator can be set via the generated code to a callback unary
// method, such as SetMessageAllocatorFor_Echo(custom_allocator). The allocator
// needs to be alive for the lifetime of the server.
// Implementations need to be thread-safe.
template <typename RequestT, typename ResponseT>
class MessageAllocator {
public:
virtual ~MessageAllocator() = default;
// Allocate both request and response
virtual void AllocateMessages(
RpcAllocatorInfo<RequestT, ResponseT>* info) = 0;
// Optional: deallocate request early, called by
// ServerCallbackRpcController::ReleaseRequest
virtual void DeallocateRequest(RpcAllocatorInfo<RequestT, ResponseT>* info) {}
// Deallocate response and request (if applicable)
virtual void DeallocateMessages(
RpcAllocatorInfo<RequestT, ResponseT>* info) = 0;
virtual MessageHolder<RequestT, ResponseT>* AllocateMessages() = 0;
};

} // namespace experimental
Expand Down
101 changes: 42 additions & 59 deletions include/grpcpp/impl/codegen/server_callback.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,24 @@ class ServerReactor {
std::atomic_int on_cancel_conditions_remaining_{2};
};

template <class Request, class Response>
class DefaultMessageHolder
: public experimental::MessageHolder<Request, Response> {
public:
DefaultMessageHolder() {
this->request_ = &request_obj_;
this->response_ = &response_obj_;
}
void Release() override {
// the object is allocated in the call arena.
this->~DefaultMessageHolder<Request, Response>();
}

private:
Request request_obj_;
Response response_obj_;
};

} // namespace internal

namespace experimental {
Expand Down Expand Up @@ -137,13 +155,9 @@ class ServerCallbackRpcController {
virtual void SetCancelCallback(std::function<void()> callback) = 0;
virtual void ClearCancelCallback() = 0;

// NOTE: This is an API for advanced users who need custom allocators.
// Optionally deallocate request early to reduce the size of working set.
// A custom MessageAllocator needs to be registered to make use of this.
virtual void FreeRequest() = 0;
// NOTE: This is an API for advanced users who need custom allocators.
// Get and maybe mutate the allocator state associated with the current RPC.
virtual void* GetAllocatorState() = 0;
virtual RpcAllocatorState* GetRpcAllocatorState() = 0;
};

// NOTE: The actual streaming object classes are provided
Expand Down Expand Up @@ -465,13 +479,13 @@ class CallbackUnaryHandler : public MethodHandler {
void RunHandler(const HandlerParameter& param) final {
// Arena allocate a controller structure (that includes request/response)
g_core_codegen_interface->grpc_call_ref(param.call->call());
auto* allocator_info =
static_cast<experimental::RpcAllocatorInfo<RequestType, ResponseType>*>(
auto* allocator_state =
static_cast<experimental::MessageHolder<RequestType, ResponseType>*>(
param.internal_data);
auto* controller = new (g_core_codegen_interface->grpc_call_arena_alloc(
param.call->call(), sizeof(ServerCallbackRpcControllerImpl)))
ServerCallbackRpcControllerImpl(param.server_context, param.call,
allocator_info, allocator_,
allocator_state,
std::move(param.call_requester));
Status status = param.status;
if (status.ok()) {
Expand All @@ -489,36 +503,24 @@ class CallbackUnaryHandler : public MethodHandler {
ByteBuffer buf;
buf.set_buffer(req);
RequestType* request = nullptr;
experimental::RpcAllocatorInfo<RequestType, ResponseType>* allocator_info =
new (g_core_codegen_interface->grpc_call_arena_alloc(
call, sizeof(*allocator_info)))
experimental::RpcAllocatorInfo<RequestType, ResponseType>();
experimental::MessageHolder<RequestType, ResponseType>* allocator_state =
nullptr;
if (allocator_ != nullptr) {
allocator_->AllocateMessages(allocator_info);
allocator_state = allocator_->AllocateMessages();
} else {
allocator_info->request =
new (g_core_codegen_interface->grpc_call_arena_alloc(
call, sizeof(RequestType))) RequestType();
allocator_info->response =
new (g_core_codegen_interface->grpc_call_arena_alloc(
call, sizeof(ResponseType))) ResponseType();
allocator_state = new (g_core_codegen_interface->grpc_call_arena_alloc(
call, sizeof(DefaultMessageHolder<RequestType, ResponseType>)))
DefaultMessageHolder<RequestType, ResponseType>();
}
*handler_data = allocator_info;
request = allocator_info->request;
*handler_data = allocator_state;
request = allocator_state->request();
*status = SerializationTraits<RequestType>::Deserialize(&buf, request);
buf.Release();
if (status->ok()) {
return request;
}
// Clean up on deserialization failure.
if (allocator_ != nullptr) {
allocator_->DeallocateMessages(allocator_info);
} else {
allocator_info->request->~RequestType();
allocator_info->response->~ResponseType();
allocator_info->request = nullptr;
allocator_info->response = nullptr;
}
allocator_state->Release();
return nullptr;
}

Expand Down Expand Up @@ -548,9 +550,8 @@ class CallbackUnaryHandler : public MethodHandler {
}
// The response is dropped if the status is not OK.
if (s.ok()) {
finish_ops_.ServerSendStatus(
&ctx_->trailing_metadata_,
finish_ops_.SendMessagePtr(allocator_info_->response));
finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
finish_ops_.SendMessagePtr(response()));
} else {
finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
}
Expand Down Expand Up @@ -588,50 +589,32 @@ class CallbackUnaryHandler : public MethodHandler {

void ClearCancelCallback() override { ctx_->ClearCancelCallback(); }

void FreeRequest() override {
if (allocator_ != nullptr) {
allocator_->DeallocateRequest(allocator_info_);
}
}

void* GetAllocatorState() override {
return allocator_info_->allocator_state;
experimental::RpcAllocatorState* GetRpcAllocatorState() override {
return allocator_state_;
}

private:
friend class CallbackUnaryHandler<RequestType, ResponseType>;

ServerCallbackRpcControllerImpl(
ServerContext* ctx, Call* call,
experimental::RpcAllocatorInfo<RequestType, ResponseType>*
allocator_info,
experimental::MessageAllocator<RequestType, ResponseType>* allocator,
experimental::MessageHolder<RequestType, ResponseType>* allocator_state,
std::function<void()> call_requester)
: ctx_(ctx),
call_(*call),
allocator_info_(allocator_info),
allocator_(allocator),
allocator_state_(allocator_state),
call_requester_(std::move(call_requester)) {
ctx_->BeginCompletionOp(call, [this](bool) { MaybeDone(); }, nullptr);
}

const RequestType* request() { return allocator_info_->request; }
ResponseType* response() { return allocator_info_->response; }
const RequestType* request() { return allocator_state_->request(); }
ResponseType* response() { return allocator_state_->response(); }

void MaybeDone() {
if (--callbacks_outstanding_ == 0) {
grpc_call* call = call_.call();
auto call_requester = std::move(call_requester_);
if (allocator_ != nullptr) {
allocator_->DeallocateMessages(allocator_info_);
} else {
if (allocator_info_->request != nullptr) {
allocator_info_->request->~RequestType();
}
if (allocator_info_->response != nullptr) {
allocator_info_->response->~ResponseType();
}
}
allocator_state_->Release();
this->~ServerCallbackRpcControllerImpl(); // explicitly call destructor
g_core_codegen_interface->grpc_call_unref(call);
call_requester();
Expand All @@ -647,8 +630,8 @@ class CallbackUnaryHandler : public MethodHandler {

ServerContext* ctx_;
Call call_;
experimental::RpcAllocatorInfo<RequestType, ResponseType>* allocator_info_;
experimental::MessageAllocator<RequestType, ResponseType>* allocator_;
experimental::MessageHolder<RequestType, ResponseType>* const
allocator_state_;
std::function<void()> call_requester_;
std::atomic_int callbacks_outstanding_{
2}; // reserve for Finish and CompletionOp
Expand Down
Loading

0 comments on commit 51210ba

Please sign in to comment.