Skip to content

Commit

Permalink
Merge baidu/sofa-pbrpc to github
Browse files Browse the repository at this point in the history
  • Loading branch information
tianye15 committed Mar 12, 2018
1 parent c2cf352 commit 1306c93
Show file tree
Hide file tree
Showing 17 changed files with 145 additions and 41 deletions.
11 changes: 1 addition & 10 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,7 @@ env:
before_install:
- echo -n | openssl s_client -connect scan.coverity.com:443 | sed -ne '/-BEGIN CERTIFICATE-/,/-END CERTIFICATE-/p' | sudo tee -a /etc/ssl/certs/ca-
install:
- echo | sudo apt-add-repository ppa:boost-latest/ppa
- sudo apt-get update
- sudo apt-get install libboost-dev
- wget https://github.com/google/protobuf/releases/download/v$PROTOBUF_VERSION/protobuf-$PROTOBUF_VERSION.tar.gz
- tar xf protobuf-$PROTOBUF_VERSION.tar.gz
- ( cd protobuf-$PROTOBUF_VERSION && ./configure && make -j4 && sudo make install && sudo ldconfig )
- wget https://github.com/google/snappy/releases/download/1.1.4/snappy-1.1.4.tar.gz
- tar xf snappy-1.1.4.tar.gz
- ( cd snappy-1.1.4 && sh -x ./autogen.sh && ./configure $DEPS_CONFIG && make -j2 && sudo make install && sudo ldconfig )
- sudo apt-get install zlib1g-dev
- sh -x ./build.sh
- git clone https://github.com/google/googletest.git
script:
- make -j4 && make install
Expand Down
3 changes: 3 additions & 0 deletions src/sofa/pbrpc/binary_rpc_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ ReadBufferPtr BinaryRpcRequest::AssembleSucceedResponse(
RpcMessageHeader header;
int header_size = static_cast<int>(sizeof(header));
WriteBuffer write_buffer;
write_buffer.set_base_block_factor(_write_buffer_base_block_factor);

int64 header_pos = write_buffer.Reserve(header_size);
if (header_pos < 0)
{
Expand Down Expand Up @@ -192,6 +194,7 @@ ReadBufferPtr BinaryRpcRequest::AssembleFailedResponse(
RpcMessageHeader header;
int header_size = static_cast<int>(sizeof(header));
WriteBuffer write_buffer;

int64 header_pos = write_buffer.Reserve(header_size);
if (header_pos < 0)
{
Expand Down
4 changes: 3 additions & 1 deletion src/sofa/pbrpc/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ WriteBuffer::WriteBuffer()
, _total_capacity(0)
, _last_bytes(0)
, _write_bytes(0)
, _base_block_factor(4)
{}

WriteBuffer::~WriteBuffer()
Expand Down Expand Up @@ -298,9 +299,10 @@ int64 WriteBuffer::ByteCount() const
bool WriteBuffer::Extend()
{
// incrementally extend block
unsigned int current_factor = _buf_list.size() + _base_block_factor;
char* block = static_cast<char*>(TranBufPool::malloc(std::min(
SOFA_PBRPC_TRAN_BUF_BLOCK_MAX_FACTOR,
(int)_buf_list.size())));
current_factor)));
if (block == NULL) return false;
_buf_list.push_back(BufHandle(block, TranBufPool::capacity(block)));
_total_block_size += TranBufPool::block_size(block);
Expand Down
11 changes: 11 additions & 0 deletions src/sofa/pbrpc/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ class WriteBuffer : public google::protobuf::io::ZeroCopyOutputStream
bool Append(const std::string& data);
bool Append(const char* data, int size);

void set_base_block_factor(size_t factor)
{
_base_block_factor = factor;
}

size_t base_block_factor()
{
return _base_block_factor;
}

private:
// Add a new block to the end of the buffer.
bool Extend();
Expand All @@ -131,6 +141,7 @@ class WriteBuffer : public google::protobuf::io::ZeroCopyOutputStream
int64 _total_capacity; // total capacity in the buffer
int _last_bytes; // last write bytes
int64 _write_bytes; // total write bytes
size_t _base_block_factor; //base block malloc factor

SOFA_PBRPC_DISALLOW_EVIL_CONSTRUCTORS(WriteBuffer);
}; // class WriteBuffer
Expand Down
24 changes: 16 additions & 8 deletions src/sofa/pbrpc/profiling.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@

bool PROFILING_LINKER_FALSE = false;

#if defined(SOFA_PBRPC_PROFILING)
void __attribute__((weak)) TCMallocGetHeapSample(std::string* writer);
#endif // SOFA_PBRPC_PROFILING

namespace sofa {
namespace pbrpc {
Expand Down Expand Up @@ -373,10 +375,7 @@ std::string Profiling::ProfilingPage(ProfilingType profiling_type,
Profiling::Status Profiling::DoCpuProfiling(OperationType operation_type,
std::string& profiling_file)
{
if (ProfilerStart == NULL)
{
return DISABLE;
}
#if defined(SOFA_PBRPC_PROFILING)

if (_is_cpu_profiling == true)
{
Expand All @@ -401,15 +400,15 @@ Profiling::Status Profiling::DoCpuProfiling(OperationType operation_type,
&Profiling::CpuProfilingFunc);
_profiling_thread_group->post(done);
return OK;
#else
return DISABLE;
#endif // SOFA_PBRPC_PROFILING
}

Profiling::Status Profiling::DoMemoryProfiling(OperationType operation_type,
std::string& profiling_file)
{
if (TCMallocGetHeapSample == NULL)
{
return DISABLE;
}
#if defined(SOFA_PBRPC_PROFILING)

if (_is_mem_profiling == true)
{
Expand All @@ -434,6 +433,9 @@ Profiling::Status Profiling::DoMemoryProfiling(OperationType operation_type,
&Profiling::MemoryProfilingFunc);
_profiling_thread_group->post(done);
return OK;
#else
return DISABLE;
#endif // SOFA_PBRPC_PROFILING
}

Profiling::Profiling()
Expand Down Expand Up @@ -497,6 +499,7 @@ void Profiling::CpuProfilingFunc()

void Profiling::MemoryProfilingFunc()
{
#if defined(SOFA_PBRPC_PROFILING)
if (!IsFileExist(_dir.path + MEMORY_PROFILING_PATH))
{
if (!Mkdir(_dir.path + MEMORY_PROFILING_PATH))
Expand All @@ -522,12 +525,14 @@ void Profiling::MemoryProfilingFunc()
return;
}
_is_mem_profiling = false;
#endif // SOFA_PBRPC_PROFILING
}

std::string Profiling::ShowResult(ProfilingType profiling_type,
const std::string& view_prof,
const std::string& base_prof)
{
#if defined(SOFA_PBRPC_PROFILING)
std::ostringstream oss;
std::string path = _dir.path + "/rpc_profiling/pprof.perl";
if (!IsFileExist(path))
Expand Down Expand Up @@ -634,6 +639,9 @@ std::string Profiling::ShowResult(ProfilingType profiling_type,
oss << "</script>";

return oss.str();
#else
return "";
#endif // SOFA_PBRPC_PROFILING
}

} // namespace pbrpc
Expand Down
45 changes: 40 additions & 5 deletions src/sofa/pbrpc/rpc_byte_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <sofa/pbrpc/common_internal.h>
#include <sofa/pbrpc/rpc_endpoint.h>
#include <sofa/pbrpc/tran_buf_pool.h>

// If SOFA_PBRPC_TCP_NO_DELAY == true, means disable the Nagle algorithm.
//
Expand All @@ -25,9 +26,6 @@
// Disabling the Nagle algorithm would cause these affacts:
// * decrease delay time (positive affact)
// * decrease the qps (negative affact)
#ifndef SOFA_PBRPC_TCP_NO_DELAY
#define SOFA_PBRPC_TCP_NO_DELAY true
#endif

namespace sofa {
namespace pbrpc {
Expand All @@ -47,11 +45,44 @@ class RpcByteStream : public sofa::pbrpc::enable_shared_from_this<RpcByteStream>
, _socket(io_service)
, _connect_timeout(-1)
, _status(STATUS_INIT)
, _no_delay(true)
, _read_buffer_base_block_factor(SOFA_PBRPC_TRAN_BUF_BLOCK_MAX_FACTOR)
, _write_buffer_base_block_factor(4)
{
SOFA_PBRPC_INC_RESOURCE_COUNTER(RpcByteStream);
memset(_error_message, 0, sizeof(_error_message));
}

bool no_delay()
{
return _no_delay;
}

void set_no_delay(bool no_delay)
{
_no_delay = no_delay;
}

void set_read_buffer_base_block_factor(size_t factor)
{
_read_buffer_base_block_factor = factor;
}

size_t read_buffer_base_block_factor()
{
return _read_buffer_base_block_factor;
}

void set_write_buffer_base_block_factor(size_t factor)
{
_write_buffer_base_block_factor = factor;
}

size_t write_buffer_base_block_factor()
{
return _write_buffer_base_block_factor;
}

virtual ~RpcByteStream()
{
SOFA_PBRPC_FUNCTION_TRACE;
Expand Down Expand Up @@ -148,7 +179,7 @@ class RpcByteStream : public sofa::pbrpc::enable_shared_from_this<RpcByteStream>
_last_rw_ticks = _ticks;

boost::system::error_code ec;
_socket.set_option(tcp::no_delay(SOFA_PBRPC_TCP_NO_DELAY), ec);
_socket.set_option(tcp::no_delay(_no_delay), ec);
if (ec)
{
#if defined( LOG )
Expand Down Expand Up @@ -329,7 +360,7 @@ class RpcByteStream : public sofa::pbrpc::enable_shared_from_this<RpcByteStream>
}

boost::system::error_code ec;
_socket.set_option(tcp::no_delay(SOFA_PBRPC_TCP_NO_DELAY), ec);
_socket.set_option(tcp::no_delay(_no_delay), ec);
if (ec)
{
#if defined( LOG )
Expand Down Expand Up @@ -390,6 +421,10 @@ class RpcByteStream : public sofa::pbrpc::enable_shared_from_this<RpcByteStream>
char _error_message[128];
volatile int64 _ticks;
volatile int64 _last_rw_ticks;
bool _no_delay;

size_t _read_buffer_base_block_factor;
size_t _write_buffer_base_block_factor;

private:
deadline_timer _timer;
Expand Down
3 changes: 3 additions & 0 deletions src/sofa/pbrpc/rpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ struct RpcClientOptions {
// default is -1
int connect_timeout;

bool no_delay;

RpcClientOptions()
: work_thread_num(4)
, callback_thread_num(4)
Expand All @@ -49,6 +51,7 @@ struct RpcClientOptions {
, max_throughput_in(-1)
, max_throughput_out(-1)
, connect_timeout(-1)
, no_delay(true)
{}
};

Expand Down
3 changes: 2 additions & 1 deletion src/sofa/pbrpc/rpc_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ void RpcClientImpl::ResetOptions(const RpcClientOptions& options)
_slice_quota_out = _options.max_throughput_out == -1 ?
-1 : std::max(0L, _options.max_throughput_out * 1024L * 1024L) / _slice_count;
_max_pending_buffer_size =
std::max(0L, _options.max_pending_buffer_size * 1024L * 1024L);
std::max(0L, options.max_pending_buffer_size * 1024L * 1024L);
_keep_alive_ticks = _options.keep_alive_time == -1 ?
-1 : std::max(1, _options.keep_alive_time) * _ticks_per_second;

Expand Down Expand Up @@ -382,6 +382,7 @@ RpcClientStreamPtr RpcClientImpl::FindOrCreateStream(const RpcEndpoint& remote_e
stream->set_connect_timeout(_options.connect_timeout);
stream->set_closed_stream_callback(
boost::bind(&RpcClientImpl::OnClosed, shared_from_this(), _1));
stream->set_no_delay(_options.no_delay);

_stream_map[remote_endpoint] = stream;
create = true;
Expand Down
3 changes: 2 additions & 1 deletion src/sofa/pbrpc/rpc_message_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ class RpcMessageStream : public RpcByteStream
_tran_buf = NULL;
}
_tran_buf = reinterpret_cast<char*>(
TranBufPool::malloc(SOFA_PBRPC_TRAN_BUF_BLOCK_MAX_FACTOR));
TranBufPool::malloc(_read_buffer_base_block_factor));
if(_tran_buf == NULL)
{
#if defined( LOG )
Expand Down Expand Up @@ -829,6 +829,7 @@ class RpcMessageStream : public RpcByteStream
static const int TOKEN_LOCK = 1;
volatile int _send_token;
volatile int _receive_token;

}; // class RpcMessageStream

} // namespace pbrpc
Expand Down
13 changes: 12 additions & 1 deletion src/sofa/pbrpc/rpc_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class RpcRequest : public sofa::pbrpc::enable_shared_from_this<RpcRequest>
};

public:
RpcRequest() {}
RpcRequest():_write_buffer_base_block_factor(4) {}
virtual ~RpcRequest() {}

// The request type: BINARY or HTTP.
Expand Down Expand Up @@ -102,6 +102,16 @@ class RpcRequest : public sofa::pbrpc::enable_shared_from_this<RpcRequest>
const std::string& service_name,
const std::string& method_name);

void set_write_buffer_base_block_factor(size_t factor)
{
_write_buffer_base_block_factor = factor;
}

size_t write_buffer_base_block_factor()
{
return _write_buffer_base_block_factor;
}

void SetLocalEndpoint(const RpcEndpoint& local_endpoint)
{
_local_endpoint = local_endpoint;
Expand Down Expand Up @@ -136,6 +146,7 @@ class RpcRequest : public sofa::pbrpc::enable_shared_from_this<RpcRequest>
RpcEndpoint _local_endpoint;
RpcEndpoint _remote_endpoint;
PTime _received_time;
size_t _write_buffer_base_block_factor;
}; // class RpcRequest

} // namespace pbrpc
Expand Down
16 changes: 16 additions & 0 deletions src/sofa/pbrpc/rpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,19 @@ struct RpcServerOptions {

size_t io_service_pool_size;

//This controls the base memory block factor in malloc for writing. A base block is 64Bytes.
//default:4, it means we will use 64 << 4 = 1024B as a block size.
size_t write_buffer_base_block_factor;

//This controls the base memory block factor in malloc for reading. A base block is 64Bytes.
//default:9, it means we will use 64 << 9 = 32KB as a block size.
size_t read_buffer_base_block_factor;

//If disable Nagle's algorithm in tcp protocol.
//default:true
bool no_delay;


RpcServerOptions()
: work_thread_num(8)
, max_connection_count(-1)
Expand All @@ -73,6 +86,9 @@ struct RpcServerOptions {
, work_thread_init_func(NULL)
, work_thread_dest_func(NULL)
, io_service_pool_size(1)
, write_buffer_base_block_factor(4)
, read_buffer_base_block_factor(9)
, no_delay(true)
{}
};

Expand Down
12 changes: 12 additions & 0 deletions src/sofa/pbrpc/rpc_server_impl.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) 2014 Baidu.com, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//
#include <algorithm>

#include <sofa/pbrpc/rpc_server_impl.h>
#include <sofa/pbrpc/rpc_controller_impl.h>
Expand Down Expand Up @@ -394,6 +396,16 @@ void RpcServerImpl::OnAccepted(const RpcServerStreamPtr& stream)
stream->close("server not running");
return;
}

size_t read_base_block_factor = std::min(_options.read_buffer_base_block_factor,
(size_t)SOFA_PBRPC_TRAN_BUF_BLOCK_MAX_FACTOR);
stream->set_read_buffer_base_block_factor(read_base_block_factor);

size_t write_base_block_factor = std::min(_options.write_buffer_base_block_factor,
(size_t)SOFA_PBRPC_TRAN_BUF_BLOCK_MAX_FACTOR);
stream->set_write_buffer_base_block_factor(write_base_block_factor);

stream->set_no_delay(_options.no_delay);

if (_options.max_connection_count != -1 && ConnectionCount() >= _options.max_connection_count)
{
Expand Down
Loading

0 comments on commit 1306c93

Please sign in to comment.