Skip to content

Commit

Permalink
KQueue support for M:N threads
Browse files Browse the repository at this point in the history
* Allows macOS users to use M:N threads (and technically FreeBSD, though it has not been verified on FreeBSD)

* Include sys/event.h header check for macros, and include sys/event.h when present

* Rename epoll_fd to more generic kq_fd (Kernel event Queue) for use by both epoll and kqueue

* MAP_STACK is not available on macOS so conditionall apply it to mmap flags

* Set fd to close on exec

* Log debug messages specific to kqueue and epoll on creation

* close_invalidate raises an error for the kqueue fd on child process fork. It's unclear rn if that's a bug, or if it's kqueue specific behavior

Use kq with rb_thread_wait_for_single_fd

* Only platforms with `USE_POLL` (linux) had changes applied to take advantage of kernel event queues. It needed to be applied to the `select` so that kqueue could be properly applied

* Clean up kqueue specific code and make sure only flags that were actually set are removed (or an error is raised)

* Also handle kevent specific errnos, since most don't apply from epoll to kqueue

* Use the more platform standard close-on-exec approach of `fcntl` and `FD_CLOEXEC`. The io-event gem uses `ioctl`, but fcntl seems to be the recommended choice. It is also what Go, Bun, and Libuv use

* We're making changes in this file anyways - may as well fix a couple spelling mistakes while here

Make sure FD_CLOEXEC carries over in dup

* Otherwise the kqueue descriptor should have FD_CLOEXEC, but doesn't and fails in assert_close_on_exec
  • Loading branch information
jpcamara authored and ko1 committed Dec 20, 2023
1 parent 7ef90b3 commit 8782e02
Show file tree
Hide file tree
Showing 5 changed files with 250 additions and 42 deletions.
1 change: 1 addition & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -1352,6 +1352,7 @@ AC_CHECK_HEADERS(time.h)
AC_CHECK_HEADERS(ucontext.h)
AC_CHECK_HEADERS(utime.h)
AC_CHECK_HEADERS(sys/epoll.h)
AC_CHECK_HEADERS(sys/event.h)

AS_CASE("$target_cpu", [x64|x86_64|i[3-6]86*], [
AC_CHECK_HEADERS(x86intrin.h)
Expand Down
7 changes: 7 additions & 0 deletions process.c
Original file line number Diff line number Diff line change
Expand Up @@ -3354,6 +3354,13 @@ run_exec_dup2(VALUE ary, VALUE tmpbuf, struct rb_execarg *sargp, char *errmsg, s
ERRMSG("dup");
goto fail;
}
// without this, kqueue timer_th.event_fd fails with a reserved FD did not have close-on-exec
// in #assert_close_on_exec because the FD_CLOEXEC is not dup'd by default
if (fd_get_cloexec(pairs[i].oldfd, errmsg, errmsg_buflen)) {
if (fd_set_cloexec(extra_fd, errmsg, errmsg_buflen)) {
goto fail;
}
}
rb_update_max_fd(extra_fd);
}
else {
Expand Down
44 changes: 32 additions & 12 deletions thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -4265,6 +4265,27 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set);
}

#ifdef RUBY_THREAD_PTHREAD_H

static bool
thread_sched_wait_events_timeval(int fd, int events, struct timeval *timeout)
{
rb_thread_t *th = GET_THREAD();
rb_hrtime_t rel, *prel;

if (timeout) {
rel = rb_timeval2hrtime(timeout);
prel = &rel;
}
else {
prel = NULL;
}

return thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel);
}

#endif

#ifdef USE_POLL

/* The same with linux kernel. TODO: make platform independent definition. */
Expand Down Expand Up @@ -4294,18 +4315,8 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
wfd.busy = NULL;

#ifdef RUBY_THREAD_PTHREAD_H
if (!th->nt->dedicated) {
rb_hrtime_t rel, *prel;

if (timeout) {
rel = rb_timeval2hrtime(timeout);
prel = &rel;
}
else {
prel = NULL;
}

if (thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel)) {
if (!th_has_dedicated_nt(th)) {
if (thread_sched_wait_events_timeval(fd, events, timeout)) {
return 0; // timeout
}
}
Expand Down Expand Up @@ -4445,6 +4456,15 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
int r;
VALUE ptr = (VALUE)&args;

#ifdef RUBY_THREAD_PTHREAD_H
rb_thread_t *th = GET_THREAD();
if (!th_has_dedicated_nt(th)) {
if (thread_sched_wait_events_timeval(fd, events, timeout)) {
return 0; // timeout
}
}
#endif

args.as.fd = fd;
args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
Expand Down
24 changes: 18 additions & 6 deletions thread_pthread.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ static const void *const condattr_monotonic = NULL;

#include COROUTINE_H

#ifndef HAVE_SYS_EVENT_H
#define HAVE_SYS_EVENT_H 0
#endif

#ifndef HAVE_SYS_EPOLL_H
#define HAVE_SYS_EPOLL_H 0
#else
Expand All @@ -78,6 +82,9 @@ static const void *const condattr_monotonic = NULL;
#elif HAVE_SYS_EPOLL_H
#include <sys/epoll.h>
#define USE_MN_THREADS 1
#elif HAVE_SYS_EVENT_H
#include <sys/event.h>
#define USE_MN_THREADS 1
#else
#define USE_MN_THREADS 0
#endif
Expand Down Expand Up @@ -2799,10 +2806,15 @@ static struct {

int comm_fds[2]; // r, w

#if (HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H) && USE_MN_THREADS
int event_fd; // kernel event queue fd (epoll/kqueue)
#endif
#if HAVE_SYS_EPOLL_H && USE_MN_THREADS
#define EPOLL_EVENTS_MAX 0x10
int epoll_fd;
struct epoll_event finished_events[EPOLL_EVENTS_MAX];
#elif HAVE_SYS_EVENT_H && USE_MN_THREADS
#define KQUEUE_EVENTS_MAX 0x10
struct kevent finished_events[KQUEUE_EVENTS_MAX];
#endif

// waiting threads list
Expand Down Expand Up @@ -3088,7 +3100,7 @@ rb_thread_create_timer_thread(void)

CLOSE_INVALIDATE_PAIR(timer_th.comm_fds);
#if HAVE_SYS_EPOLL_H && USE_MN_THREADS
close_invalidate(&timer_th.epoll_fd, "close epoll_fd");
close_invalidate(&timer_th.event_fd, "close event_fd");
#endif
rb_native_mutex_destroy(&timer_th.waiting_lock);
}
Expand All @@ -3099,8 +3111,8 @@ rb_thread_create_timer_thread(void)
// open communication channel
setup_communication_pipe_internal(timer_th.comm_fds);

// open epoll fd
timer_thread_setup_nm();
// open event fd
timer_thread_setup_mn();
}

pthread_create(&timer_th.pthread_id, NULL, timer_thread_func, GET_VM());
Expand Down Expand Up @@ -3181,8 +3193,8 @@ rb_reserved_fd_p(int fd)

if (fd == timer_th.comm_fds[0] ||
fd == timer_th.comm_fds[1]
#if HAVE_SYS_EPOLL_H && USE_MN_THREADS
|| fd == timer_th.epoll_fd
#if (HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H) && USE_MN_THREADS
|| fd == timer_th.event_fd
#endif
) {
goto check_fork_gen;
Expand Down
Loading

0 comments on commit 8782e02

Please sign in to comment.