Skip to content

Commit

Permalink
add corpc_memcached lib
Browse files Browse the repository at this point in the history
  • Loading branch information
liu.xianke committed May 15, 2018
1 parent a2832a7 commit d284608
Show file tree
Hide file tree
Showing 19 changed files with 1,135 additions and 57 deletions.
7 changes: 5 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
DerivedData
libcorpc_mac.xcworkspace/xcuserdata
test
libcorpc_test.xcworkspace
libcorpc.xcworkspace/xcuserdata
co/co.xcodeproj/xcuserdata
corpc/corpc.xcodeproj/xcuserdata
corpc_mysql/corpc_mysql.xcodeproj/xcuserdata
corpc_memcached/corpc_memcached.xcodeproj/xcuserdata
example/example_interRpc/example_interRpc.xcodeproj/xcuserdata
example/example_innerRpc/example_innerRpc.xcodeproj/xcuserdata
example/example_echoTcp/example_echoTcp.xcodeproj/xcuserdata
example/example_echoUdp/example_echoUdp.xcodeproj/xcuserdata
example/example_mysql/example_mysql.xcodeproj/xcuserdata
example/test*
example/example_memcached/example_memcached.xcodeproj/xcuserdata
tutorial/tutorial1/tutorial1.xcodeproj/xcuserdata
tutorial/tutorial2/tutorial2.xcodeproj/xcuserdata
tutorial/tutorial3/tutorial3.xcodeproj/xcuserdata
Expand Down
2 changes: 1 addition & 1 deletion corpc/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ DIR_OBJ = ./${v}/obj/
DIR_STATICLIB = ./lib/
DIR_DYNAMICLIB = ./solib/

OBJS = corpc_option.pb.o corpc_utils.o corpc_routine_env.o corpc_io.o corpc_rpc_server.o corpc_rpc_client.o corpc_inner_rpc.o corpc_message_server.o
OBJS = corpc_option.pb.o corpc_thirdparty.pb.o corpc_utils.o corpc_routine_env.o corpc_io.o corpc_rpc_server.o corpc_rpc_client.o corpc_inner_rpc.o corpc_message_server.o
OBJS_WITH_PATH = ${foreach n,${OBJS},${DIR_OBJ}${n}}

empty =
Expand Down
8 changes: 8 additions & 0 deletions corpc/corpc.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
2DEA99B9208DAE3A0095C62F /* corpc_message_server.h in Headers */ = {isa = PBXBuildFile; fileRef = 2DEA99B7208DAE3A0095C62F /* corpc_message_server.h */; };
2DEA9A8C20A2E7C90095C62F /* corpc_option.pb.h in Headers */ = {isa = PBXBuildFile; fileRef = 2DEA9A8A20A2E7C90095C62F /* corpc_option.pb.h */; };
2DEA9A8D20A2E7C90095C62F /* corpc_option.pb.cc in Sources */ = {isa = PBXBuildFile; fileRef = 2DEA9A8B20A2E7C90095C62F /* corpc_option.pb.cc */; };
2DEA9A9420A98F540095C62F /* corpc_thirdparty.pb.cc in Sources */ = {isa = PBXBuildFile; fileRef = 2DEA9A9220A98F530095C62F /* corpc_thirdparty.pb.cc */; };
2DEA9A9520A98F540095C62F /* corpc_thirdparty.pb.h in Headers */ = {isa = PBXBuildFile; fileRef = 2DEA9A9320A98F530095C62F /* corpc_thirdparty.pb.h */; };
/* End PBXBuildFile section */

/* Begin PBXFileReference section */
Expand All @@ -49,6 +51,8 @@
2DEA99B7208DAE3A0095C62F /* corpc_message_server.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = corpc_message_server.h; sourceTree = "<group>"; };
2DEA9A8A20A2E7C90095C62F /* corpc_option.pb.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; name = corpc_option.pb.h; path = ../proto/corpc_option.pb.h; sourceTree = "<group>"; };
2DEA9A8B20A2E7C90095C62F /* corpc_option.pb.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = corpc_option.pb.cc; path = ../proto/corpc_option.pb.cc; sourceTree = "<group>"; };
2DEA9A9220A98F530095C62F /* corpc_thirdparty.pb.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = corpc_thirdparty.pb.cc; path = ../proto/corpc_thirdparty.pb.cc; sourceTree = "<group>"; };
2DEA9A9320A98F530095C62F /* corpc_thirdparty.pb.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; name = corpc_thirdparty.pb.h; path = ../proto/corpc_thirdparty.pb.h; sourceTree = "<group>"; };
/* End PBXFileReference section */

