Skip to content

Commit

Permalink
add multi_server_sample
Browse files Browse the repository at this point in the history
  • Loading branch information
qinzuoyan committed Nov 18, 2014
1 parent 50f53c6 commit fc3cd81
Show file tree
Hide file tree
Showing 6 changed files with 404 additions and 0 deletions.
76 changes: 76 additions & 0 deletions sample/multi_server_sample/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Copyright (c) 2014 Baidu.com, Inc. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file. See the AUTHORS file for names of contributors.

#-----------------------------------------------
## Sofa-pbrpc path containing `include'and `lib'.
##
## Check file exist:
## $(SOFA_PBRPC)/include/sofa/pbrpc/pbrpc.h
## $(SOFA_PBRPC)/lib/libsofa-pbrpc.a
##
SOFA_PBRPC=../../output
#-----------------------------------------------

#-----------------------------------------------
# Uncomment exactly one of the lines labelled (A), (B), and (C) below
# to switch between compilation modes.
#
OPT ?= -O2 # (A) Production use (optimized mode)
# OPT ?= -g2 # (B) Debug mode, w/ full line-level debugging symbols
# OPT ?= -O2 -g2 # (C) Profiling mode: opt, but w/debugging symbols
#-----------------------------------------------

#-----------------------------------------------
# !!! Do not change the following lines !!!
#-----------------------------------------------

include ../../depends.mk

CXX=g++
INCPATH=-I. -I$(SOFA_PBRPC)/include -I$(BOOST_HEADER_DIR) -I$(PROTOBUF_DIR)/include \
-I$(SNAPPY_DIR)/include -I$(ZLIB_DIR)/include
CXXFLAGS += $(OPT) -pipe -W -Wall -fPIC -D_GNU_SOURCE -D__STDC_LIMIT_MACROS $(INCPATH)

LIBRARY=$(SOFA_PBRPC)/lib/libsofa-pbrpc.a $(PROTOBUF_DIR)/lib/libprotobuf.a $(SNAPPY_DIR)/lib/libsnappy.a
LDFLAGS += -L$(ZLIB_DIR)/lib -lpthread -lrt -lz

PROTO_SRC=echo_service.proto
PROTO_OBJ=$(patsubst %.proto,%.pb.o,$(PROTO_SRC))
PROTO_OPTIONS=--proto_path=. --proto_path=$(SOFA_PBRPC)/include --proto_path=$(PROTOBUF_DIR)/include

BIN=server client

all: check_depends $(BIN)

.PHONY: check_depends clean

check_depends:
@if [ ! -f "$(PROTOBUF_DIR)/include/google/protobuf/message.h" ]; then echo "ERROR: need protobuf header"; exit 1; fi
@if [ ! -f "$(PROTOBUF_DIR)/lib/libprotobuf.a" ]; then echo "ERROR: need protobuf lib"; exit 1; fi
@if [ ! -f "$(PROTOBUF_DIR)/bin/protoc" ]; then echo "ERROR: need protoc binary"; exit 1; fi
@if [ ! -f "$(SNAPPY_DIR)/include/snappy.h" ]; then echo "ERROR: need snappy header"; exit 1; fi
@if [ ! -f "$(SNAPPY_DIR)/lib/libsnappy.a" ]; then echo "ERROR: need snappy lib"; exit 1; fi
@if [ ! -f "$(SOFA_PBRPC)/include/sofa/pbrpc/pbrpc.h" ]; then echo "ERROR: need sofa-pbrpc header"; exit 1; fi
@if [ ! -f "$(SOFA_PBRPC)/lib/libsofa-pbrpc.a" ]; then echo "ERROR: need sofa-pbrpc lib"; exit 1; fi

clean:
@rm -f $(BIN) *.o *.pb.*

rebuild: clean all

server: $(PROTO_OBJ) server.o
$(CXX) $^ -o $@ $(LIBRARY) $(LDFLAGS)

client: $(PROTO_OBJ) client.o
$(CXX) $^ -o $@ $(LIBRARY) $(LDFLAGS)

