diff --git a/daemon/boardd/Makefile b/daemon/boardd/Makefile index f0c83f590..bd3c0a3ca 100644 --- a/daemon/boardd/Makefile +++ b/daemon/boardd/Makefile @@ -4,7 +4,7 @@ SRCROOT= ../.. .include "$(SRCROOT)/pttbbs.mk" PROG= boardd -SRCS= boardd.c server.c session.cpp +SRCS= boardd.c server.cpp MAN= UTILDIR= $(SRCROOT)/util @@ -20,7 +20,7 @@ LDADD:= $(UTILOBJ) \ $(SRCROOT)/common/sys/libcmsys.a \ $(SRCROOT)/common/osdep/libosdep.a \ $(LIBEVENT_LIBS_l) -levent_pthreads \ - -lpthread -lstdc++ \ + -pthread -lstdc++ -lboost_system \ $(LDADD) .include diff --git a/daemon/boardd/boardd.c b/daemon/boardd/boardd.c index fdac12a4e..512f0af2a 100644 --- a/daemon/boardd/boardd.c +++ b/daemon/boardd/boardd.c @@ -26,7 +26,6 @@ #include #include -#include "server.h" #include "boardd.h" #define CONVERT_TO_UTF8 @@ -671,6 +670,28 @@ static const struct { {NULL, cmd_unknown} }; +int +split_args(char *line, char ***argp) +{ + int argc = 0; + char *p, **argv; + + if ((argv = calloc(MAX_ARGS + 1, sizeof(char *))) == NULL) + return -1; + + while ((p = strsep(&line, " \t\r\n")) != NULL) { + argv[argc++] = p; + + if (argc == MAX_ARGS) + break; + } + + argv = realloc(argv, (argc + 1) * sizeof(char *)); + *argp = argv; + + return argc; +} + int process_line(struct evbuffer *output, void *ctx, char *line) { @@ -689,13 +710,6 @@ process_line(struct evbuffer *output, void *ctx, char *line) return result; } -void -setup_client(struct event_base *base, evutil_socket_t fd, - struct sockaddr *address, int socklen) -{ - session_start(base, fd, address, socklen); -} - void setup_program() { @@ -703,7 +717,79 @@ setup_program() setgid(BBSGID); chdir(BBSHOME); - session_init(); - attach_SHM(); } + +int main(int argc, char *argv[]) +{ + int ch, run_as_daemon = 1; + const char *iface_ip = "127.0.0.1:5150"; + + while ((ch = getopt(argc, argv, "Dl:h")) != -1) + switch (ch) { + case 'D': + run_as_daemon = 0; + break; + case 'l': + iface_ip = optarg; + break; + case 'h': + default: + fprintf(stderr, "Usage: %s [-D] [-l interface_ip:port]\n", argv[0]); + exit(EXIT_FAILURE); + } + + if (run_as_daemon) + if (daemon(1, 1) < 0) { + perror("daemon"); + exit(EXIT_FAILURE); + } + + setup_program(); + + signal(SIGPIPE, SIG_IGN); + + char *ipport = strdup(iface_ip); + char *ip = strtok(ipport, ":"); + char *port = strtok(NULL, ":"); + start_server(ip, atoi(port)); + free(ipport); + + return 0; +} + +#ifdef __linux__ + +int +daemon(int nochdir, int noclose) +{ + int fd; + + switch (fork()) { + case -1: + return -1; + case 0: + break; + default: + _exit(0); + } + + if (setsid() == -1) + return -1; + + if (!nochdir) + chdir("/"); + + if (!noclose && (fd = open("/dev/null", O_RDWR)) >= 0) { + dup2(fd, 0); + dup2(fd, 1); + dup2(fd, 2); + + if (fd > 2) + close(fd); + } + + return 0; +} + +#endif // __linux__ diff --git a/daemon/boardd/boardd.h b/daemon/boardd/boardd.h index 91d362a99..00029e4a2 100644 --- a/daemon/boardd/boardd.h +++ b/daemon/boardd/boardd.h @@ -2,13 +2,14 @@ # define _BOARDD_H int process_line(struct evbuffer *output, void *ctx, char *line); +void start_server(const char *host, unsigned short port); -void session_start(struct event_base *base, evutil_socket_t fd, - struct sockaddr *address, int socklen); - -void session_init(); -void session_shutdown(); - +#ifndef NUM_THREADS #define NUM_THREADS 8 +#endif + +#ifndef MAX_ARGS +#define MAX_ARGS 100 +#endif #endif diff --git a/daemon/boardd/server.c b/daemon/boardd/server.c deleted file mode 120000 index 88ec683ff..000000000 --- a/daemon/boardd/server.c +++ /dev/null @@ -1 +0,0 @@ -../barebone/server.c \ No newline at end of file diff --git a/daemon/boardd/server.cpp b/daemon/boardd/server.cpp new file mode 100644 index 000000000..3447627d3 --- /dev/null +++ b/daemon/boardd/server.cpp @@ -0,0 +1,178 @@ +// Copyright (c) 2015, Robert Wang +// The MIT License. +#include +#include +#include +#include +#include +#include +#include +extern "C" { +#include +#include +#include "boardd.h" +} + +namespace asio = boost::asio; +using boost::system::error_code; +using boost::asio::ip::tcp; + +class Conn : public std::enable_shared_from_this { + public: + using Ptr = std::shared_ptr; + + static Ptr Create(asio::io_service &io_service) { + return Ptr(new Conn(io_service)); + } + + virtual ~Conn() { + evbuffer_free(pending_output_); + } + + tcp::socket& Socket() { return socket_; } + + void Start() { + std::lock_guard lk(mutex_); + ReadLine(); + ResetTimer(true); + } + + private: + Conn(asio::io_service &io_service) + : socket_(io_service) + , timer_(io_service) + , pending_output_(evbuffer_new()) { } + + void ReadLine() { + asio::async_read_until(socket_, buffer_, '\n', + std::bind(&Conn::OnReadLines, + shared_from_this(), + std::placeholders::_1, + std::placeholders::_2)); + } + + void OnReadLines(const error_code &ec, size_t bytes) { + std::lock_guard lk(mutex_); + if (ec) { + Close(); + return; + } + ResetTimer(); + std::string line; + size_t i = 0, consumed = 0; + auto data = buffer_.data(); + for (auto it = asio::buffers_begin(data), + end = asio::buffers_end(data); + it != end; + ++it) { + const char c = *it; + ++i; + line.push_back(c); + if (c == '\n') { + consumed = i; + process_line(pending_output_, nullptr, strdup(line.c_str())); + line.clear(); + } + } + buffer_.consume(consumed); + + size_t total = evbuffer_get_length(pending_output_); + asio::const_buffers_1 output( + evbuffer_pullup(pending_output_, total), total); + async_write(socket_, output, + std::bind(&Conn::OnWriteCompleted, shared_from_this(), + std::placeholders::_1, std::placeholders::_2)); + ResetTimer(); + } + + void OnWriteCompleted(const error_code &ec, size_t bytes) { + std::lock_guard lk(mutex_); + if (ec) { + Close(); + return; + } + evbuffer_drain(pending_output_, evbuffer_get_length(pending_output_)); + ReadLine(); + ResetTimer(); + } + + void ResetTimer(bool is_first_time = false) { + using boost::posix_time::seconds; + if (timer_.expires_from_now(seconds(kTimeoutSeconds)) > 0 || + is_first_time) { + timer_.async_wait(std::bind(&Conn::OnTimeout, + shared_from_this(), + std::placeholders::_1)); + } + } + + void OnTimeout(const error_code &ec) { + std::lock_guard lk(mutex_); + if (ec != asio::error::operation_aborted) { + Close(); + } + } + + void Close() { + socket_.close(); + } + + std::mutex mutex_; + tcp::socket socket_; + asio::deadline_timer timer_; + asio::streambuf buffer_; + evbuffer *pending_output_; + + static const int kTimeoutSeconds = 60; +}; + +class Server { + public: + Server(asio::io_service &io_service, + const std::string &bind, unsigned short port) + : acceptor_(io_service, + tcp::endpoint(asio::ip::address::from_string(bind), port)) + { StartAccept(); } + + private: + void StartAccept() { + auto conn = Conn::Create(acceptor_.get_io_service()); + acceptor_.async_accept(conn->Socket(), + std::bind(&Server::HandleAccept, + this, conn, std::placeholders::_1)); + } + + void HandleAccept(Conn::Ptr conn, const error_code &ec) { + if (!ec) { + conn->Start(); + } + StartAccept(); + } + + tcp::acceptor acceptor_; +}; + +void ServiceThread(asio::io_service &io_service) { + try { + io_service.run(); + } catch (std::exception &ex) { + std::cerr << ex.what() << std::endl; + exit(1); + } +} + +void start_server(const char *host, unsigned short port) { + std::cerr << "boardd: built with mulit-threading, " + << NUM_THREADS << " threads." << std::endl; + + std::list threads; + asio::io_service io_service; + Server server(io_service, host, port); + for (int i = 0; i < NUM_THREADS; ++i) { + threads.push_back(std::move( + std::thread(std::bind(&ServiceThread, std::ref(io_service))))); + } + for (auto &thr : threads) { + thr.join(); + } +} diff --git a/daemon/boardd/server.h b/daemon/boardd/server.h deleted file mode 120000 index 75e7076e3..000000000 --- a/daemon/boardd/server.h +++ /dev/null @@ -1 +0,0 @@ -../barebone/server.h \ No newline at end of file diff --git a/daemon/boardd/session.cpp b/daemon/boardd/session.cpp deleted file mode 100644 index 419c1b321..000000000 --- a/daemon/boardd/session.cpp +++ /dev/null @@ -1,177 +0,0 @@ -// Copyright (c) 2015, Robert Wang -// The MIT License. -#include "session.hpp" -#include "threadpool.hpp" -extern "C" { -#include -#include -#include -#include "server.h" -#include "boardd.h" -} - -namespace { - -class LineProcessJob -{ -public: - LineProcessJob(Session *session, void *ctx, char *line) - : session_(session), ctx_(ctx), line_(line) - { - session_->add_ref(); - } - - ~LineProcessJob() - { - session_->dec_ref(); - } - - void Run() - { - session_->process_line(ctx_, line_); - } - -private: - // Ref-counted - Session *session_; - - // Not owned - void *ctx_; - char *line_; -}; - -static ThreadPool *g_threadpool; - -void read_cb(struct bufferevent *bev, void *ctx) -{ - reinterpret_cast(ctx)->on_read(); -} - -void event_cb(struct bufferevent *bev, short events, void *ctx) -{ - reinterpret_cast(ctx)->on_error(events); -} - -void blackhole_read_cb(bufferevent *bev, void *ctx) -{ - evbuffer *buf = bufferevent_get_input(bev); - evbuffer_drain(buf, evbuffer_get_length(buf)); -} - -} // namespace - -void session_init() -{ -#ifdef BOARDD_MT - fprintf(stderr, "boardd: built with mulit-threading, %d threads.\n", - NUM_THREADS); - g_threadpool = new ThreadPool(NUM_THREADS); -#endif -} - -void session_shutdown() -{ - delete g_threadpool; -} - -void session_start(struct event_base *base, evutil_socket_t fd, - struct sockaddr *address, int socklen) -{ - new Session(process_line, base, fd, address, socklen); -} - -Session::Session(LineFunc process_line, - struct event_base *base, evutil_socket_t fd, - struct sockaddr *address, int socklen) - : process_line_(process_line) -{ - bev_ = bufferevent_socket_new( - base, fd, - BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS | BEV_OPT_THREADSAFE); - bufferevent_setcb(bev_, read_cb, nullptr, event_cb, this); - bufferevent_set_timeouts(bev_, common_timeout, common_timeout); - bufferevent_enable(bev_, EV_READ|EV_WRITE); - add_ref(); -} - -Session::~Session() -{} - -void Session::on_error(short events) -{ - if (events & BEV_EVENT_ERROR) - perror("Error from bufferevent"); - if (events & (BEV_EVENT_EOF | BEV_EVENT_TIMEOUT | BEV_EVENT_ERROR)) { - shutdown(); - } -} - -void Session::on_read() -{ - LineProcessJob *job; - - { - std::lock_guard _(mutex_); - - if (!bev_) - return; - - size_t len; - struct evbuffer *input = bufferevent_get_input(bev_); - char *line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF); - - if (!line) - return; - - job = new LineProcessJob(this, nullptr, line); -#ifdef BOARDD_MT - // One request at a time. - bufferevent_disable(bev_, EV_READ); -#endif - } - -#ifdef BOARDD_MT - g_threadpool->Do(job); -#else - // This must be put out of lock to prevent recursive locking. - job->Run(); - delete job; -#endif -} - -// Call from a worker thread -void Session::process_line(void *ctx, char *line) -{ - evbuffer *output = evbuffer_new(); - if (process_line_(output, ctx, line) == 0) { - send_and_resume(output); - } else { - shutdown(); - } - evbuffer_free(output); -} - -void Session::shutdown() -{ - bool detached = false; - { - std::lock_guard _(mutex_); - if (bev_) { - bufferevent_setcb(bev_, blackhole_read_cb, nullptr, nullptr, nullptr); - bufferevent_free(bev_); - bev_ = nullptr; - detached = true; - } - } - if (detached) - dec_ref(); -} - -void Session::send_and_resume(evbuffer *buf) -{ - std::lock_guard _(mutex_); - if (bev_) { - evbuffer_add_buffer(bufferevent_get_output(bev_), buf); - bufferevent_enable(bev_, EV_READ); - } -} diff --git a/daemon/boardd/session.hpp b/daemon/boardd/session.hpp deleted file mode 100644 index 967b5feda..000000000 --- a/daemon/boardd/session.hpp +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright (c) 2015, Robert Wang -// The MIT License. -#ifndef _BOARDD_SESSION_HPP -# define _BOARDD_SESSION_HPP -#include -extern "C" { -#include -} - -typedef int (*LineFunc)(struct evbuffer *output, void *ctx, char *line); - -template -class RefCounted { - public: - virtual ~RefCounted() {} - void add_ref() { - std::lock_guard _(mutex_); - ++count_; - } - void dec_ref() { - int c; - { - std::lock_guard _(mutex_); - c = --count_; - } - if (c == 0) - delete this; - } - private: - std::mutex mutex_; - int count_ = 0; -}; - -class Session : public RefCounted { - public: - Session(LineFunc process_line, - struct event_base *base, evutil_socket_t fd, - struct sockaddr *address, int socklen); - virtual ~Session(); - - void on_error(short what); - void on_read(); - void shutdown(); - - void process_line(void *ctx, char *line); - - private: - LineFunc process_line_; - bufferevent *bev_; - std::mutex mutex_; - - void send_and_resume(evbuffer *buf); -}; - -#endif diff --git a/daemon/boardd/threadpool.hpp b/daemon/boardd/threadpool.hpp deleted file mode 100644 index c08be2f25..000000000 --- a/daemon/boardd/threadpool.hpp +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright (c) 2014, Robert Wang -// The MIT License. -#ifndef _THREADPOOL_HPP -# define _THREADPOOL_HPP -#include -#include "queue.hpp" - -namespace { - -template -void WorkerThread(Queue *q) -{ - while (true) { - Job *job; - if (!q->Pop(job)) - return; - job->Run(); - delete job; - } -} - -} // namespace - -template -class ThreadPool -{ -public: - ThreadPool(size_t num) - : queue_(num * 4) - { - for (size_t i = 0; i < num; ++i) - threads_.push_back(std::move(std::thread(WorkerThread, &queue_))); - } - - ~ThreadPool() - { - queue_.Close(); - for (auto &t : threads_) - t.join(); - } - - void Do(Job *job) - { - queue_.Push(job); - } - -private: - Queue queue_; - std::vector threads_; -}; - -#endif