Skip to content

Commit

Permalink
boardd: madly switch to boost asio for connection handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertabcd committed May 9, 2015
1 parent 2134819 commit 5fed578
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 304 deletions.
4 changes: 2 additions & 2 deletions daemon/boardd/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ SRCROOT= ../..
.include "$(SRCROOT)/pttbbs.mk"

PROG= boardd
SRCS= boardd.c server.c session.cpp
SRCS= boardd.c server.cpp
MAN=

UTILDIR= $(SRCROOT)/util
Expand All @@ -20,7 +20,7 @@ LDADD:= $(UTILOBJ) \
$(SRCROOT)/common/sys/libcmsys.a \
$(SRCROOT)/common/osdep/libosdep.a \
$(LIBEVENT_LIBS_l) -levent_pthreads \
-lpthread -lstdc++ \
-pthread -lstdc++ -lboost_system \
$(LDADD)

.include <bsd.prog.mk>
106 changes: 96 additions & 10 deletions daemon/boardd/boardd.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include <var.h>
#include <perm.h>

#include "server.h"
#include "boardd.h"

#define CONVERT_TO_UTF8
Expand Down Expand Up @@ -671,6 +670,28 @@ static const struct {
{NULL, cmd_unknown}
};

int
split_args(char *line, char ***argp)
{
int argc = 0;
char *p, **argv;

if ((argv = calloc(MAX_ARGS + 1, sizeof(char *))) == NULL)
return -1;

while ((p = strsep(&line, " \t\r\n")) != NULL) {
argv[argc++] = p;

if (argc == MAX_ARGS)
break;
}

argv = realloc(argv, (argc + 1) * sizeof(char *));
*argp = argv;

return argc;
}

int
process_line(struct evbuffer *output, void *ctx, char *line)
{
Expand All @@ -689,21 +710,86 @@ process_line(struct evbuffer *output, void *ctx, char *line)
return result;
}

void
setup_client(struct event_base *base, evutil_socket_t fd,
struct sockaddr *address, int socklen)
{
session_start(base, fd, address, socklen);
}

void
setup_program()
{
setuid(BBSUID);
setgid(BBSGID);
chdir(BBSHOME);

session_init();

attach_SHM();
}

int main(int argc, char *argv[])
{
int ch, run_as_daemon = 1;
const char *iface_ip = "127.0.0.1:5150";

while ((ch = getopt(argc, argv, "Dl:h")) != -1)
switch (ch) {
case 'D':
run_as_daemon = 0;
break;
case 'l':
iface_ip = optarg;
break;
case 'h':
default:
fprintf(stderr, "Usage: %s [-D] [-l interface_ip:port]\n", argv[0]);
exit(EXIT_FAILURE);
}

if (run_as_daemon)
if (daemon(1, 1) < 0) {
perror("daemon");
exit(EXIT_FAILURE);
}

setup_program();

signal(SIGPIPE, SIG_IGN);

char *ipport = strdup(iface_ip);
char *ip = strtok(ipport, ":");
char *port = strtok(NULL, ":");
start_server(ip, atoi(port));
free(ipport);

return 0;
}

#ifdef __linux__

int
daemon(int nochdir, int noclose)
{
int fd;

switch (fork()) {
case -1:
return -1;
case 0:
break;
default:
_exit(0);
}

if (setsid() == -1)
return -1;

if (!nochdir)
chdir("/");

if (!noclose && (fd = open("/dev/null", O_RDWR)) >= 0) {
dup2(fd, 0);
dup2(fd, 1);
dup2(fd, 2);

if (fd > 2)
close(fd);
}

return 0;
}

#endif // __linux__
13 changes: 7 additions & 6 deletions daemon/boardd/boardd.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
# define _BOARDD_H

int process_line(struct evbuffer *output, void *ctx, char *line);
void start_server(const char *host, unsigned short port);

void session_start(struct event_base *base, evutil_socket_t fd,
struct sockaddr *address, int socklen);

void session_init();
void session_shutdown();

#ifndef NUM_THREADS
#define NUM_THREADS 8
#endif

#ifndef MAX_ARGS
#define MAX_ARGS 100
#endif

#endif
1 change: 0 additions & 1 deletion daemon/boardd/server.c

This file was deleted.

178 changes: 178 additions & 0 deletions daemon/boardd/server.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
// Copyright (c) 2015, Robert Wang <robert@arctic.tw>
// The MIT License.
#include <iostream>
#include <memory>
#include <functional>
#include <list>
#include <thread>
#include <mutex>
#include <boost/asio.hpp>
extern "C" {
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include "boardd.h"
}