%.pb.o: %.pb.cc
$(CXX) $(CXXFLAGS) -c $< -o $@

%.pb.cc: %.proto
$(PROTOBUF_DIR)/bin/protoc $(PROTO_OPTIONS) --cpp_out=. $<

%.o: %.cc $(PROTO_OBJ)
$(CXX) $(CXXFLAGS) -c $< -o $@

23 changes: 23 additions & 0 deletions sample/multi_server_sample/README
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
1, make

2, start servers:
./server 127.0.0.1 12345 &>/dev/null &
./server 127.0.0.1 12346 &>/dev/null &
./server 127.0.0.1 12347 &>/dev/null &

3, start client:
./client address_list.txt

4, remove one address from `address_list.txt'.

5, signal client to reload address list from `address_list.txt':
killall -s SIGTERM client

6, add a new address into `address_list.txt'.

7, signal client to reload address list from `address_list.txt':
killall -s SIGTERM client

8, test done, stop all servers:
killall server

3 changes: 3 additions & 0 deletions sample/multi_server_sample/address_list.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
127.0.0.1:12345
127.0.0.1:12346
127.0.0.1:12347
197 changes: 197 additions & 0 deletions sample/multi_server_sample/client.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Copyright (c) 2014 Baidu.com, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//
// Author: qinzuoyan01@baidu.com (Qin Zuoyan)

#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <fstream>
#include <vector>
#include <set>

#include <sofa/pbrpc/pbrpc.h>
#include "echo_service.pb.h"

class ReloadableAddressProvider : public sofa::pbrpc::RpcChannel::AddressProvider
{
public:
typedef std::vector<sofa::pbrpc::RpcChannel::EventHandler*> EventHandlerVector;
public:
ReloadableAddressProvider(const std::string& addr_file) : _addr_file(addr_file) {
Reload();
}

virtual ~ReloadableAddressProvider() {
sofa::pbrpc::ScopedLocker<sofa::pbrpc::MutexLock> _(_lock);
for (EventHandlerVector::iterator it = _handler_list.begin();
it != _handler_list.end(); ++it) {
delete *it;
}
}

virtual void GetInitAddress(std::vector<std::string>* address_list) {
sofa::pbrpc::ScopedLocker<sofa::pbrpc::MutexLock> _(_lock);
address_list->assign(_addr_set.begin(), _addr_set.end());
}

virtual bool RegisterEventHandler(sofa::pbrpc::RpcChannel::EventHandler* event_handler) {
sofa::pbrpc::ScopedLocker<sofa::pbrpc::MutexLock> _(_lock);
_handler_list.push_back(event_handler);
return true;
}

void Reload() {
sofa::pbrpc::ScopedLocker<sofa::pbrpc::MutexLock> _(_lock);
SLOG(NOTICE, "start reloading address list from file \"%s\"", _addr_file.c_str());
// open file
std::ifstream ifs(_addr_file.c_str(), std::ifstream::in);
if (!ifs.good()) {
SLOG(ERROR, "open address list file \"%s\" fail", _addr_file.c_str());
return;
}
// read new addresses
std::set<std::string> new_addr_set;
std::string addr;
while (std::getline(ifs, addr)) {
if (!addr.empty()) {
new_addr_set.insert(addr);
}
}
// make diff
std::vector<std::string> add_list;
std::vector<std::string> remove_list;
std::set<std::string>::iterator old_it = _addr_set.begin();
std::set<std::string>::iterator old_end = _addr_set.end();
std::set<std::string>::iterator new_it = new_addr_set.begin();
std::set<std::string>::iterator new_end = new_addr_set.end();
while (old_it != old_end && new_it != new_end) {
if (*old_it == *new_it) {
// keep
++old_it;
++new_it;
}
else if (*old_it < *new_it) {
// remove
remove_list.push_back(*old_it);
++old_it;
}
else {
// add
add_list.push_back(*new_it);
++new_it;
}
}
if (old_it != old_end) {
remove_list.insert(remove_list.end(), old_it, old_end);
}
if (new_it != new_end) {
add_list.insert(add_list.end(), new_it, new_end);
}
// notice handler
if (!add_list.empty() || !remove_list.empty()) {
for (EventHandlerVector::iterator it = _handler_list.begin();
it != _handler_list.end(); ++it) {
if (!add_list.empty()) {
(*it)->OnAddressAdded(add_list);
}
if (!remove_list.empty()) {
(*it)->OnAddressRemoved(remove_list);
}
}
}
// update _addr_set
_addr_set = new_addr_set;
}

private:
std::string _addr_file;
sofa::pbrpc::MutexLock _lock;
std::set<std::string> _addr_set;
EventHandlerVector _handler_list;
};

