Skip to content

Commit

Permalink
mongodb lib
Browse files Browse the repository at this point in the history
  • Loading branch information
liu.xianke committed May 16, 2018
1 parent d284608 commit d67d941
Show file tree
Hide file tree
Showing 36 changed files with 869 additions and 4,337 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@ co/co.xcodeproj/xcuserdata
corpc/corpc.xcodeproj/xcuserdata
corpc_mysql/corpc_mysql.xcodeproj/xcuserdata
corpc_memcached/corpc_memcached.xcodeproj/xcuserdata
corpc_mongodb/corpc_mongodb.xcodeproj/xcuserdata
corpc_redis/corpc_redis.xcodeproj/xcuserdata
example/example_interRpc/example_interRpc.xcodeproj/xcuserdata
example/example_innerRpc/example_innerRpc.xcodeproj/xcuserdata
example/example_echoTcp/example_echoTcp.xcodeproj/xcuserdata
example/example_echoUdp/example_echoUdp.xcodeproj/xcuserdata
example/example_mysql/example_mysql.xcodeproj/xcuserdata
example/example_memcached/example_memcached.xcodeproj/xcuserdata
example/example_mongodb/example_mongodb.xcodeproj/xcuserdata
example/example_redis/example_redis.xcodeproj/xcuserdata
tutorial/tutorial1/tutorial1.xcodeproj/xcuserdata
tutorial/tutorial2/tutorial2.xcodeproj/xcuserdata
tutorial/tutorial3/tutorial3.xcodeproj/xcuserdata
Expand Down
32 changes: 19 additions & 13 deletions corpc_memcached/corpc_memcached.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ void MemcachedConnectPool::take(::google::protobuf::RpcController* controller,
} else {
fprintf(stderr, "Couldn't add server: %s\n", memcached_strerror(memc, rc));
memcached_free(memc);
// MYSQL对象创建失败
// memcached_st对象创建失败
controller->SetFailed("Couldn't add memcached server");
}
} else {
Expand Down Expand Up @@ -127,20 +127,26 @@ void MemcachedConnectPool::put(::google::protobuf::RpcController* controller,
assert(_idleList.size() == 0);

if (_realConnectCount < _maxConnectNum) {
memcached_st *memc = memcached_create(NULL);
memcached_return rc = memcached_server_push(memc, _memcServers);
memc = memcached_create(NULL);

if (rc == MEMCACHED_SUCCESS) {
_realConnectCount++;
_idleList.push_back(memc);

stCoRoutine_t *co = _waitingList.front();
_waitingList.pop_front();

co_resume(co);
} else {
memcached_free(memc);
if (memc) {
memcached_return rc = memcached_server_push(memc, _memcServers);

if (rc == MEMCACHED_SUCCESS) {
_realConnectCount++;
_idleList.push_back(memc);

stCoRoutine_t *co = _waitingList.front();
_waitingList.pop_front();

co_resume(co);
} else {
memcached_free(memc);
memc = NULL;
}
}

if (!memc) {
// 唤醒当前所有等待协程
while (!_waitingList.empty()) {
stCoRoutine_t *co = _waitingList.front();
Expand Down
203 changes: 203 additions & 0 deletions corpc_mongodb/corpc_mongodb.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/*
* Created by Xianke Liu on 2018/5/15.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "corpc_mongodb.h"

using namespace corpc;

std::mutex MongodbConnectPool::_initMutex;
bool MongodbConnectPool::_initialized = false;

MongodbConnectPool::Proxy::Proxy(MongodbConnectPool *pool) {
InnerRpcClient *client = InnerRpcClient::instance();

InnerRpcClient::Channel *channel = new InnerRpcClient::Channel(client, pool->_server);

_stub = new thirdparty::ThirdPartyService::Stub(channel, thirdparty::ThirdPartyService::STUB_OWNS_CHANNEL);
}

MongodbConnectPool::Proxy::~Proxy() {
delete _stub;
}

void MongodbConnectPool::Proxy::callDoneHandle(::google::protobuf::Message *request, corpc::Controller *controller) {
delete controller;
delete request;
}

mongoc_client_t* MongodbConnectPool::Proxy::take() {
Void *request = new Void();
thirdparty::TakeResponse *response = new thirdparty::TakeResponse();
Controller *controller = new Controller();

_stub->take(controller, request, response, NULL);

if (controller->Failed()) {
fprintf(stderr, "Rpc Call Failed : %s\n", controller->ErrorText().c_str());
return NULL;
}

mongoc_client_t* mongoc = (mongoc_client_t*)response->handle();

delete controller;
delete response;
delete request;

return mongoc;
}

void MongodbConnectPool::Proxy::put(mongoc_client_t* mongoc, bool error) {
thirdparty::PutRequest *request = new thirdparty::PutRequest();
Controller *controller = new Controller();

request->set_handle((intptr_t)mongoc);
if (error) {
request->set_error(error);
}

_stub->put(controller, request, NULL, google::protobuf::NewCallback<::google::protobuf::Message *>(&callDoneHandle, request, controller));
}

MongodbConnectPool::MongodbConnectPool(std::string &uri, uint32_t maxConnectNum, uint32_t maxIdleNum): _uri(uri), _maxConnectNum(maxConnectNum), _maxIdleNum(maxIdleNum), _realConnectCount(0) {

}

void MongodbConnectPool::take(::google::protobuf::RpcController* controller,
const Void* request,
thirdparty::TakeResponse* response,
::google::protobuf::Closure* done) {
if (_idleList.size() > 0) {
intptr_t handle = (intptr_t)_idleList.front();
_idleList.pop_front();

response->set_handle(handle);
} else if (_realConnectCount < _maxConnectNum) {
// 建立新连接
mongoc_client_t *mongoc = mongoc_client_new(_uri.c_str());

if (mongoc) {
response->set_handle((intptr_t)mongoc);
_realConnectCount++;
} else {
// mongoc_client_t对象创建失败
controller->SetFailed("new mongoc client fail");
}
} else {
// 等待空闲连接
_waitingList.push_back(co_self());
co_yield_ct();

if (_idleList.size() == 0) {
controller->SetFailed("can't connect to mongodb server");
} else {
intptr_t handle = (intptr_t)_idleList.front();
_idleList.pop_front();

response->set_handle(handle);
}
}
}

void MongodbConnectPool::put(::google::protobuf::RpcController* controller,
const thirdparty::PutRequest* request,
Void* response,
::google::protobuf::Closure* done) {
mongoc_client_t *mongoc = (mongoc_client_t *)request->handle();

if (_idleList.size() < _maxIdleNum) {
if (request->error()) {
_realConnectCount--;
mongoc_client_destroy(mongoc);

// 若有等待协程,尝试重连
if (_waitingList.size() > 0) {
assert(_idleList.size() == 0);

if (_realConnectCount < _maxConnectNum) {
mongoc = mongoc_client_new(_uri.c_str());

if (mongoc) {
_realConnectCount++;
_idleList.push_back(mongoc);

stCoRoutine_t *co = _waitingList.front();
_waitingList.pop_front();

co_resume(co);
} else {
// 唤醒当前所有等待协程
while (!_waitingList.empty()) {
stCoRoutine_t *co = _waitingList.front();
_waitingList.pop_front();

co_resume(co);
}
}
}
}
} else {
_idleList.push_back(mongoc);

if (_waitingList.size() > 0) {
assert(_idleList.size() == 1);
stCoRoutine_t *co = _waitingList.front();
_waitingList.pop_front();

co_resume(co);
}
}
} else {
assert(_waitingList.size() == 0);
_realConnectCount--;
mongoc_client_destroy(mongoc);
}
}

MongodbConnectPool* MongodbConnectPool::create(std::string &uri, uint32_t maxConnectNum, uint32_t maxIdleNum) {
if (!_initialized) {
std::unique_lock<std::mutex> lock( _initMutex );

if (!_initialized) {
mongoc_init();
_initialized = true;
}
}

MongodbConnectPool *pool = new MongodbConnectPool(uri, maxConnectNum, maxIdleNum);
pool->init();

return pool;
}

void MongodbConnectPool::init() {
_server = InnerRpcServer::create();
_server->registerService(this);
}

MongodbConnectPool::Proxy* MongodbConnectPool::getProxy() {
pid_t pid = GetPid();

auto iter = _threadProxyMap.find(pid);
if (iter != _threadProxyMap.end()) {
return iter->second;
}

// 为当前线程创建proxy
Proxy *proxy = new Proxy(this);
_threadProxyMap.insert(std::make_pair(pid, proxy));

return proxy;
}
89 changes: 89 additions & 0 deletions corpc_mongodb/corpc_mongodb.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Created by Xianke Liu on 2018/5/15.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef corpc_mongodb_h
#define corpc_mongodb_h

#include "corpc_routine_env.h"
#include "corpc_controller.h"
#include "corpc_inner_rpc.h"

#include "corpc_thirdparty.pb.h"

#include <mongoc.h>

using namespace corpc;

class MongodbConnectPool : public thirdparty::ThirdPartyService {
public:
class Proxy {
public:
mongoc_client_t* take();
void put(mongoc_client_t* mongoc, bool error);

private:
Proxy(MongodbConnectPool *pool);
~Proxy();

static void callDoneHandle(::google::protobuf::Message *request, corpc::Controller *controller);

private:
thirdparty::ThirdPartyService::Stub *_stub;

public:
friend class MongodbConnectPool;
};

public:
virtual void take(::google::protobuf::RpcController* controller,
const Void* request,
thirdparty::TakeResponse* response,
::google::protobuf::Closure* done);
virtual void put(::google::protobuf::RpcController* controller,
const thirdparty::PutRequest* request,
Void* response,
::google::protobuf::Closure* done);

public:
static MongodbConnectPool* create(std::string &uri, uint32_t maxConnectNum, uint32_t maxIdleNum);

Proxy* getProxy();

private:
MongodbConnectPool(std::string &uri, uint32_t maxConnectNum, uint32_t maxIdleNum);
~MongodbConnectPool() {}

void init();

private:
static std::mutex _initMutex;
static bool _initialized;

private:
std::string _uri;

uint32_t _maxConnectNum; // 与mysql数据库最多建立的连接数
uint32_t _maxIdleNum; // 最大空闲连接数量
uint32_t _realConnectCount; // 当前实际建立连接的数量

std::list<mongoc_client_t*> _idleList; // 空闲连接表
std::list<stCoRoutine_t*> _waitingList; // 等待队列:当连接数量达到最大时,新的请求需要等待

InnerRpcServer *_server;
std::map<pid_t, Proxy*> _threadProxyMap; // 线程相关代理
};

#endif /* corpc_mongodb_h */
Loading

0 comments on commit d67d941

Please sign in to comment.