/* Begin PBXFrameworksBuildPhase section */
Expand Down Expand Up @@ -83,6 +87,8 @@
2DB2B2CE2049298900114392 /* src */ = {
isa = PBXGroup;
children = (
2DEA9A9220A98F530095C62F /* corpc_thirdparty.pb.cc */,
2DEA9A9320A98F530095C62F /* corpc_thirdparty.pb.h */,
2DEA9A8B20A2E7C90095C62F /* corpc_option.pb.cc */,
2DEA9A8A20A2E7C90095C62F /* corpc_option.pb.h */,
2DB2B2F3204931A300114392 /* corpc_rpc_client.cpp */,
Expand Down Expand Up @@ -130,6 +136,7 @@
2DB2B30D204931A500114392 /* corpc_inner_rpc.h in Headers */,
2D161BEC2058CAF10062BA35 /* corpc_utils.h in Headers */,
2DEA997C208480940095C62F /* corpc_rpc_client.h in Headers */,
2DEA9A9520A98F540095C62F /* corpc_thirdparty.pb.h in Headers */,
);
runOnlyForDeploymentPostprocessing = 0;
};
Expand Down Expand Up @@ -197,6 +204,7 @@
2DB2B30A204931A500114392 /* corpc_rpc_server.cpp in Sources */,
2DEA9A8D20A2E7C90095C62F /* corpc_option.pb.cc in Sources */,
2DB2B30E204931A500114392 /* corpc_routine_env.cpp in Sources */,
2DEA9A9420A98F540095C62F /* corpc_thirdparty.pb.cc in Sources */,
2DB2B306204931A500114392 /* corpc_io.cpp in Sources */,
);
runOnlyForDeploymentPostprocessing = 0;
Expand Down
197 changes: 197 additions & 0 deletions corpc_memcached/corpc_memcached.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Created by Xianke Liu on 2018/5/14.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "corpc_memcached.h"

using namespace corpc;

MemcachedConnectPool::Proxy::Proxy(MemcachedConnectPool *pool) {
InnerRpcClient *client = InnerRpcClient::instance();

InnerRpcClient::Channel *channel = new InnerRpcClient::Channel(client, pool->_server);

_stub = new thirdparty::ThirdPartyService::Stub(channel, thirdparty::ThirdPartyService::STUB_OWNS_CHANNEL);
}

MemcachedConnectPool::Proxy::~Proxy() {
delete _stub;
}

void MemcachedConnectPool::Proxy::callDoneHandle(::google::protobuf::Message *request, corpc::Controller *controller) {
delete controller;
delete request;
}

memcached_st* MemcachedConnectPool::Proxy::take() {
Void *request = new Void();
thirdparty::TakeResponse *response = new thirdparty::TakeResponse();
Controller *controller = new Controller();

_stub->take(controller, request, response, NULL);

if (controller->Failed()) {
fprintf(stderr, "Rpc Call Failed : %s\n", controller->ErrorText().c_str());
return NULL;
}

memcached_st* memc = (memcached_st*)response->handle();

delete controller;
delete response;
delete request;

return memc;
}

void MemcachedConnectPool::Proxy::put(memcached_st* memc, bool error) {
thirdparty::PutRequest *request = new thirdparty::PutRequest();
Controller *controller = new Controller();

request->set_handle((intptr_t)memc);
if (error) {
request->set_error(error);
}

_stub->put(controller, request, NULL, google::protobuf::NewCallback<::google::protobuf::Message *>(&callDoneHandle, request, controller));
}

MemcachedConnectPool::MemcachedConnectPool(memcached_server_st *memcServers, uint32_t maxConnectNum, uint32_t maxIdleNum): _memcServers(memcServers), _maxConnectNum(maxConnectNum), _maxIdleNum(maxIdleNum), _realConnectCount(0) {

}

void MemcachedConnectPool::take(::google::protobuf::RpcController* controller,
const Void* request,
thirdparty::TakeResponse* response,
::google::protobuf::Closure* done) {
if (_idleList.size() > 0) {
intptr_t handle = (intptr_t)_idleList.front();
_idleList.pop_front();

response->set_handle(handle);
} else if (_realConnectCount < _maxConnectNum) {
// 建立新连接
memcached_st *memc = memcached_create(NULL);
memcached_return rc = memcached_server_push(memc, _memcServers);

if (rc == MEMCACHED_SUCCESS) {
response->set_handle((intptr_t)memc);
_realConnectCount++;
} else {
fprintf(stderr, "Couldn't add server: %s\n", memcached_strerror(memc, rc));
memcached_free(memc);
// MYSQL对象创建失败
controller->SetFailed("Couldn't add memcached server");
}
} else {
// 等待空闲连接
_waitingList.push_back(co_self());
co_yield_ct();

if (_idleList.size() == 0) {
controller->SetFailed("can't connect to memcached server");
} else {
intptr_t handle = (intptr_t)_idleList.front();
_idleList.pop_front();

response->set_handle(handle);
}
}
}