namespace asio = boost::asio;
using boost::system::error_code;
using boost::asio::ip::tcp;

class Conn : public std::enable_shared_from_this<Conn> {
public:
using Ptr = std::shared_ptr<Conn>;

static Ptr Create(asio::io_service &io_service) {
return Ptr(new Conn(io_service));
}

virtual ~Conn() {
evbuffer_free(pending_output_);
}

tcp::socket& Socket() { return socket_; }

void Start() {
std::lock_guard<std::mutex> lk(mutex_);
ReadLine();
ResetTimer(true);
}

private:
Conn(asio::io_service &io_service)
: socket_(io_service)
, timer_(io_service)
, pending_output_(evbuffer_new()) { }

void ReadLine() {
asio::async_read_until(socket_, buffer_, '\n',
std::bind(&Conn::OnReadLines,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2));
}

void OnReadLines(const error_code &ec, size_t bytes) {
std::lock_guard<std::mutex> lk(mutex_);
if (ec) {
Close();
return;
}
ResetTimer();
std::string line;
size_t i = 0, consumed = 0;
auto data = buffer_.data();
for (auto it = asio::buffers_begin(data),
end = asio::buffers_end(data);
it != end;
++it) {
const char c = *it;
++i;
line.push_back(c);
if (c == '\n') {
consumed = i;
process_line(pending_output_, nullptr, strdup(line.c_str()));
line.clear();
}
}
buffer_.consume(consumed);

size_t total = evbuffer_get_length(pending_output_);
asio::const_buffers_1 output(
evbuffer_pullup(pending_output_, total), total);
async_write(socket_, output,
std::bind(&Conn::OnWriteCompleted, shared_from_this(),
std::placeholders::_1, std::placeholders::_2));
ResetTimer();
}

void OnWriteCompleted(const error_code &ec, size_t bytes) {
std::lock_guard<std::mutex> lk(mutex_);
if (ec) {
Close();
return;
}
evbuffer_drain(pending_output_, evbuffer_get_length(pending_output_));
ReadLine();
ResetTimer();
}

void ResetTimer(bool is_first_time = false) {
using boost::posix_time::seconds;
if (timer_.expires_from_now(seconds(kTimeoutSeconds)) > 0 ||
is_first_time) {
timer_.async_wait(std::bind(&Conn::OnTimeout,
shared_from_this(),
std::placeholders::_1));
}
}

void OnTimeout(const error_code &ec) {
std::lock_guard<std::mutex> lk(mutex_);
if (ec != asio::error::operation_aborted) {
Close();
}
}

void Close() {
socket_.close();
}

std::mutex mutex_;
tcp::socket socket_;
asio::deadline_timer timer_;
asio::streambuf buffer_;
evbuffer *pending_output_;

static const int kTimeoutSeconds = 60;
};

class Server {
public:
Server(asio::io_service &io_service,
const std::string &bind, unsigned short port)
: acceptor_(io_service,
tcp::endpoint(asio::ip::address::from_string(bind), port))
{ StartAccept(); }

private:
void StartAccept() {
auto conn = Conn::Create(acceptor_.get_io_service());
acceptor_.async_accept(conn->Socket(),
std::bind(&Server::HandleAccept,
this, conn, std::placeholders::_1));
}

void HandleAccept(Conn::Ptr conn, const error_code &ec) {
if (!ec) {
conn->Start();
}
StartAccept();
}

tcp::acceptor acceptor_;
};

void ServiceThread(asio::io_service &io_service) {
try {
io_service.run();
} catch (std::exception &ex) {
std::cerr << ex.what() << std::endl;
exit(1);
}
}

void start_server(const char *host, unsigned short port) {
std::cerr << "boardd: built with mulit-threading, "
<< NUM_THREADS << " threads." << std::endl;

std::list<std::thread> threads;
asio::io_service io_service;
Server server(io_service, host, port);
for (int i = 0; i < NUM_THREADS; ++i) {
threads.push_back(std::move(
std::thread(std::bind(&ServiceThread, std::ref(io_service)))));
}
for (auto &thr : threads) {
thr.join();
}
}
1 change: 0 additions & 1 deletion daemon/boardd/server.h

This file was deleted.

Loading

0 comments on commit 5fed578

Please sign in to comment.