static ReloadableAddressProvider* g_address_provider;

static void sigcatcher(int sig)
{
SLOG(NOTICE, "signal catched: %d", sig);
if (g_address_provider) {
g_address_provider->Reload();
}
}

int main(int argc, char** argv)
{
SOFA_PBRPC_SET_LOG_LEVEL(NOTICE);

if (argc < 2) {
fprintf(stderr, "USAGE: %s <address-list-file>\n", argv[0]);
return EXIT_FAILURE;
}

std::string addr_file = argv[1];
g_address_provider = new ReloadableAddressProvider(addr_file);

signal(SIGTERM, &sigcatcher);

// Define an rpc client.
sofa::pbrpc::RpcClientOptions client_options;
sofa::pbrpc::RpcClient* rpc_client = new sofa::pbrpc::RpcClient(client_options);

// Define an rpc channel.
sofa::pbrpc::RpcChannelOptions channel_options;
sofa::pbrpc::RpcChannel* rpc_channel =
new sofa::pbrpc::RpcChannel(rpc_client, g_address_provider, channel_options);

// Define an rpc stub.
sofa::pbrpc::test::EchoServer_Stub* stub =
new sofa::pbrpc::test::EchoServer_Stub(rpc_channel);

while (true) {
// Prepare parameters.
sofa::pbrpc::RpcController* cntl = new sofa::pbrpc::RpcController();
cntl->SetTimeout(3000);
sofa::pbrpc::test::EchoRequest* request = new sofa::pbrpc::test::EchoRequest();
request->set_message("Hello from qinzuoyan01");
sofa::pbrpc::test::EchoResponse* response = new sofa::pbrpc::test::EchoResponse();

// Sync call.
stub->Echo(cntl, request, response, NULL);

// Check if the request has been sent.
// If has been sent, then can get the sent bytes.
SLOG(NOTICE, "RemoteAddress=%s", cntl->RemoteAddress().c_str());
SLOG(NOTICE, "IsRequestSent=%s", cntl->IsRequestSent() ? "true" : "false");
if (cntl->IsRequestSent()) {
SLOG(NOTICE, "LocalAddress=%s", cntl->LocalAddress().c_str());
SLOG(NOTICE, "SentBytes=%ld", cntl->SentBytes());
}

// Check if failed.
if (cntl->Failed()) {
SLOG(ERROR, "request failed: %s", cntl->ErrorText().c_str());
}
else {
SLOG(NOTICE, "request succeed: %s", response->message().c_str());
}

// Destroy objects.
delete cntl;
delete request;
delete response;

sleep(1);
}

delete stub;
delete rpc_channel;
delete rpc_client;
delete g_address_provider;
g_address_provider = NULL;

return EXIT_SUCCESS;
}

/* vim: set ts=4 sw=4 sts=4 tw=100 */
16 changes: 16 additions & 0 deletions sample/multi_server_sample/echo_service.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package sofa.pbrpc.test;

option cc_generic_services = true;
option java_generic_services = true;

message EchoRequest {
required string message = 1;
}

message EchoResponse {
required string message = 1;
}

service EchoServer {
rpc Echo(EchoRequest) returns(EchoResponse);
}
Loading

0 comments on commit fc3cd81

Please sign in to comment.