void MemcachedConnectPool::put(::google::protobuf::RpcController* controller,
const thirdparty::PutRequest* request,
Void* response,
::google::protobuf::Closure* done) {
memcached_st *memc = (memcached_st *)request->handle();

if (_idleList.size() < _maxIdleNum) {
if (request->error()) {
_realConnectCount--;
memcached_free(memc);

// 若有等待协程,尝试重连
if (_waitingList.size() > 0) {
assert(_idleList.size() == 0);

if (_realConnectCount < _maxConnectNum) {
memcached_st *memc = memcached_create(NULL);
memcached_return rc = memcached_server_push(memc, _memcServers);

if (rc == MEMCACHED_SUCCESS) {
_realConnectCount++;
_idleList.push_back(memc);

stCoRoutine_t *co = _waitingList.front();
_waitingList.pop_front();

co_resume(co);
} else {
memcached_free(memc);

// 唤醒当前所有等待协程
while (!_waitingList.empty()) {
stCoRoutine_t *co = _waitingList.front();
_waitingList.pop_front();

co_resume(co);
}
}
}
}
} else {
_idleList.push_back(memc);

if (_waitingList.size() > 0) {
assert(_idleList.size() == 1);
stCoRoutine_t *co = _waitingList.front();
_waitingList.pop_front();

co_resume(co);
}
}
} else {
assert(_waitingList.size() == 0);
_realConnectCount--;
memcached_free(memc);
}
}

MemcachedConnectPool* MemcachedConnectPool::create(memcached_server_st *memcServers, uint32_t maxConnectNum, uint32_t maxIdleNum) {
MemcachedConnectPool *pool = new MemcachedConnectPool(memcServers, maxConnectNum, maxIdleNum);
pool->init();

return pool;
}

void MemcachedConnectPool::init() {
_server = InnerRpcServer::create();
_server->registerService(this);
}

MemcachedConnectPool::Proxy* MemcachedConnectPool::getProxy() {
pid_t pid = GetPid();

auto iter = _threadProxyMap.find(pid);
if (iter != _threadProxyMap.end()) {
return iter->second;
}

// 为当前线程创建proxy
Proxy *proxy = new Proxy(this);
_threadProxyMap.insert(std::make_pair(pid, proxy));

return proxy;
}
84 changes: 84 additions & 0 deletions corpc_memcached/corpc_memcached.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Created by Xianke Liu on 2018/5/14.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef corpc_memcached_h
#define corpc_memcached_h

#include "corpc_routine_env.h"
#include "corpc_controller.h"
#include "corpc_inner_rpc.h"

#include "corpc_thirdparty.pb.h"

#include <libmemcached/memcached.h>

using namespace corpc;

class MemcachedConnectPool : public thirdparty::ThirdPartyService {
public:
class Proxy {
public:
memcached_st* take();
void put(memcached_st* memc, bool error);

private:
Proxy(MemcachedConnectPool *pool);
~Proxy();

static void callDoneHandle(::google::protobuf::Message *request, corpc::Controller *controller);

private:
thirdparty::ThirdPartyService::Stub *_stub;

public:
friend class MemcachedConnectPool;
};

public:
virtual void take(::google::protobuf::RpcController* controller,
const Void* request,
thirdparty::TakeResponse* response,
::google::protobuf::Closure* done);
virtual void put(::google::protobuf::RpcController* controller,
const thirdparty::PutRequest* request,
Void* response,
::google::protobuf::Closure* done);

public:
static MemcachedConnectPool* create(memcached_server_st *memcServers, uint32_t maxConnectNum, uint32_t maxIdleNum);

Proxy* getProxy();

private:
MemcachedConnectPool(memcached_server_st *memcServers, uint32_t maxConnectNum, uint32_t maxIdleNum);
~MemcachedConnectPool() {}

void init();
private:
memcached_server_st *_memcServers; // memcached服务器列表

uint32_t _maxConnectNum; // 与mysql数据库最多建立的连接数
uint32_t _maxIdleNum; // 最大空闲连接数量
uint32_t _realConnectCount; // 当前实际建立连接的数量

std::list<memcached_st*> _idleList; // 空闲连接表
std::list<stCoRoutine_t*> _waitingList; // 等待队列:当连接数量达到最大时,新的请求需要等待

InnerRpcServer *_server;
std::map<pid_t, Proxy*> _threadProxyMap; // 线程相关代理
};

#endif /* corpc_memcached_h */
Loading

0 comments on commit d284608

Please sign in to comment.