Skip to content

Commit

Permalink
example http complete
Browse files Browse the repository at this point in the history
  • Loading branch information
shaovie committed Aug 29, 2023
1 parent ca65432 commit 171c438
Show file tree
Hide file tree
Showing 11 changed files with 256 additions and 10 deletions.
65 changes: 60 additions & 5 deletions example/http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,21 @@
#include "../src/io_handle.h"
#include "../src/acceptor.h"
#include "../src/options.h"
#include "../src/poll_sync_opt.h"

#include <string>
#include <signal.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <thread>

reactor *conn_reactor = nullptr;
const char httpheaders[] = "HTTP/1.1 200 OK\r\nConnection: keep-alive\r\nServer: goev\r\nContent-Type: text/plain\r\nContent-Length: 13\r\n\r\nHello, World!";
const char httpheaders1[] = "HTTP/1.1 200 OK\r\nConnection: keep-alive\r\nServer: goev\r\nContent-Type: text/plain\r\nDate: ";
const char httpheaders2[] = "\r\nContent-Length: 13\r\n\r\nHello, World!";

const int pcache_data_t = 1;

class http : public io_handle {
public:
Expand All @@ -34,9 +41,20 @@ class http : public io_handle {
return false;
}

int buf_len = sizeof(httpheaders)-1;
::memcpy(buf, httpheaders, buf_len);
this->send(buf, buf_len);
int writen = 0;
::memcpy(buf, httpheaders1, sizeof(httpheaders1)-1);
writen += sizeof(httpheaders1)-1;

char *date = (char *)this->poll_cache_get(pcache_data_t);
ret = ::strlen(date);
::memcpy(buf + writen, date, ret);
writen += ret;

ret = sizeof(httpheaders2)-1;
::memcpy(buf + writen, httpheaders2, ret);
writen += ret;

this->send(buf, writen);
return true;
}
virtual void on_close() {
Expand All @@ -46,17 +64,54 @@ class http : public io_handle {
ev_handler *gen_http() {
return new http();
}
void release_dates(void *p) {
delete[] (char *)p;
}
void sync_date(bool init) {
struct timeval now;
gettimeofday(&now, NULL);
struct tm tmv;
::localtime_r(&(now.tv_sec), &tmv);
char dates[32] = {0};
::strftime(dates, 32, "%a, %d %b %Y %H:%M:%S GMT", &tmv);
void **args = new void *[conn_reactor->get_poller_num()];
for (int i = 0; i < conn_reactor->get_poller_num(); ++i) {
char *ds = new char[32]{0};
::strcpy(ds, dates);
poll_sync_opt::sync_cache *arg = new poll_sync_opt::sync_cache();
arg->id = pcache_data_t;
arg->value = ds;
arg->free_func = release_dates;
args[i] = arg;
}
if (init)
conn_reactor->init_poll_sync_opt(poll_sync_opt::sync_cache_t, args);
else
conn_reactor->poll_sync_opt(poll_sync_opt::sync_cache_t, args);
}
void sync_date_timing() {
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
sync_date(false);
}
}
int main (int argc, char *argv[]) {
options opt;
opt.set_cpu_affinity = true;
opt.poller_num *= 2;
if (argc > 1)
opt.poller_num = atoi(argv[1]);

signal(SIGPIPE ,SIG_IGN);

conn_reactor = new reactor();
if (conn_reactor->open(opt) != 0)
::exit(1);

sync_date(true);

std::thread thr(sync_date_timing);
thr.detach();

opt.reuse_addr = true;
acceptor acc(conn_reactor, gen_http);
if (acc.open(":8080", opt) != 0)
Expand Down
1 change: 1 addition & 0 deletions makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
timer_qheap.cpp \
poller.cpp \
poll_desc.cpp \
poll_sync_opt.cpp \
acceptor.cpp \
connector.cpp \
io_handle.cpp \
Expand Down
1 change: 1 addition & 0 deletions src/ev_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class ev_handler
friend class connector;
friend class async_send;
friend class timer_qheap;
friend class poll_sync_opt;
public:
enum {
ev_read = EPOLLIN | EPOLLRDHUP,
Expand Down
3 changes: 3 additions & 0 deletions src/io_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
char *io_handle::io_buf() {
return this->poll->io_buf;
}
void *io_handle::poll_cache_get(const int id) {
return this->poll->poll_cache_get(id);;
}
int io_handle::recv(char* &buff) {
if (this->fd == -1)
return -1;
Expand Down
1 change: 1 addition & 0 deletions src/io_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class io_handle : public ev_handler {
int recv(char* &buff);
int send(const char *buff, const int len);
char *io_buf();
void *poll_cache_get(const int id);

virtual bool on_write();

Expand Down
75 changes: 75 additions & 0 deletions src/poll_sync_opt.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#include "poll_sync_opt.h"
#include "poller.h"

#include <sys/eventfd.h>

poll_sync_opt::~poll_sync_opt() {
if (this->efd != -1)
::close(this->efd);

if (this->readq != nullptr)
delete this->readq;
if (this->writeq != nullptr)
delete this->writeq;
}
void poll_sync_opt::init(poll_sync_opt::opt_arg &&arg) {
this->do_sync(arg);
}
int poll_sync_opt::open(poller *poll) {
int fd = ::eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
if (fd == -1) {
fprintf(stderr, "reactor: create eventfd fail! %s\n", strerror(errno));
return -1;
}
if (poll->add(this, fd, ev_handler::ev_read) == -1) {
fprintf(stderr, "reactor: add eventfd to poller fail! %s\n", strerror(errno));
::close(fd);
return -1;
}
this->efd = fd;
return 0;
}
void poll_sync_opt::push(poll_sync_opt::opt_arg &&arg) {
this->mtx.lock();
this->writeq->push_back(std::move(arg));
this->mtx.unlock();

int expected = 0;
if (!this->notified.compare_exchange_strong(expected, 1))
return ;
int64_t v = 1;
int ret = 0;
do {
ret = ::write(this->efd, (void *)&v, sizeof(v));
} while (ret == -1 && errno == EINTR);
}
void poll_sync_opt::do_sync(const poll_sync_opt::opt_arg &arg) {
if (arg.type == poll_sync_opt::sync_cache_t) {
auto p = (poll_sync_opt::sync_cache *)arg.arg;
this->poll->poll_cache_set(p->id, p->value, p->free_func);
delete p;
}
}
bool poll_sync_opt::on_read() {
if (this->readq->empty()) {
this->mtx.lock();
std::swap(this->readq, this->writeq);
this->mtx.unlock();
}
for (auto i = 0; i < 8 && !this->readq->empty(); ++i) {
poll_sync_opt::opt_arg &arg = this->readq->front();
this->do_sync(arg);
this->readq->pop_front();
}
if (!this->readq->empty()) // Ignore readable eventfd, continue
return true;

int64_t v = 0;
int ret = 0;
do {
ret = ::read(this->efd, (void *)&v, sizeof(v));
} while (ret == -1 && errno == EINTR);

this->notified.store(0);
return true;
}
68 changes: 68 additions & 0 deletions src/poll_sync_opt.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#ifndef POLL_SYNC_OPT_H_
#define POLL_SYNC_OPT_H_

#include "ev_handler.h"
#include "ringq.h"

#include <mutex>
#include <atomic>

// Forward declarations
class poller;

class poll_sync_opt : public ev_handler {
public:
enum {
sync_cache_t = 1,
};
class sync_cache {
public:
sync_cache() = default;
int id = 0;
void *value = nullptr;
void (*free_func)(void *) = nullptr; // 负责释放上次一的value
};

class opt_arg {
public:
opt_arg() = default;
opt_arg(const int t, void *a) : type(t), arg(a) { }
opt_arg& operator=(opt_arg &&v) {
this->type = v.type;
this->arg = v.arg;
return *this;
}
opt_arg& operator=(const opt_arg &v) {
this->type = v.type;
this->arg = v.arg;
return *this;
}

int type = 0;
void *arg = nullptr;
};

poll_sync_opt() = delete;
poll_sync_opt(const int init_size) {
this->readq = new ringq<poll_sync_opt::opt_arg>(init_size);
this->writeq = new ringq<poll_sync_opt::opt_arg>(init_size);
}
~poll_sync_opt();

int open(poller *);

virtual bool on_read();

void init(poll_sync_opt::opt_arg &&arg);
void push(poll_sync_opt::opt_arg &&arg);
private:
void do_sync(const poll_sync_opt::opt_arg &arg);

int efd = -1;
std::mutex mtx;
std::atomic<int> notified;
ringq<poll_sync_opt::opt_arg> *readq = nullptr;
ringq<poll_sync_opt::opt_arg> *writeq = nullptr;
};

#endif // POLL_SYNC_OPT_H_
12 changes: 11 additions & 1 deletion src/poller.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include "poll_sync_opt.h"
#include "poller.h"
#include "options.h"
#include "ev_handler.h"

#include <errno.h>
#include <stdio.h>
Expand Down Expand Up @@ -61,8 +61,18 @@ int poller::open(const options &opt) {
if (this->async_sendq->open(this) != 0)
return -1;

this->poll_sync_opterate = new poll_sync_opt(8);
if (this->poll_sync_opterate->open(this) != 0)
return -1;

return 0;
}
void poller::init_poll_sync_opt(const int t, void *arg) {
this->poll_sync_opterate->init(poll_sync_opt::opt_arg(t, arg));
}
void poller::do_poll_sync_opt(const int t, void *arg) {
this->poll_sync_opterate->push(poll_sync_opt::opt_arg(t, arg));
}
int poller::add(ev_handler *eh, const int fd, const uint32_t ev) {
eh->set_poller(this);
int64_t seq = this->seq++;
Expand Down
22 changes: 21 additions & 1 deletion src/poller.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@
#include <pthread.h>
#include <cstdint>
#include <atomic>
#include <map>

// Forward declarations
class options;
class ev_handler;
class timer_qheap;
class poll_desc_map;
class poll_sync_opt;
struct epoll_event;
class item;

class poller {
friend class reactor;
friend class io_handle;
friend class async_send;
friend class poll_sync_opt;
public:
poller() = default;

Expand All @@ -42,6 +44,22 @@ class poller {
inline void push(async_send::item &&asi) { this->async_sendq->push(std::move(asi)); }
poll_desc *get_poll_desc(const int fd) { return this->poll_descs->load(fd); }

private:
void init_poll_sync_opt(const int t, void *arg);
void do_poll_sync_opt(const int t, void *arg);
void poll_cache_set(const int id, void *val, void (*free_func)(void *)) {
auto itor = this->pcache.find(id);
if (itor != this->pcache.end())
free_func(itor->second);
this->pcache[id] = val;
}
void *poll_cache_get(const int id) {
auto itor = this->pcache.find(id);
if (itor != this->pcache.end())
return itor->second;
return nullptr;
}
private:
void set_cpu_id(const int id) { this->cpu_id = id; }
void set_cpu_affinity();
void destroy();
Expand All @@ -55,8 +73,10 @@ class poller {
timer_qheap *timer = nullptr;
async_send *async_sendq = nullptr;
poll_desc_map *poll_descs = nullptr;
poll_sync_opt *poll_sync_opterate = nullptr;
pthread_t thread_id;
std::atomic<int64_t> seq;
std::map<int, void *> pcache;
};

#endif // POLLER_H_
8 changes: 8 additions & 0 deletions src/reactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,11 @@ void reactor::run(const bool join) {
}
delete[] threads;
}
void reactor::init_poll_sync_opt(const int t, void* args[]) {
for (int i = 0; i < this->poller_num; ++i)
this->pollers[i].init_poll_sync_opt(t, args[i]);
}
void reactor::poll_sync_opt(const int t, void* args[]) {
for (int i = 0; i < this->poller_num; ++i)
this->pollers[i].do_poll_sync_opt(t, args[i]);
}
Loading

0 comments on commit 171c438

Please sign in to comment.