Feature #13618 » 0001-auto-fiber-schedule-for-rb_wait_for_single_fd-and-rb.patch

normalperson (Eric Wong), 06/01/2017 12:14 AM


common.mk
thread.$(OBJEXT): {$(VPATH)}intern.h
thread.$(OBJEXT): {$(VPATH)}internal.h
thread.$(OBJEXT): {$(VPATH)}io.h
thread.$(OBJEXT): {$(VPATH)}iom.h
thread.$(OBJEXT): {$(VPATH)}iom_internal.h
thread.$(OBJEXT): {$(VPATH)}iom_common.h
thread.$(OBJEXT): {$(VPATH)}iom_epoll.h
thread.$(OBJEXT): {$(VPATH)}iom_kqueue.h
thread.$(OBJEXT): {$(VPATH)}iom_pingable_common.h
thread.$(OBJEXT): {$(VPATH)}iom_select.h
thread.$(OBJEXT): {$(VPATH)}method.h
thread.$(OBJEXT): {$(VPATH)}missing.h
thread.$(OBJEXT): {$(VPATH)}node.h
configure.in
AC_CHECK_HEADERS(pwd.h)
AC_CHECK_HEADERS(setjmpex.h)
AC_CHECK_HEADERS(sys/attr.h)
AC_CHECK_HEADERS(sys/epoll.h)
AC_CHECK_HEADERS(sys/event.h)
AC_CHECK_HEADERS(sys/fcntl.h)
AC_CHECK_HEADERS(sys/file.h)
AC_CHECK_HEADERS(sys/id.h)
......
AC_CHECK_FUNCS(dup)
AC_CHECK_FUNCS(dup3)
AC_CHECK_FUNCS(eaccess)
AC_CHECK_FUNCS(epoll_create)
AC_CHECK_FUNCS(epoll_create1)
AC_CHECK_FUNCS(epoll_ctl)
AC_CHECK_FUNCS(epoll_wait)
AC_CHECK_FUNCS(endgrent)
AC_CHECK_FUNCS(fchmod)
AC_CHECK_FUNCS(fchown)
......
AC_CHECK_FUNCS(ioctl)
AC_CHECK_FUNCS(isfinite)
AC_CHECK_FUNCS(issetugid)
AC_CHECK_FUNCS(kevent)
AC_CHECK_FUNCS(killpg)
AC_CHECK_FUNCS(kqueue)
AC_CHECK_FUNCS(lchmod)
AC_CHECK_FUNCS(lchown)
AC_CHECK_FUNCS(link)
......
AS_IF([test x$with_valgrind != xno],
[AC_CHECK_HEADERS(valgrind/memcheck.h)])
AC_DEFINE_UNQUOTED(IOM_SELECT, 0)
AC_DEFINE_UNQUOTED(IOM_KQUEUE, 1)
AC_DEFINE_UNQUOTED(IOM_EPOLL, 2)
iom_default=select
AS_CASE([$ac_cv_func_kqueue:$ac_cv_func_kevent:$ac_cv_header_sys_event_h],
[yes:yes:yes], [iom_default=kqueue],
[*],
[AS_CASE(
[$ac_cv_func_epoll_wait:$ac_cv_func_epoll_create:$ac_cv_header_sys_epoll_h],
[yes:yes:yes], [iom_default=epoll])]
)
AC_ARG_WITH(iom,
AS_HELP_STRING([--with-iom=XXXXX],
[I/O manager (select|kqueue|epoll)]),
[with_iom="$withval"], [with_iom="$iom_default"])
AS_CASE(["$with_iom"],
[select], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_SELECT)],
[kqueue], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_KQUEUE)],
[epoll], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_EPOLL)],
[AC_MSG_ERROR(unknown I/O manager: $with_iom)])
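For example (assuming a standard autoconf build), passing --with-iom=epoll to configure forces the epoll implementation even on a system where kqueue was detected, while omitting the option keeps the auto-detected default reported on the "I/O manager" line of the config summary shown further down.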
dln_a_out_works=no
if test "$ac_cv_header_a_out_h" = yes; then
if test "$with_dln_a_out" = yes || test "$rb_cv_dlopen" = unknown; then
......
config_summary "man page type" "$MANTYPE"
config_summary "search path" "$search_path"
config_summary "static-linked-ext" ${EXTSTATIC:+"yes"}
config_summary "I/O manager" ${with_iom}
echo ""
echo "---"
cont.c
#include "vm_core.h"
#include "gc.h"
#include "eval_intern.h"
#include "iom.h"
/* FIBER_USE_NATIVE enables Fiber performance improvement using system
* dependent method such as make/setcontext on POSIX system or
......
* You shouldn't mix "transfer" and "resume".
*/
int transferred;
unsigned int auto_fiber:1;
#if FIBER_USE_NATIVE
#ifdef _WIN32
......
fiber_switch(return_fiber(), 1, &value, 0);
}
int
rb_fiber_resumable_p(const rb_thread_t *th, const rb_fiber_t *fib)
{
return th->root_fiber != fib && !fib->prev;
}
static void
fiber_check_resume(const rb_fiber_t *fib)
{
if (fib->prev != 0 || fib->cont.type == ROOT_FIBER_CONTEXT) {
rb_raise(rb_eFiberError, "double resume");
}
if (fib->transferred != 0) {
rb_raise(rb_eFiberError, "cannot resume transferred Fiber");
}
}
VALUE
rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
{
rb_fiber_t *fib;
GetFiberPtr(fibval, fib);
fiber_check_resume(fib);
return fiber_switch(fib, argc, argv, 1);
}
......
return rb_fiber_current();
}
/* Returns true if auto-fiber is enabled for current fiber */
int
rb_fiber_auto_sched_p(const rb_thread_t *th)
{
const rb_fiber_t *cur = th->fiber;
return (cur && cur->auto_fiber && th->root_fiber != cur);
}
/*
* Enable auto-scheduling for the Fiber and resume it
*/
static VALUE
rb_fiber_auto_start(int argc, VALUE *argv, VALUE self)
{
rb_thread_t *th = GET_THREAD();
rb_fiber_t *fib;
GetFiberPtr(self, fib);
if (th->root_fiber == fib) {
rb_raise(rb_eFiberError, "Root fiber cannot #start");
}
if (fib->auto_fiber) {
rb_raise(rb_eFiberError, "Fiber already started");
}
fib->auto_fiber = 1;
fiber_check_resume(fib);
return fiber_switch(fib, argc, argv, 1);
}
rb_thread_t *
rb_fiber_owner_thread(VALUE self)
{
rb_fiber_t *fib;
rb_thread_t *th;
GetFiberPtr(self, fib);
GetThreadPtr(fib->cont.saved_thread.self, th);
return th;
}
static void
fiber_auto_join(rb_fiber_t *fib, double *timeout)
{
rb_thread_t *th = GET_THREAD();
rb_fiber_t *cur = fiber_current();
if (cur == fib) {
rb_raise(rb_eFiberError, "Target fiber must not be current fiber");
}
if (th->root_fiber == fib) {
rb_raise(rb_eFiberError, "Target fiber must not be root fiber");
}
if (fib->cont.saved_thread.self != th->self) {
rb_raise(rb_eFiberError, "Target fiber not owned by current thread");
}
if (!fib->auto_fiber) {
rb_raise(rb_eFiberError, "Target fiber is not an auto-fiber");
}
while (fib->status != TERMINATED && (timeout == 0 || *timeout >= 0.0)) {
rb_iom_schedule(th, timeout);
}
}
static VALUE
rb_fiber_auto_join(int argc, VALUE *argv, VALUE self)
{
rb_fiber_t *fib;
double timeout, *t;
VALUE limit;
GetFiberPtr(self, fib);
rb_scan_args(argc, argv, "01", &limit);
if (NIL_P(limit)) {
t = 0;
} else {
timeout = rb_num2dbl(limit);
t = &timeout;
}
fiber_auto_join(fib, t);
return fib->status == TERMINATED ? fib->cont.self : Qnil;
}
static VALUE
rb_fiber_auto_value(VALUE self)
{
rb_fiber_t *fib;
GetFiberPtr(self, fib);
fiber_auto_join(fib, 0);
return fib->cont.value;
}
/*
* Document-class: FiberError
......
rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1);
rb_define_method(rb_cFiber, "initialize", rb_fiber_init, 0);
rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1);
rb_define_method(rb_cFiber, "start", rb_fiber_auto_start, -1);
rb_define_method(rb_cFiber, "join", rb_fiber_auto_join, -1);
rb_define_method(rb_cFiber, "value", rb_fiber_auto_value, 0);
}
RUBY_SYMBOL_EXPORT_BEGIN
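The three methods registered above make up the new auto-fiber API. A rough, hypothetical usage sketch from C follows (not part of this patch; it relies only on the existing public rb_fiber_new and rb_funcall helpers, and the demo_* names are made up for illustration):

#include <ruby.h>

/* fiber body: runs auto-scheduled once Fiber#start is called */
static VALUE
demo_fiber_body(RB_BLOCK_CALL_FUNC_ARGLIST(yielded_arg, callback_arg))
{
    return INT2FIX(42); /* becomes the result of Fiber#value */
}

static VALUE
demo_auto_fiber(void)
{
    VALUE fib = rb_fiber_new(demo_fiber_body, Qnil);

    /* Fiber#start flags the fiber as auto-scheduled and resumes it */
    rb_funcall(fib, rb_intern("start"), 0);
    /* Fiber#join waits for termination; the optional argument is a timeout in seconds */
    rb_funcall(fib, rb_intern("join"), 1, DBL2NUM(1.0));
    /* Fiber#value joins (without a timeout) and returns the fiber's return value */
    return rb_funcall(fib, rb_intern("value"), 0);
}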
include/ruby/io.h
/* #define FMODE_UNIX 0x00200000 */
/* #define FMODE_INET 0x00400000 */
/* #define FMODE_INET6 0x00800000 */
/* #define FMODE_IOM_PRIVATE1 0x01000000 */ /* OS-dependent */
/* #define FMODE_IOM_PRIVATE2 0x02000000 */ /* OS-dependent */
#define GetOpenFile(obj,fp) rb_io_check_closed((fp) = RFILE(rb_io_taint_check(obj))->fptr)
iom.h
/*
* iom -> I/O Manager for RubyVM (auto-Fiber-aware)
*
* On platforms with epoll or kqueue, this should be ready for multicore,
* even if the rest of the RubyVM is not.
*
* Some inspiration taken from Mio in GHC:
* http://haskell.cs.yale.edu/wp-content/uploads/2013/08/hask035-voellmy.pdf
*/
#ifndef RUBY_IOM_H
#define RUBY_IOM_H
#include "ruby.h"
#include "ruby/io.h"
#include "ruby/intern.h"
#include "vm_core.h"
typedef struct rb_iom_struct rb_iom_t;
/* WARNING: unstable API, only for Ruby internal use */
/*
* Note: the first "rb_thread_t *" is a placeholder and may be replaced
* with "rb_execution_context_t *" in the future.
*/
/*
* All functions with "wait" in their name take an optional double * +timeout+
* argument specifying the timeout in seconds. If NULL, it can wait
* forever until the event happens (or the fiber is explicitly resumed).
*
* (maybe) TODO: If non-NULL, the timeout will be updated to the
* remaining time upon returning. Not sure if useful, could just be
* a waste of cycles; so not implemented, yet.
*/
/*
* Relinquish calling fiber while waiting for +events+ on the given
* +rb_io_t+
*
* Multiple native threads can enter this function at the same time.
*
* Events are RB_WAITFD_IN, RB_WAITFD_OUT, RB_WAITFD_PRI
*
* Returns a mask of events.
*/
int rb_iom_waitio(rb_thread_t *, rb_io_t *, int events, double *timeout);
/*
* Identical to rb_iom_waitio, but takes a pointer to an integer file
* descriptor, instead of rb_io_t. Use rb_iom_waitio when possible,
* since it allows us to optimize epoll (and perhaps avoid kqueue
* portability bugs across different *BSDs).
*/
int rb_iom_waitfd(rb_thread_t *, int *fdp, int events, double *timeout);
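A hypothetical caller sketch (not from the patch) illustrating the convention described above: +timeout+ is a relative number of seconds passed by pointer, NULL means wait until readiness, and the return value is the subset of requested RB_WAITFD_* bits that became ready (zero on timeout):

static int
demo_wait_readable(rb_thread_t *th, int *fdp, double seconds)
{
    double timeout = seconds; /* relative; passing NULL instead of &timeout waits forever */
    int revents = rb_iom_waitfd(th, fdp, RB_WAITFD_IN, &timeout);

    if (revents == 0) {
        /* timed out (or *fdp was negative) */
    }
    return revents; /* mask of ready RB_WAITFD_* bits */
}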
/*
* Relinquish calling fiber to wait for the given PID to change status.
* Multiple native threads can enter this function at the same time.
* If timeout is negative, wait forever.
*/
rb_pid_t rb_iom_waitpid(rb_thread_t *,
rb_pid_t, int *status, int options, double *timeout);
/*
* Relinquish calling fiber for at least the duration of given timeout
* in seconds. If timeout is negative, wait forever (until explicitly
* resumed).
* Multiple native threads can enter this function at the same time.
*/
void rb_iom_sleep(rb_thread_t *, double *timeout);
/* callback for SIGCHLD, needed to implement rb_iom_waitpid */
void rb_iom_sigchld(rb_vm_t *);
/*
* there is no public create function; creation is lazy to avoid incurring
* overhead for small scripts which do not need fibers. We only need this
* at VM destruction
*/
void rb_iom_destroy(rb_vm_t *);
/*
* schedule
*/
void rb_iom_schedule(rb_thread_t *th, double *timeout);
/* cont.c */
int rb_fiber_auto_sched_p(const rb_thread_t *);
rb_thread_t *rb_fiber_owner_thread(VALUE);
#endif /* RUBY_IOM_H */
iom_common.h
/* included by iom_(epoll|select|kqueue).h */
/* we lazily create this; small scripts may never need iom */
static rb_iom_t *
rb_iom_new(rb_thread_t *th)
{
rb_iom_t *iom = ALLOC(rb_iom_t);
rb_iom_init(iom);
return iom;
}
static rb_iom_t *
rb_iom_get(rb_thread_t *th)
{
VM_ASSERT(th && th->vm);
if (!th->vm->iom) {
th->vm->iom = rb_iom_new(th);
}
return th->vm->iom;
}
/* check for expired timers */
static void
rb_iom_timer_check(const rb_thread_t *th)
{
rb_iom_t *iom = th->vm->iom;
if (iom) {
struct rb_iom_timer *t = 0, *next = 0;
double now = timeofday();
list_for_each_safe(&iom->timers, t, next, n.tnode) {
if (t->expires_at <= now) {
struct rb_iom_waiter *w = rb_iom_waiter_of(t);
VALUE fibval = rb_iom_timer_fibval(t);
if (w) {
list_del_init(&w->wnode);
}
list_del_init(&t->n.tnode);
/* non-auto-fibers may set timer in rb_iom_schedule */
if (fibval != Qfalse) {
rb_thread_t *owner = rb_fiber_owner_thread(fibval);
list_add_tail(&owner->afrunq, &t->n.rnode);
}
}
return; /* done, timers is a sorted list */
}
}
}
/* insert a new +timer+ into +timers+, maintain sort order by expires_at */
static void
rb_iom_timer_add(rb_thread_t *th, struct rb_iom_timer *add,
const double *timeout, int flags)
{
add->_fibval = flags & IOM_FIB ? rb_fiber_current() : Qfalse;
add->_fibval |= flags & IOM_WAIT ? 0 : IOM_FIBMASK;
rb_iom_timer_check(th);
if (timeout) {
rb_iom_t *iom = rb_iom_get(th);
struct rb_iom_timer *i = 0;
add->expires_at = timeofday() + *timeout;
/*
* search backwards: assume typical projects have multiple objects
* sharing the same timeout values, so new timers will expire later
* than existing timers
*/
list_for_each_rev(&iom->timers, i, n.tnode) {
if (add->expires_at > i->expires_at) {
list_add_after(&iom->timers, &i->n.tnode, &add->n.tnode);
return;
}
}
list_add(&iom->timers, &add->n.tnode);
}
else {
/* not active, just allow list_del to function */
list_node_init(&add->n.tnode);
}
}
/* max == -1 : wake all */
static void
rb_iom_blockers_notify(rb_iom_t *iom, int max)
{
struct rb_iom_blocker *b = 0, *next = 0;
list_for_each_safe(&iom->blockers, b, next, bnode) {
list_del_init(&b->bnode);
ubf_select(b->th);
if (--max == 0) {
break;
}
}
}
/*
* TODO: consider EVFILT_PROC for kqueue and netlink+epoll on Linux;
* see the "god" RubyGem for usage examples.
* However, I doubt rb_waitpid scalability will be a problem and
* the simplicity of a single implementation for all is appealing.
*/
#ifdef HAVE_SYS_TYPES_H
# include <sys/types.h>
#endif
#ifdef HAVE_SYS_WAIT_H
# include <sys/wait.h>
#endif
#if defined(WNOHANG) && WNOHANG != 0 && \
(defined(HAVE_WAITPID) || defined(HAVE_WAIT4))
static VALUE
iom_schedule_pid(VALUE ptr)
{
struct rb_iom_pid_waiter *pw = (struct rb_iom_pid_waiter *)ptr;
rb_thread_t *th = pw->th;
rb_fiber_auto_do_yield_p(th);
RUBY_VM_CHECK_INTS_BLOCKING(th);
return rb_fiber_yield(0, 0);
}
rb_pid_t
rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
double *timeout)
{
struct rb_iom_pid_waiter pw;
pw.options = options;
VM_ASSERT((options & WNOHANG) == 0 &&
"WNOHANG should be handled in rb_waitpid");
/*
* unlike rb_iom_waitfd, we first try a non-blocking *waitpid call
* here before registering the waiter and yielding
*/
pw.pid = rb_waitpid(pid, &pw.status, pw.options | WNOHANG);
if (pw.pid == 0) {
rb_iom_t *iom = rb_iom_get(th);
pw.th = th;
pw.pid = pid;
rb_iom_timer_add(th, &pw.w.timer, timeout, IOM_FIB|IOM_WAIT);
/* LIFO, to match Linux wait4() blocking behavior */
list_add(&iom->pids, &pw.w.wnode);
rb_ensure(iom_schedule_pid, (VALUE)&pw,
rb_iom_waiter_done, (VALUE)&pw.w);
if (pw.pid == -1) {
errno = pw.errnum;
}
}
if (status) {
*status = pw.status;
}
if (pw.pid > 0) {
rb_last_status_set(pw.status, pw.pid);
}
return pw.pid;
}
void
rb_iom_sigchld(rb_vm_t *vm)
{
rb_iom_t *iom = vm->iom;
if (iom) {
struct rb_iom_pid_waiter *pw = 0, *next = 0;
size_t nr = 0;
list_for_each_safe(&iom->pids, pw, next, w.wnode) {
pid_t r = rb_waitpid(pw->pid, &pw->status, pw->options | WNOHANG);
if (r == 0) {
continue;
}
if (r == -1) {
pw->errnum = errno;
}
nr++;
pw->pid = r;
rb_iom_waiter_ready(&pw->w);
}
if (nr) {
rb_iom_blockers_notify(iom, -1);
}
}
}
#else
rb_pid_t
rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
double *timeout)
{
rb_bug("Should not get here, WNOHANG not implemented");
}
#endif /* defined(WNOHANG) && (defined(HAVE_WAITPID) || defined(HAVE_WAIT4)) */
iom_epoll.h
/*
* Linux-only epoll-based implementation of I/O Manager for RubyVM
*
* Notes:
*
* TODO: epoll_wait only has millisecond resolution; if we need higher
* resolution we can use timerfd or ppoll on the epoll_fd itself.
*
* Inside the Linux kernel, select/poll/ppoll/epoll_wait all use the
* same notification callback (struct file_operations)->poll.
* Unlike with kqueue across different *BSDs, we do not need to worry
* about inconsistencies between these syscalls.
*
* See also notes in iom_kqueue.h
*/
#include "iom_internal.h"
#include <sys/epoll.h>
#include <math.h> /* round() */
#define FMODE_IOM_ADDED FMODE_IOM_PRIVATE1
/* allocated on heap (rb_vm_t.iom) */
struct rb_iom_struct {
/*
* Everything here is protected by GVL at this time,
* URCU lists (LGPL-2.1+) may be used in the future
*/
/* we NEVER need to scan epws, only insert + delete + empty check */
struct list_head epws; /* -epw.w.wnode, order agnostic */
struct list_head timers; /* -rb_iom_timer.n.tnode, sort by expires_at */
struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
struct rb_iom_fdmap fdmap; /* maps each FD to multiple epw */
int epoll_fd;
int maxevents; /* auto-increases */
struct list_head blockers; /* -rb_iom_blocker.bnode */
};
/*
* Not using rb_iom_fd_waiter here, since we never need to reread the
* FD on this implementation.
* Allocated on stack
*/
struct epw {
struct rb_iom_waiter w;
union {
struct list_node fdnode;
struct {
rb_thread_t *th;
struct rb_iom_fd *fdh;
} pre_ctl;
} as;
int fd; /* no need for "int *", here, we never reread */
short events; /* requested events, like poll(2) */
short revents; /* returned events, like poll(2) */
int *flags; /* &fptr->mode */
};
static void
increase_maxevents(rb_iom_t *iom, int retries)
{
/* 1024 is the RUBY_ALLOCV_LIMIT on such systems */
const int max_alloca = 1024 / sizeof(struct epoll_event);
const int max = max_alloca * 2;
if (retries) {
iom->maxevents *= retries;
if (iom->maxevents > max || iom->maxevents <= 0) {
iom->maxevents = max;
}
}
}
static int
double2msec(double sec)
{
/*
* clamp timeout to work around a Linux <= 2.6.37 bug,
* see epoll_wait(2) manpage
*/
const int max_msec = 35 * 60 * 1000; /* floor(35.79 minutes) */
if (sec < 0) {
return -1;
}
else {
double msec = round(sec * 1000);
if (msec < (double)max_msec) {
int ret = (int)msec;
return ret < 0 ? 0 : ret;
}
return max_msec;
}
}
/* we can avoid branches when mapping RB_WAIT_* bits to EPOLL* bits */
STATIC_ASSERT(epbits_matches_waitfd_bits,
RB_WAITFD_IN == EPOLLIN && RB_WAITFD_OUT == EPOLLOUT &&
RB_WAITFD_PRI == EPOLLPRI);
/* what goes into epoll_ctl... */
static int
rb_events2ep(int events)
{
return EPOLLONESHOT | events;
}
/* ...what comes out of epoll_wait */
static short
rb_ep2revents(int revents)
{
return (short)(revents & (EPOLLIN|EPOLLOUT|EPOLLPRI));
}
/* lazily create epoll FD, since not everybody waits on I/O */
static int
iom_epfd(rb_iom_t *iom)
{
if (iom->epoll_fd < 0) {
#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1)
iom->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (iom->epoll_fd < 0) {
int err = errno;
if (rb_gc_for_fd(err)) {
iom->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (iom->epoll_fd < 0) {
rb_sys_fail("epoll_create1");
}
}
else if (err != ENOSYS) {
rb_syserr_fail(err, "epoll_create1");
}
else { /* libc >= kernel || build-env > run-env */
#endif /* HAVE_EPOLL_CREATE1 */
iom->epoll_fd = epoll_create(1);
if (iom->epoll_fd < 0) {
if (rb_gc_for_fd(errno)) {
iom->epoll_fd = epoll_create(1);
}
}
if (iom->epoll_fd < 0) {
rb_sys_fail("epoll_create");
}
rb_maygvl_fd_fix_cloexec(iom->epoll_fd);
#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1)
}
}
#endif /* HAVE_EPOLL_CREATE1 */
rb_update_max_fd(iom->epoll_fd);
}
return iom->epoll_fd;
}
static void
rb_iom_init(rb_iom_t *iom)
{
list_head_init(&iom->timers);
list_head_init(&iom->epws);
list_head_init(&iom->pids);
list_head_init(&iom->blockers);
iom->maxevents = 8;
iom->epoll_fd = -1;
rb_iom_fdmap_init(&iom->fdmap);
}
static void
check_epoll_wait(rb_thread_t *th, int nr, struct epoll_event *ev)
{
if (nr >= 0) {
int i;
for (i = 0; i < nr; i++) {
struct rb_iom_fd *fdh = ev[i].data.ptr;
struct epw *epw = 0, *next = 0;
short revents = rb_ep2revents(ev[i].events);
/*
* Typical list size is 1; only multiple fibers waiting
* on the same FD increases fdh list size
*/
list_for_each_safe(&fdh->fdhead, epw, next, as.fdnode) {
epw->revents = epw->events & revents;
list_del_init(&epw->as.fdnode);
rb_iom_waiter_ready(&epw->w);
}
}
/* notify the waiter thread in case we enqueued fibers for them */
if (nr > 0) {
rb_iom_blockers_notify(th->vm->iom, -1);
}
}
else {
int err = errno;
if (err != EINTR) {
rb_syserr_fail(err, "epoll_wait");
}
}
rb_iom_timer_check(th);
RUBY_VM_CHECK_INTS_BLOCKING(th);
}
/* perform a non-blocking epoll_wait while holding GVL */
static void
ping_events(rb_thread_t *th)
{
rb_iom_t *iom = th->vm->iom;
int epfd = iom ? iom->epoll_fd : -1;
if (epfd >= 0) {
VALUE v;
int nr;
int maxevents = iom->maxevents;
struct epoll_event *ev = ALLOCV_N(struct epoll_event, v, maxevents);
int retries = 0;
do {
nr = epoll_wait(epfd, ev, maxevents, 0);
check_epoll_wait(th, nr, ev);
} while (nr == maxevents && ++retries);
if (v) {
ALLOCV_END(v);
}
increase_maxevents(iom, retries);
}
}
/* for iom_pingable_common.h */
static void
rb_iom_do_wait(rb_thread_t *th, rb_iom_t *iom)
{
int maxevents = iom->maxevents;
int nr = maxevents;
double timeout;
RUBY_VM_CHECK_INTS_BLOCKING(th);
timeout = rb_iom_next_timeout(&iom->timers);
if (timeout != 0 && (!list_empty(&iom->epws) || !list_empty(&iom->pids))) {
VALUE v;
int epfd = iom_epfd(th->vm->iom); /* may raise */
struct epoll_event *ev = ALLOCV_N(struct epoll_event, v, maxevents);
int msec = double2msec(timeout);
struct rb_iom_blocker cur;
VM_ASSERT(epfd >= 0);
cur.th = th;
list_add_tail(&iom->blockers, &cur.bnode);
BLOCKING_REGION({
nr = epoll_wait(epfd, ev, maxevents, msec);
}, ubf_select, th, FALSE);
list_del(&cur.bnode);
check_epoll_wait(th, nr, ev);
if (v) {
ALLOCV_END(v);
}
}
if (nr == maxevents) { /* || timeout == 0 */
ping_events(th);
}
}
static void
epoll_ctl_or_raise(rb_thread_t *th, struct epw *epw)
{
int e;
int epfd;
struct epoll_event ev;
/* we cannot raise until list_add: */
{
struct rb_iom_fd *fdh = epw->as.pre_ctl.fdh;
ev.data.ptr = fdh;
ev.events = rb_events2ep(epw->events);
/*
* merge events from other threads/fibers waiting on the same
* [ descriptor (int fd), description (struct file *) ] tuple
*/
if (!list_empty(&fdh->fdhead)) { /* uncommon, I hope... */
struct epw *cur;
list_for_each(&fdh->fdhead, cur, as.fdnode) {
ev.events |= rb_events2ep(cur->events);
}
}
list_add(&fdh->fdhead, &epw->as.fdnode);
}
epfd = iom_epfd(th->vm->iom); /* may raise */
/* we want to track if an FD is already being watched ourselves */
if (epw->flags) {
if (*epw->flags & FMODE_IOM_ADDED) { /* ideal situation */
e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev);
}
else {
e = epoll_ctl(epfd, EPOLL_CTL_ADD, epw->fd, &ev);
if (e == 0) {
*epw->flags |= FMODE_IOM_ADDED;
}
else if (e < 0 && errno == EEXIST) {
/*
* possible EEXIST if several fptrs point to the same FD:
* f1 = Fiber.start { io1.read(1) }
* io2 = IO.for_fd(io1.fileno)
* f2 = Fiber.start { io2.read(1) }
*/
*epw->flags |= FMODE_IOM_ADDED;
e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev);
}
}
}
else { /* don't know if added or not, fall back to add on ENOENT */
e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev);
if (e < 0 && errno == ENOENT) {
e = epoll_ctl(epfd, EPOLL_CTL_ADD, epw->fd, &ev);
}
}
if (e < 0) {
rb_sys_fail("epoll_ctl");
}
}
static VALUE
epmod_yield(VALUE ptr)
{
/* we must have no possibility of raising until list_add: */
struct epw *epw = (struct epw *)ptr;
rb_thread_t *th = epw->as.pre_ctl.th;
epoll_ctl_or_raise(th, epw);
ping_events(th);
(void)rb_fiber_auto_do_yield_p(th);
return rb_fiber_yield(0, 0);
}
static VALUE
epw_done(VALUE ptr)
{
struct epw *epw = (struct epw *)ptr;
list_del(&epw->as.fdnode);
return rb_iom_waiter_done((VALUE)&epw->w);
}
static int
iom_waitfd(rb_thread_t *th, int fd, int *flags, int events, double *timeout)
{
rb_iom_t *iom = rb_iom_get(th);
struct epw epw;
/* unlike kqueue or select, we never need to reread fd */
epw.fd = fd;
if (epw.fd < 0) { /* TODO: behave like poll(2) and sleep? */
return 0;
}
/* may raise on OOM: */
epw.as.pre_ctl.fdh = rb_iom_fd_get(&iom->fdmap, epw.fd);
epw.as.pre_ctl.th = th;
epw.flags = flags;
/*
* if we did not have GVL, revents may be set immediately
* upon epoll_ctl by another thread running epoll_wait,
* so we must initialize it before epoll_ctl:
*/
epw.revents = 0;
epw.events = (short)events;
list_add(&iom->epws, &epw.w.wnode);
rb_iom_timer_add(th, &epw.w.timer, timeout, IOM_FIB|IOM_WAIT);
rb_ensure(epmod_yield, (VALUE)&epw, epw_done, (VALUE)&epw);
return (int)epw.revents; /* may be zero if timed out */
}
int
rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, double *timeout)
{
return iom_waitfd(th, fptr->fd, &fptr->mode, events, timeout);
}
int
rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
{
return iom_waitfd(th, *fdp, 0, events, timeout);
}
void
rb_iom_destroy(rb_vm_t *vm)
{
rb_iom_t *iom = vm->iom;
vm->iom = 0;
if (iom) {
/*
* it's possible, but crazy, to share epoll FDs across processes
* (kqueue has a rather unique close-on-fork behavior)
*/
if (iom->epoll_fd >= 0) {
close(iom->epoll_fd);
}
rb_iom_fdmap_destroy(&iom->fdmap);
xfree(iom);
}
}
/* used by thread.c::rb_thread_atfork */
static void
rb_iom_atfork_child(rb_thread_t *th)
{
rb_iom_destroy(th->vm);
}
/* used by thread_pthread.c */
static int
rb_iom_reserved_fd(int fd)
{
rb_iom_t *iom = GET_VM()->iom;
return iom && fd == iom->epoll_fd;
}
#include "iom_pingable_common.h"
#include "iom_common.h"
iom_internal.h
#ifndef RB_IOM_COMMON_H
#define RB_IOM_COMMON_H
#include "internal.h"
#include "iom.h"
/* cont.c */
void rb_fiber_auto_enqueue(VALUE fibval);
#define FMODE_IOM_PRIVATE1 0x01000000
#define FMODE_IOM_PRIVATE2 0x02000000
#define IOM_FIBMASK ((VALUE)0x1)
#define IOM_FIB (0x2)
#define IOM_WAIT (0x1) /* container_of(..., struct rb_iom_waiter, timer) */
/*
* fdmap is a singleton.
*
* It makes zero sense to have multiple fdmaps per-process; even less so
* than multiple ioms. The file descriptor table in POSIX is per-process;
* and POSIX requires syscalls to allocate the lowest available FD.
* This is also why we use an array instead of a hash table, as there will
* be no holes for big processes.
*
* If contention becomes a problem, we can pad (struct rb_iom_fd) to
* 64-bytes for cache alignment.
*
* Currently we use fdmap to deal with FD aliasing with epoll
* and kqueue interfaces. FD aliasing happens when multiple
* Fibers wait on the same FD; but epoll/kqueue APIs only allow
* registering a single data pointer per FD.
*
* In the future, we may implement rb_notify_fd_close using fdmap.
*/
/* linear growth based on power-of-two */
#define RB_IOM_FD_PER_HEAP 64
/* on-heap and persistent for the process lifetime; keep as small as possible */
struct rb_iom_fd {
struct list_head fdhead; /* -kev.(rfdnode|wfdnode), epw.fdnode */
};
/* singleton (per-rb_iom_t, or per process, if we ever need > 1 iom) */
struct rb_iom_fdmap {
struct rb_iom_fd **map;
unsigned int heaps;
int max_fd;
};
/* allocated on stack */
/* Every auto-yielded fiber has this on stack */
struct rb_iom_timer {
union {
struct list_node rnode; /* <=> rb_thread_t.afrunq */
struct list_node tnode; /* <=> rb_iom_struct.timers */
} n;
double expires_at; /* absolute monotonic time */
VALUE _fibval;
};
/* common waiter struct for waiting fds and pids */
struct rb_iom_waiter {
struct rb_iom_timer timer;
struct list_node wnode; /* <=> rb_iom_struct.(fds|pids) */
};
struct rb_iom_fd_waiter {
struct rb_iom_waiter w; /* w.wnode - iom->fds */
int *fdp; /* (ideally) a pointer to fptr->fd, to detect closed FDs */
short events; /* requested events, like poll(2) */
short revents; /* returned events, like poll(2) */
};
struct rb_iom_pid_waiter {
struct rb_iom_waiter w; /* w.wnode - iom->pids */
rb_thread_t *th;
/* pid, status, options are the same as for waitpid(2) */
rb_pid_t pid;
int status;
int options;
int errnum;
};
/* threads sleeping in select, epoll_wait or kevent w/o GVL; on stack */
struct rb_iom_blocker {
rb_thread_t *th;
struct list_node bnode; /* -iom->blockers */
};
#if (RUBYVM_IOM == IOM_KQUEUE || RUBYVM_IOM == IOM_EPOLL)
/* TODO: IOM_SELECT may use this for rb_notify_fd_close */
static struct rb_iom_fd *
iom_fdhead_aref(struct rb_iom_fdmap *fdmap, int fd)
{
VM_ASSERT(fd >= 0);
return &fdmap->map[fd / RB_IOM_FD_PER_HEAP][fd % RB_IOM_FD_PER_HEAP];
}
static struct rb_iom_fd *
rb_iom_fd_get(struct rb_iom_fdmap *fdmap, int fd)
{
if (fd >= fdmap->max_fd) {
struct rb_iom_fd *base, *h;
unsigned n = fdmap->heaps + 1;
unsigned i;
fdmap->map = xrealloc2(fdmap->map, n, sizeof(struct rb_iom_fd *));
base = h = ALLOC_N(struct rb_iom_fd, RB_IOM_FD_PER_HEAP);
for (i = 0; i < RB_IOM_FD_PER_HEAP; i++) {
list_head_init(&h->fdhead);
h++;
}
fdmap->map[fdmap->heaps] = base;
fdmap->max_fd += RB_IOM_FD_PER_HEAP;
}
return iom_fdhead_aref(fdmap, fd);
}
static void
rb_iom_fdmap_init(struct rb_iom_fdmap *fdmap)
{
fdmap->max_fd = 0;
fdmap->heaps = 0;
fdmap->map = 0;
}
static void
rb_iom_fdmap_destroy(struct rb_iom_fdmap *fdmap)
{
unsigned n;
for (n = 0; n < fdmap->heaps; n++) {
xfree(fdmap->map[n]);
}
xfree(fdmap->map);
rb_iom_fdmap_init(fdmap);
}
#endif /* (RUBYVM_IOM == IOM_KQUEUE || RUBYVM_IOM == IOM_EPOLL) */
static VALUE
rb_iom_timer_fibval(const struct rb_iom_timer *t)
{
return t->_fibval & ~IOM_FIBMASK;
}
static struct rb_iom_waiter *
rb_iom_waiter_of(struct rb_iom_timer *t)
{
if (t->_fibval & IOM_FIBMASK) {
return 0;
}
return container_of(t, struct rb_iom_waiter, timer);
}
static double
rb_iom_next_timeout(struct list_head *timers)
{
struct rb_iom_timer *t = list_top(timers, struct rb_iom_timer, n.tnode);
if (t) {
double diff = t->expires_at - timeofday();
return diff <= 0.0 ? 0 : diff;
}
else {
return -1;
}
}
static void rb_iom_timer_check(const rb_thread_t *);
static void rb_iom_timer_add(rb_thread_t *, struct rb_iom_timer *,
const double *timeout, int flags);
static VALUE
rb_iom_timer_done(VALUE ptr)
{
struct rb_iom_timer *t = (struct rb_iom_timer *)ptr;
list_del(&t->n.tnode);
return Qfalse;
}
static void
rb_iom_waiter_ready(struct rb_iom_waiter *w)
{
VALUE fibval = rb_iom_timer_fibval(&w->timer);
list_del_init(&w->wnode);
list_del_init(&w->timer.n.tnode);
if (fibval != Qfalse) {
rb_thread_t *owner = rb_fiber_owner_thread(fibval);
list_add_tail(&owner->afrunq, &w->timer.n.rnode);
}
}
static VALUE
rb_iom_waiter_done(VALUE ptr)
{
struct rb_iom_waiter *w = (struct rb_iom_waiter *)ptr;
list_del(&w->timer.n.tnode);
list_del(&w->wnode);
return Qfalse;
}
/* cont.c */
int rb_fiber_resumable_p(const rb_thread_t *, const rb_fiber_t *);
/*
* resume all "ready" fibers belonging to a given thread
* stop when a fiber has not yielded, yet.
*/
static int
rb_fiber_auto_do_yield_p(rb_thread_t *th)
{
rb_fiber_t *current_auto = rb_fiber_auto_sched_p(th) ? th->fiber : 0;
struct rb_iom_timer *t = 0, *next = 0;
LIST_HEAD(tmp);
/*
* do not loop infinitely as new fibers get added to
* th->afrunq; only work off a temporary list:
*/
list_append_list(&tmp, &th->afrunq);
list_for_each_safe(&tmp, t, next, n.rnode) {
VALUE fibval = rb_iom_timer_fibval(t);
rb_fiber_t *fib = RTYPEDDATA_DATA(fibval);
if (fib == current_auto || !rb_fiber_resumable_p(th, fib)) {
/* tell the caller to yield */
list_prepend_list(&th->afrunq, &tmp);
return 1;
}
rb_fiber_resume(fibval, 0, 0);
}
return 0;
}
/* XXX: is this necessary? */
void
rb_iom_mark_runq(const rb_thread_t *th)
{
struct rb_iom_timer *t = 0;
list_for_each(&th->afrunq, t, n.rnode) {
rb_gc_mark(rb_iom_timer_fibval(t));
}
}
static rb_iom_t *rb_iom_get(rb_thread_t *);
static void rb_iom_blockers_notify(rb_iom_t *, int max);
#endif /* RB_IOM_COMMON_H */
iom_kqueue.h
/*
* kqueue-based implementation of I/O Manager for RubyVM on *BSD
*
* The kevent API has an advantage over epoll_ctl+epoll_wait since
* it can simultaneously add filters and check for events with one
* syscall. It also allows EV_ADD to be used idempotently for
* enabling filters, whereas epoll_ctl requires separate ADD and
* MOD operations.
*
* These are advantages in the common case...
*
* The epoll API has advantages in more esoteric cases:
*
* epoll has the advantage over kqueue when watching for multiple
* events (POLLIN|POLLOUT|POLLPRI) (which is rare). We also have
* to install two kevent filters to watch POLLIN|POLLOUT simultaneously.
* See udata_set/udata_get functions below for more on this.
*
* Also, kevent does not support POLLPRI directly, so we need to use
* select() (or perhaps poll() on some platforms) with a zero
* timeout to check for POLLPRI after EVFILT_READ returns.
*
* Finally, several *BSDs implement kqueue; and the quality of each
* implementation may vary. Anecdotally, *BSDs are not known to even
* support poll() consistently across different types of files.
* We will need to be selective and careful about injecting them into
* kevent().
*/
#include "iom_internal.h"
/* LIST_HEAD (via ccan/list) conflicts with sys/queue.h (via sys/event.h) */
#undef LIST_HEAD
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
/* We need to use EVFILT_READ to watch RB_WAITFD_PRI */
#define WAITFD_READ (RB_WAITFD_IN|RB_WAITFD_PRI)
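To make the filter-pairing point from the header comment concrete, here is a minimal sketch (not taken from the patch; names and udata handling are simplified) of watching both readability and writability of one fd. It needs two kevent filters where epoll needs a single EPOLLIN|EPOLLOUT registration, and it assumes the sys/event.h and sys/time.h includes above:

static int
demo_kq_watch_rw(int kqfd, int fd, void *rudata, void *wudata,
                 struct kevent *eventlist, int maxevents)
{
    struct kevent changes[2];
    const struct timespec zero = { 0, 0 }; /* zero timeout: check without blocking */

    /* one filter per direction; each filter carries its own udata */
    EV_SET(&changes[0], fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, rudata);
    EV_SET(&changes[1], fd, EVFILT_WRITE, EV_ADD|EV_ONESHOT, 0, 0, wudata);

    /* unlike epoll_ctl + epoll_wait, registration and readiness check are one syscall */
    return kevent(kqfd, changes, 2, eventlist, maxevents, &zero);
}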
/* allocated on heap (rb_vm_t.iom) */
struct rb_iom_struct {
/*
* Everything here is protected by GVL at this time,
* URCU lists (LGPL-2.1+) may be used in the future
*/
/* we NEVER need to scan kevs, only insert + delete + empty check */
struct list_head kevs; /* -kev.fdw.w.wnode, order agnostic */
struct list_head timers; /* -rb_iom_timer.n.tnode, sort by expires_at */
struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
struct rb_iom_fdmap rfdmap; /* holds fdh for EVFILT_READ */
struct rb_iom_fdmap wfdmap; /* holds fdh for EVFILT_WRITE */
int kqueue_fd;
int nevents; /* auto-increases */
struct list_head blockers; /* -rb_iom_blocker.bnode */
};
/* allocated on stack */
struct kev {
/* fdw.w.wnode is overloaded for checking RB_WAITFD_PRI (see check_pri) */
struct rb_iom_fd_waiter fdw;
rb_thread_t *th;
/*
* both rfdnode and wfdnode are overloaded for deleting paired
* filters when watching both EVFILT_READ and EVFILT_WRITE on
* a single FD
*/
struct list_node rfdnode; /* -(ev.udata==fdh)->fdhead (EVFILT_READ) */
struct list_node wfdnode; /* -(ev.udata==fdh)->fdhead (EVFILT_WRITE) */
};
/*
* like our epoll implementation, we "ping" using kevent with zero-timeout
* and can do so on any thread.
*/
... This diff was truncated because it exceeds the maximum size that can be displayed.