Skip to content

Commit

Permalink
小优化
Browse files Browse the repository at this point in the history
  • Loading branch information
Ananfa committed Jun 22, 2024
1 parent 03c9f47 commit d0f1614
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 13 deletions.
14 changes: 8 additions & 6 deletions corpc/src/corpc_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ std::shared_ptr<corpc::Pipeline> UdpPipelineFactory::buildPipeline(std::shared_p
return std::shared_ptr<corpc::Pipeline>( new corpc::UdpPipeline(connection, worker_, decodeFun_, encodeFun_, headSize_, maxBodySize_) );
}

Connection::Connection(int fd, IO* io, bool needHB): fd_(fd), io_(io), needHB_(needHB), routineHang_(false), routine_(NULL), sendThreadIndex_(-1), recvThreadIndex_(-1), decodeError_(false), closed_(false), isClosing_(false), canClose_(false), lastRecvHBTime_(0) {
Connection::Connection(int fd, IO* io, bool needHB): fd_(fd), io_(io), needHB_(needHB), routineHang_(false), routine_(NULL), sendThreadIndex_(-1), recvThreadIndex_(-1), decodeError_(false), closed_(false), isClosing_(false), closeSem_(0), lastRecvHBTime_(0) {
}

Connection::~Connection() {
Expand Down Expand Up @@ -747,10 +747,11 @@ DEBUG_LOG("Receiver::connectionRoutine -- 1\n");

DEBUG_LOG("Receiver::connectionRoutine -- 2\n");
// 等待写关闭
while (!connection->canClose_) {
// sleep 100 milisecond
msleep(100);
}
connection->closeSem_.wait();
//while (!connection->canClose_) {
// // sleep 100 milisecond
// msleep(100);
//}

DEBUG_LOG("Receiver::connectionRoutine -- 3\n");
close(fd);
Expand Down Expand Up @@ -962,7 +963,8 @@ void *Sender::connectionRoutine( void * arg ) {

connection->isClosing_ = true;
shutdown(connection->fd_, SHUT_RD);
connection->canClose_ = true;
connection->closeSem_.post();
//connection->canClose_ = true;

DEBUG_LOG("Sender::connectionRoutine -- routine end for fd %d\n", connection->fd_);

Expand Down
7 changes: 5 additions & 2 deletions corpc/src/corpc_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,9 @@ namespace corpc {
bool decodeError_; // 是否数据解码出错
std::atomic<bool> closed_; // 是否已关闭
std::atomic<bool> isClosing_; // 是否正在关闭
std::atomic<bool> canClose_; // 是否可调用close(当sender中fd相关协程退出时设置canClose为true,receiver中fd相关协程才可以进行close调用)

Semaphore closeSem_; // 关闭同步用信号量(注意:应该用条件变量更合适)
//std::atomic<bool> canClose_; // 是否可调用close(当sender中fd相关协程退出时设置canClose为true,receiver中fd相关协程才可以进行close调用)

public:
friend class Receiver;
Expand Down Expand Up @@ -299,7 +301,8 @@ namespace corpc {
Acceptor *acceptor_;
Worker *worker_;

PipelineFactory *pipelineFactory_;
//PipelineFactory *pipelineFactory_;
std::unique_ptr<PipelineFactory> pipelineFactory_;
};

class Acceptor {
Expand Down
2 changes: 1 addition & 1 deletion corpc/src/corpc_kcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ int KcpMessageServer::Connection::rawOut(const char *buf, int len, ikcpcb *kcp,
KcpMessageServer::KcpMessageServer(corpc::IO *io, bool needHB, bool enableSendCRC, bool enableRecvCRC, bool enableSerial, const std::string& ip, uint16_t port): MessageServer(io, needHB, enableSendCRC, enableRecvCRC, enableSerial) {
acceptor_ = new UdpAcceptor(this, ip, port);

pipelineFactory_ = new KcpPipelineFactory(worker_, decode, encode, CORPC_MESSAGE_HEAD_SIZE, CORPC_MAX_MESSAGE_SIZE, 0, corpc::MessagePipeline::FOUR_BYTES);
pipelineFactory_.reset(new KcpPipelineFactory(worker_, decode, encode, CORPC_MESSAGE_HEAD_SIZE, CORPC_MAX_MESSAGE_SIZE, 0, corpc::MessagePipeline::FOUR_BYTES));
}

corpc::Connection *KcpMessageServer::buildConnection(int fd) {
Expand Down
4 changes: 2 additions & 2 deletions corpc/src/corpc_message_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -491,11 +491,11 @@ bool MessageServer::encode(std::shared_ptr<corpc::Connection> &connection, std::
TcpMessageServer::TcpMessageServer(corpc::IO *io, bool needHB, bool enableSendCRC, bool enableRecvCRC, bool enableSerial, const std::string& ip, uint16_t port): MessageServer(io, needHB, enableSendCRC, enableRecvCRC, enableSerial) {
acceptor_ = new TcpAcceptor(this, ip, port);

pipelineFactory_ = new TcpPipelineFactory(worker_, decode, encode, CORPC_MESSAGE_HEAD_SIZE, CORPC_MAX_MESSAGE_SIZE, 0, corpc::MessagePipeline::FOUR_BYTES);
pipelineFactory_.reset(new TcpPipelineFactory(worker_, decode, encode, CORPC_MESSAGE_HEAD_SIZE, CORPC_MAX_MESSAGE_SIZE, 0, corpc::MessagePipeline::FOUR_BYTES));
}

UdpMessageServer::UdpMessageServer(corpc::IO *io, bool needHB, bool enableSendCRC, bool enableRecvCRC, bool enableSerial, const std::string& ip, uint16_t port): MessageServer(io, needHB, enableSendCRC, enableRecvCRC, enableSerial) {
acceptor_ = new UdpAcceptor(this, ip, port);

pipelineFactory_ = new UdpPipelineFactory(worker_, decode, encode, CORPC_MESSAGE_HEAD_SIZE, CORPC_MAX_UDP_MESSAGE_SIZE);
pipelineFactory_.reset(new UdpPipelineFactory(worker_, decode, encode, CORPC_MESSAGE_HEAD_SIZE, CORPC_MAX_UDP_MESSAGE_SIZE));
}
15 changes: 14 additions & 1 deletion corpc/src/corpc_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,20 @@ namespace corpc {

public:
MPSC_NoLockQueue():head_(NULL), outqueue_(NULL) {}
~MPSC_NoLockQueue() {}
~MPSC_NoLockQueue() {
while (outqueue_) {
Node *tnode = outqueue_;
outqueue_ = outqueue_->next;
delete tnode;
}

outqueue_ = head_;
while (outqueue_) {
Node *tnode = outqueue_;
outqueue_ = outqueue_->next;
delete tnode;
}
}

void push(T& v) {
Node *newNode = new Node;
Expand Down
2 changes: 1 addition & 1 deletion corpc/src/corpc_rpc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ RpcServer::RpcServer(IO *io, uint16_t workThreadNum, const std::string& ip, uint
worker_ = new CoroutineWorker(this);
}

pipelineFactory_ = new TcpPipelineFactory(worker_, decode, encode, CORPC_REQUEST_HEAD_SIZE, CORPC_MAX_REQUEST_SIZE, 0, corpc::MessagePipeline::FOUR_BYTES);
pipelineFactory_.reset(new TcpPipelineFactory(worker_, decode, encode, CORPC_REQUEST_HEAD_SIZE, CORPC_MAX_REQUEST_SIZE, 0, corpc::MessagePipeline::FOUR_BYTES));
}

RpcServer::~RpcServer() {}
Expand Down
6 changes: 6 additions & 0 deletions corpc/src/corpc_semaphore.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ namespace corpc {
private:
std::atomic<int> res_;
std::list<RoutineInfo> waitRoutines_;

private:
Semaphore(Semaphore const&) = delete; // copy ctor delete
Semaphore(Semaphore &&) = delete; // move ctor delete
Semaphore& operator=(Semaphore const&) = delete; // assign op. delete
Semaphore& operator=(Semaphore &&) = delete; // move assign op. delete
};

}
Expand Down

0 comments on commit d0f1614

Please sign in to comment.