Feature #13618 » 0001-auto-fiber-schedule-for-rb_wait_for_single_fd-and-rb.patch

normalperson (Eric Wong), 06/01/2017 12:14 AM

View differences:

common.mk
2607 2607
thread.$(OBJEXT): {$(VPATH)}intern.h
2608 2608
thread.$(OBJEXT): {$(VPATH)}internal.h
2609 2609
thread.$(OBJEXT): {$(VPATH)}io.h
2610
thread.$(OBJEXT): {$(VPATH)}iom.h
2611
thread.$(OBJEXT): {$(VPATH)}iom_internal.h
2612
thread.$(OBJEXT): {$(VPATH)}iom_common.h
2613
thread.$(OBJEXT): {$(VPATH)}iom_epoll.h
2614
thread.$(OBJEXT): {$(VPATH)}iom_kqueue.h
2615
thread.$(OBJEXT): {$(VPATH)}iom_pingable_common.h
2616
thread.$(OBJEXT): {$(VPATH)}iom_select.h
2610 2617
thread.$(OBJEXT): {$(VPATH)}method.h
2611 2618
thread.$(OBJEXT): {$(VPATH)}missing.h
2612 2619
thread.$(OBJEXT): {$(VPATH)}node.h
configure.in
1389 1389
AC_CHECK_HEADERS(pwd.h)
1390 1390
AC_CHECK_HEADERS(setjmpex.h)
1391 1391
AC_CHECK_HEADERS(sys/attr.h)
1392
AC_CHECK_HEADERS(sys/epoll.h)
1393
AC_CHECK_HEADERS(sys/event.h)
1392 1394
AC_CHECK_HEADERS(sys/fcntl.h)
1393 1395
AC_CHECK_HEADERS(sys/file.h)
1394 1396
AC_CHECK_HEADERS(sys/id.h)
......
2405 2407
AC_CHECK_FUNCS(dup)
2406 2408
AC_CHECK_FUNCS(dup3)
2407 2409
AC_CHECK_FUNCS(eaccess)
2410
AC_CHECK_FUNCS(epoll_create)
2411
AC_CHECK_FUNCS(epoll_create1)
2412
AC_CHECK_FUNCS(epoll_ctl)
2413
AC_CHECK_FUNCS(epoll_wait)
2408 2414
AC_CHECK_FUNCS(endgrent)
2409 2415
AC_CHECK_FUNCS(fchmod)
2410 2416
AC_CHECK_FUNCS(fchown)
......
2438 2444
AC_CHECK_FUNCS(ioctl)
2439 2445
AC_CHECK_FUNCS(isfinite)
2440 2446
AC_CHECK_FUNCS(issetugid)
2447
AC_CHECK_FUNCS(kevent)
2441 2448
AC_CHECK_FUNCS(killpg)
2449
AC_CHECK_FUNCS(kqueue)
2442 2450
AC_CHECK_FUNCS(lchmod)
2443 2451
AC_CHECK_FUNCS(lchown)
2444 2452
AC_CHECK_FUNCS(link)
......
3590 3598
AS_IF([test x$with_valgrind != xno],
3591 3599
        [AC_CHECK_HEADERS(valgrind/memcheck.h)])
3592 3600

  
3601
AC_DEFINE_UNQUOTED(IOM_SELECT, 0)
3602
AC_DEFINE_UNQUOTED(IOM_KQUEUE, 1)
3603
AC_DEFINE_UNQUOTED(IOM_EPOLL, 2)
3604

  
3605
iom_default=select
3606
AS_CASE([$ac_cv_func_kqueue:$ac_cv_func_kevent:$ac_cv_header_sys_event_h],
3607
[yes:yes:yes], [iom_default=kqueue],
3608
[*],
3609
  [AS_CASE(
3610
    [$ac_cv_func_epoll_wait:$ac_cv_func_epoll_create:$ac_cv_header_sys_epoll_h],
3611
    [yes:yes:yes], [iom_default=epoll])]
3612
)
3613

  
3614
AC_ARG_WITH(iom,
3615
  AS_HELP_STRING([--with-iom=XXXXX],
3616
		 [I/O manager (select|kqueue|epoll)]),
3617
  [with_iom="$withval"], [with_iom="$iom_default"])
3618
AS_CASE(["$with_iom"],
3619
  [select], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_SELECT)],
3620
  [kqueue], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_KQUEUE)],
3621
  [epoll], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_EPOLL)],
3622
  [AC_MSG_ERROR(unknown I/O manager: $with_iom)])
3623

  
3593 3624
dln_a_out_works=no
3594 3625
if test "$ac_cv_header_a_out_h" = yes; then
3595 3626
  if test "$with_dln_a_out" = yes || test "$rb_cv_dlopen" = unknown; then
......
4745 4776
config_summary "man page type"       "$MANTYPE"
4746 4777
config_summary "search path"         "$search_path"
4747 4778
config_summary "static-linked-ext"   ${EXTSTATIC:+"yes"}
4779
config_summary "I/O manager"         ${with_iom}
4748 4780
echo ""
4749 4781
echo "---"
cont.c
13 13
#include "vm_core.h"
14 14
#include "gc.h"
15 15
#include "eval_intern.h"
16
#include "iom.h"
16 17

  
17 18
/* FIBER_USE_NATIVE enables Fiber performance improvement using system
18 19
 * dependent method such as make/setcontext on POSIX system or
......
132 133
     * You shouldn't mix "transfer" and "resume".
133 134
     */
134 135
    int transferred;
136
    unsigned int auto_fiber:1;
135 137

  
136 138
#if FIBER_USE_NATIVE
137 139
#ifdef _WIN32
......
1496 1498
    fiber_switch(return_fiber(), 1, &value, 0);
1497 1499
}
1498 1500

  
1499
VALUE
1500
rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
1501
int
1502
rb_fiber_resumable_p(const rb_thread_t *th, const rb_fiber_t *fib)
1501 1503
{
1502
    rb_fiber_t *fib;
1503
    GetFiberPtr(fibval, fib);
1504
    return th->root_fiber != fib && !fib->prev;
1505
}
1504 1506

  
1507
static void
1508
fiber_check_resume(const rb_fiber_t *fib)
1509
{
1505 1510
    if (fib->prev != 0 || fib->cont.type == ROOT_FIBER_CONTEXT) {
1506 1511
	rb_raise(rb_eFiberError, "double resume");
1507 1512
    }
1508 1513
    if (fib->transferred != 0) {
1509 1514
	rb_raise(rb_eFiberError, "cannot resume transferred Fiber");
1510 1515
    }
1516
}
1511 1517

  
1518
VALUE
1519
rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
1520
{
1521
    rb_fiber_t *fib;
1522
    GetFiberPtr(fibval, fib);
1523

  
1524
    fiber_check_resume(fib);
1512 1525
    return fiber_switch(fib, argc, argv, 1);
1513 1526
}
1514 1527

  
......
1651 1664
    return rb_fiber_current();
1652 1665
}
1653 1666

  
1667
/* Returns true if auto-fiber is enabled for current fiber */
1668
int
1669
rb_fiber_auto_sched_p(const rb_thread_t *th)
1670
{
1671
    const rb_fiber_t *cur = th->fiber;
1672

  
1673
    return (cur && cur->auto_fiber && th->root_fiber != cur);
1674
}
1675

  
1676
/*
1677
 * Enable auto-scheduling for the Fiber and resume it
1678
 */
1679
static VALUE
1680
rb_fiber_auto_start(int argc, VALUE *argv, VALUE self)
1681
{
1682
    rb_thread_t *th = GET_THREAD();
1683
    rb_fiber_t *fib;
1684
    GetFiberPtr(self, fib);
1685

  
1686
    if (th->root_fiber == fib) {
1687
	rb_raise(rb_eFiberError, "Root fiber cannot #start");
1688
    }
1689
    if (fib->auto_fiber) {
1690
	rb_raise(rb_eFiberError, "Fiber already started");
1691
    }
1692
    fib->auto_fiber = 1;
1693
    fiber_check_resume(fib);
1694
    return fiber_switch(fib, argc, argv, 1);
1695
}
1654 1696

  
1697
rb_thread_t *
1698
rb_fiber_owner_thread(VALUE self)
1699
{
1700
    rb_fiber_t *fib;
1701
    rb_thread_t *th;
1702

  
1703
    GetFiberPtr(self, fib);
1704
    GetThreadPtr(fib->cont.saved_thread.self, th);
1705

  
1706
    return th;
1707
}
1708

  
1709
static void
1710
fiber_auto_join(rb_fiber_t *fib, double *timeout)
1711
{
1712
    rb_thread_t *th = GET_THREAD();
1713
    rb_fiber_t *cur = fiber_current();
1714

  
1715
    if (cur == fib) {
1716
	rb_raise(rb_eFiberError, "Target fiber must not be current fiber");
1717
    }
1718
    if (th->root_fiber == fib) {
1719
	rb_raise(rb_eFiberError, "Target fiber must not be root fiber");
1720
    }
1721
    if (fib->cont.saved_thread.self != th->self) {
1722
	rb_raise(rb_eFiberError, "Target fiber not owned by current thread");
1723
    }
1724
    if (!fib->auto_fiber) {
1725
	rb_raise(rb_eFiberError, "Target fiber is not an auto-fiber");
1726
    }
1727

  
1728
    while (fib->status != TERMINATED && (timeout == 0 || *timeout >= 0.0)) {
1729
	rb_iom_schedule(th, timeout);
1730
    }
1731
}
1732

  
1733
static VALUE
1734
rb_fiber_auto_join(int argc, VALUE *argv, VALUE self)
1735
{
1736
    rb_fiber_t *fib;
1737
    double timeout, *t;
1738
    VALUE limit;
1739

  
1740
    GetFiberPtr(self, fib);
1741
    rb_scan_args(argc, argv, "01", &limit);
1742

  
1743
    if (NIL_P(limit)) {
1744
	t = 0;
1745
    } else {
1746
	timeout = rb_num2dbl(limit);
1747
	t = &timeout;
1748
    }
1749

  
1750
    fiber_auto_join(fib, t);
1751
    return fib->status == TERMINATED ? fib->cont.self : Qnil;
1752
}
1753

  
1754
static VALUE
1755
rb_fiber_auto_value(VALUE self)
1756
{
1757
    rb_fiber_t *fib;
1758
    GetFiberPtr(self, fib);
1759

  
1760
    fiber_auto_join(fib, 0);
1761
    return fib->cont.value;
1762
}
1655 1763

  
1656 1764
/*
1657 1765
 *  Document-class: FiberError
......
1688 1796
    rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1);
1689 1797
    rb_define_method(rb_cFiber, "initialize", rb_fiber_init, 0);
1690 1798
    rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1);
1799
    rb_define_method(rb_cFiber, "start", rb_fiber_auto_start, -1);
1800
    rb_define_method(rb_cFiber, "join", rb_fiber_auto_join, -1);
1801
    rb_define_method(rb_cFiber, "value", rb_fiber_auto_value, 0);
1691 1802
}
1692 1803

  
1693 1804
RUBY_SYMBOL_EXPORT_BEGIN
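
Usage note (not part of the patch): the hunk above wires up Fiber#start, Fiber#join and Fiber#value. A minimal sketch of the intended usage, assuming the truncated thread.c/io hunks route blocking reads through rb_iom_waitio so the fiber yields instead of blocking the whole thread:

    r, w = IO.pipe
    f = Fiber.start { r.read(1) }  # resumes the fiber; the read parks it in the iom
    w.write("a")
    f.join(1.0)                    # wait up to 1.0 seconds for the fiber to terminate
    p f.value                      # implicit join; returns the fiber's final value
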
include/ruby/io.h
116 116
/* #define FMODE_UNIX                  0x00200000 */
117 117
/* #define FMODE_INET                  0x00400000 */
118 118
/* #define FMODE_INET6                 0x00800000 */
119
/* #define FMODE_IOM_PRIVATE1          0x01000000 */ /* OS-dependent */
120
/* #define FMODE_IOM_PRIVATE2          0x02000000 */ /* OS-dependent */
119 121

  
120 122
#define GetOpenFile(obj,fp) rb_io_check_closed((fp) = RFILE(rb_io_taint_check(obj))->fptr)
121 123

  
iom.h
1
/*
2
 * iom -> I/O Manager for RubyVM (auto-Fiber-aware)
3
 *
4
 * On platforms with epoll or kqueue, this should be ready for multicore,
5
 * even if the rest of the RubyVM is not.
6
 *
7
 * Some inspiration taken from Mio in GHC:
8
 * http://haskell.cs.yale.edu/wp-content/uploads/2013/08/hask035-voellmy.pdf
9
 */
10
#ifndef RUBY_IOM_H
11
#define RUBY_IOM_H
12
#include "ruby.h"
13
#include "ruby/io.h"
14
#include "ruby/intern.h"
15
#include "vm_core.h"
16

  
17
typedef struct rb_iom_struct rb_iom_t;
18

  
19
/* WARNING: unstable API, only for Ruby internal use */
20

  
21
/*
22
 * Note: the first "rb_thread_t *" is a placeholder and may be replaced
23
 * with "rb_execution_context_t *" in the future.
24
 */
25

  
26
/*
27
 * All functions with "wait" in their name take an optional double * +timeout+
28
 * argument specifying the timeout in seconds.  If NULL, the call can wait
29
 * forever until the event happens (or the fiber is explicitly resumed).
30
 *
31
 * (maybe) TODO: If non-NULL, the timeout will be updated to the
32
 * remaining time upon returning.  Not sure if useful, could just be
33
 * a waste of cycles; so it is not implemented yet.
34
 */
35

  
36
/*
37
 * Relinquish calling fiber while waiting for +events+ on the given
38
 * +rb_io_t+
39
 *
40
 * Multiple native threads can enter this function at the same time.
41
 *
42
 * Events are RB_WAITFD_IN, RB_WAITFD_OUT, RB_WAITFD_PRI
43
 *
44
 * Returns a mask of events.
45
 */
46

  
47
int rb_iom_waitio(rb_thread_t *, rb_io_t *, int events, double *timeout);
48

  
49
/*
50
 * Identical to rb_iom_waitio, but takes a pointer to an integer file
51
 * descriptor, instead of rb_io_t.  Use rb_iom_waitio when possible,
52
 * since it allows us to optimize epoll (and perhaps avoid kqueue
53
 * portability bugs across different *BSDs).
54
 */
55
int rb_iom_waitfd(rb_thread_t *, int *fdp, int events, double *timeout);
56

  
57
/*
58
 * Relinquish calling fiber to wait for the given PID to change status.
59
 * Multiple native threads can enter this function at the same time.
60
 * If timeout is negative, wait forever.
61
 */
62
rb_pid_t rb_iom_waitpid(rb_thread_t *,
63
			rb_pid_t, int *status, int options, double *timeout);
64

  
65
/*
66
 * Relinquish calling fiber for at least the duration of given timeout
67
 * in seconds.  If timeout is negative, wait forever (until explicitly
68
 * resumed).
69
 * Multiple native threads can enter this function at the same time.
70
 */
71
void rb_iom_sleep(rb_thread_t *, double *timeout);
72

  
73
/* callback for SIGCHLD, needed to implement rb_iom_waitpid */
74
void rb_iom_sigchld(rb_vm_t *);
75

  
76
/*
77
 * there is no public create function; creation is lazy to avoid incurring
78
 * overhead for small scripts which do not need fibers.  We only need this
79
 * at VM destruction
80
 */
81
void rb_iom_destroy(rb_vm_t *);
82

  
83
/*
84
 * schedule
85
 */
86
void rb_iom_schedule(rb_thread_t *th, double *timeout);
87

  
88
/* cont.c */
89
int rb_fiber_auto_sched_p(const rb_thread_t *);
90
rb_thread_t *rb_fiber_owner_thread(VALUE);
91

  
92
#endif /* RUBY_IOM_H */
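
Illustration (not part of the patch) of what the contract above is meant to enable, assuming rb_wait_for_single_fd is routed through rb_iom_waitfd in the truncated portions of the patch: waits from several auto-fibers on one thread overlap, and each fiber is resumed when its FD becomes ready.

    r1, w1 = IO.pipe
    r2, w2 = IO.pipe
    a = Fiber.start { r1.read(1) }   # parks in rb_iom_waitio/rb_iom_waitfd
    b = Fiber.start { r2.read(1) }   # both fibers now wait concurrently
    w2.write("2")
    w1.write("1")
    p [a.value, b.value]             # => ["1", "2"] once both fibers finish
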
iom_common.h
1
/* included by iom_(epoll|select|kqueue).h */
2

  
3
/* we lazily create this, small scripts may never need iom */
4
static rb_iom_t *
5
rb_iom_new(rb_thread_t *th)
6
{
7
    rb_iom_t *iom = ALLOC(rb_iom_t);
8
    rb_iom_init(iom);
9
    return iom;
10
}
11

  
12
static rb_iom_t *
13
rb_iom_get(rb_thread_t *th)
14
{
15
    VM_ASSERT(th && th->vm);
16
    if (!th->vm->iom) {
17
	th->vm->iom = rb_iom_new(th);
18
    }
19
    return th->vm->iom;
20
}
21

  
22
/* check for expired timers */
23
static void
24
rb_iom_timer_check(const rb_thread_t *th)
25
{
26
    rb_iom_t *iom = th->vm->iom;
27
    if (iom) {
28
	struct rb_iom_timer *t = 0, *next = 0;
29
	double now = timeofday();
30

  
31
	list_for_each_safe(&iom->timers, t, next, n.tnode) {
32
	    if (t->expires_at <= now) {
33
		struct rb_iom_waiter *w = rb_iom_waiter_of(t);
34
		VALUE fibval = rb_iom_timer_fibval(t);
35

  
36
		if (w) {
37
		    list_del_init(&w->wnode);
38
		}
39
		list_del_init(&t->n.tnode);
40
		/* non-auto-fibers may set timer in rb_iom_schedule */
41
		if (fibval != Qfalse) {
42
		    rb_thread_t *owner = rb_fiber_owner_thread(fibval);
43
		    list_add_tail(&owner->afrunq, &t->n.rnode);
44
		}
45
	    }
46
	    return; /* done, timers is a sorted list */
47
	}
48
    }
49
}
50

  
51
/* insert a new +timer+ into +timers+, maintain sort order by expires_at */
52
static void
53
rb_iom_timer_add(rb_thread_t *th, struct rb_iom_timer *add,
54
		const double *timeout, int flags)
55
{
56
    add->_fibval = flags & IOM_FIB ? rb_fiber_current() : Qfalse;
57
    add->_fibval |= flags & IOM_WAIT ? 0 : IOM_FIBMASK;
58
    rb_iom_timer_check(th);
59

  
60
    if (timeout) {
61
	rb_iom_t *iom = rb_iom_get(th);
62
	struct rb_iom_timer *i = 0;
63
	add->expires_at = timeofday() + *timeout;
64

  
65
	/*
66
	 * search backwards: assume typical projects have multiple objects
67
	 * sharing the same timeout values, so new timers will expire later
68
	 * than existing timers
69
	 */
70
	list_for_each_rev(&iom->timers, i, n.tnode) {
71
	    if (add->expires_at > i->expires_at) {
72
		list_add_after(&iom->timers, &i->n.tnode, &add->n.tnode);
73
		return;
74
	    }
75
	}
76
	list_add(&iom->timers, &add->n.tnode);
77
    }
78
    else {
79
	/* not active, just allow list_del to function */
80
	list_node_init(&add->n.tnode);
81
    }
82
}
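
Walk-through of the backwards scan above, for illustration: with timers expiring at 1.0, 2.0 and 3.0, adding a timer that expires at 2.5 first compares against 3.0 (not later), then against 2.0 (later), and is inserted after the 2.0 entry via list_add_after, keeping the list sorted by expires_at.
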
83

  
84
/* max == -1 : wake all */
85
static void
86
rb_iom_blockers_notify(rb_iom_t *iom, int max)
87
{
88
    struct rb_iom_blocker *b = 0, *next = 0;
89

  
90
    list_for_each_safe(&iom->blockers, b, next, bnode) {
91
	list_del_init(&b->bnode);
92
	ubf_select(b->th);
93
	if (--max == 0) {
94
	    break;
95
	}
96
    }
97
}
98

  
99
/*
100
 * TODO: consider EVFILT_PROC for kqueue and netlink+epoll on Linux;
101
 * see the "god" RubyGem for usage examples.
102
 * However, I doubt rb_waitpid scalability will be a problem and
103
 * the simplicity of a single implementation for all is appealing.
104
 */
105
#ifdef HAVE_SYS_TYPES_H
106
#  include <sys/types.h>
107
#endif
108
#ifdef HAVE_SYS_WAIT_H
109
#  include <sys/wait.h>
110
#endif
111
#if defined(WNOHANG) && WNOHANG != 0 && \
112
    (defined(HAVE_WAITPID) || defined(HAVE_WAIT4))
113

  
114
static VALUE
115
iom_schedule_pid(VALUE ptr)
116
{
117
    struct rb_iom_pid_waiter *pw  = (struct rb_iom_pid_waiter *)ptr;
118
    rb_thread_t *th = pw->th;
119

  
120
    rb_fiber_auto_do_yield_p(th);
121
    RUBY_VM_CHECK_INTS_BLOCKING(th);
122
    return rb_fiber_yield(0, 0);
123
}
124

  
125
rb_pid_t
126
rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
127
		double *timeout)
128
{
129
    struct rb_iom_pid_waiter pw;
130

  
131
    pw.options = options;
132
    VM_ASSERT((options & WNOHANG) == 0 &&
133
		"WNOHANG should be handled in rb_waitpid");
134

  
135
    /*
136
     * unlike rb_iom_waitfd, we typically call *waitpid before
137
     * trying with a non-blocking operation
138
     */
139
    pw.pid = rb_waitpid(pid, &pw.status, pw.options | WNOHANG);
140

  
141
    if (pw.pid == 0) {
142
	rb_iom_t *iom = rb_iom_get(th);
143

  
144
	pw.th = th;
145
	pw.pid = pid;
146
	rb_iom_timer_add(th, &pw.w.timer, timeout, IOM_FIB|IOM_WAIT);
147

  
148
	/* LIFO, to match Linux wait4() blocking behavior */
149
	list_add(&iom->pids, &pw.w.wnode);
150
	rb_ensure(iom_schedule_pid, (VALUE)&pw,
151
			rb_iom_waiter_done, (VALUE)&pw.w);
152
	if (pw.pid == -1) {
153
	    errno = pw.errnum;
154
	}
155
    }
156
    if (status) {
157
	*status = pw.status;
158
    }
159
    if (pw.pid > 0) {
160
	rb_last_status_set(pw.status, pw.pid);
161
    }
162
    return pw.pid;
163
}
164

  
165
void
166
rb_iom_sigchld(rb_vm_t *vm)
167
{
168
    rb_iom_t *iom = vm->iom;
169
    if (iom) {
170
	struct rb_iom_pid_waiter *pw = 0, *next = 0;
171
	size_t nr = 0;
172

  
173
	list_for_each_safe(&iom->pids, pw, next, w.wnode) {
174
	    pid_t r = rb_waitpid(pw->pid, &pw->status, pw->options | WNOHANG);
175

  
176
	    if (r == 0) {
177
		continue;
178
	    }
179
	    if (r == -1) {
180
		pw->errnum = errno;
181
	    }
182
	    nr++;
183
	    pw->pid = r;
184
	    rb_iom_waiter_ready(&pw->w);
185
	}
186
	if (nr) {
187
	    rb_iom_blockers_notify(iom, -1);
188
	}
189
    }
190
}
191
#else
192
rb_pid_t
193
rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
194
		double *timeout)
195
{
196
    rb_bug("Should not get here, WNOHANG not implemented");
197
}
198
#endif /* defined(WNOHANG) && (defined(HAVE_WAITPID) || defined(HAVE_WAIT4)) */
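
Illustration (not part of the patch) of the behavior rb_iom_waitpid above is intended to provide, assuming Process.waitpid reaches it when called from an auto-fiber:

    pid = spawn("sleep", "0.1")
    f = Fiber.start { Process.waitpid(pid) }  # tries WNOHANG first, then registers a pid waiter
    f.join              # the fiber is resumed once rb_iom_sigchld marks its waiter ready
    p(f.value == pid)   # => true; Process.waitpid returns the reaped pid
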
iom_epoll.h
1
/*
2
 * Linux-only epoll-based implementation of I/O Manager for RubyVM
3
 *
4
 * Notes:
5
 *
6
 * TODO: epoll_wait only has millisecond resolution; if we need higher
7
 * resolution we can use timerfd or ppoll on the epoll_fd itself.
8
 *
9
 * Inside the Linux kernel, select/poll/ppoll/epoll_wait all use the
10
 * same notification callback (struct file_operations)->poll.
11
 * Unlike with kqueue across different *BSDs; we do not need to worry
12
 * about inconsistencies between these syscalls.
13
 *
14
 * See also notes in iom_kqueue.h
15
 */
16
#include "iom_internal.h"
17
#include <sys/epoll.h>
18
#include <math.h> /* round() */
19
#define FMODE_IOM_ADDED           FMODE_IOM_PRIVATE1
20

  
21
/* allocated on heap (rb_vm_t.iom) */
22
struct rb_iom_struct {
23
    /*
24
     * Everything here is protected by GVL at this time,
25
     * URCU lists (LGPL-2.1+) may be used in the future
26
     */
27

  
28
    /* we NEVER need to scan epws, only insert + delete + empty check */
29
    struct list_head epws; /* -epw.w.wnode, order agnostic */
30

  
31
    struct list_head timers; /* -rb_iom_timer.n.tnode, sort by expire_at */
32
    struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
33
    struct rb_iom_fdmap fdmap; /* maps each FD to multiple epw */
34

  
35
    int epoll_fd;
36
    int maxevents; /* auto-increases */
37
    struct list_head blockers; /* -rb_iom_blocker.bnode */
38
};
39

  
40
/*
41
 * Not using rb_iom_fd_waiter here, since we never need to reread the
42
 * FD on this implementation.
43
 * Allocated on stack
44
 */
45
struct epw {
46
    struct rb_iom_waiter w;
47
    union {
48
	struct list_node fdnode;
49
	struct {
50
	    rb_thread_t *th;
51
	    struct rb_iom_fd *fdh;
52
	} pre_ctl;
53
    } as;
54
    int fd; /* no need for "int *", here, we never reread */
55
    short events; /* requested events, like poll(2) */
56
    short revents; /* returned events, like poll(2) */
57
    int *flags; /* &fptr->mode */
58
};
59

  
60
static void
61
increase_maxevents(rb_iom_t *iom, int retries)
62
{
63
    /* 1024 is the RUBY_ALLOCV_LIMIT on such systems */
64
    const int max_alloca = 1024 / sizeof(struct epoll_event);
65
    const int max = max_alloca * 2;
66

  
67
    if (retries) {
68
	iom->maxevents *= retries;
69
	if (iom->maxevents > max || iom->maxevents <= 0) {
70
	    iom->maxevents = max;
71
	}
72
    }
73
}
74

  
75
static int
76
double2msec(double sec)
77
{
78
    /*
79
     * clamp timeout to workaround a Linux <= 2.6.37 bug,
80
     * see epoll_wait(2) manpage
81
     */
82
    const int max_msec = 35 * 60 * 1000; /* floor(35.79 minutes) */
83
    if (sec < 0) {
84
	return -1;
85
    }
86
    else {
87
      double msec = round(sec * 1000);
88

  
89
      if (msec < (double)max_msec) {
90
          int ret = (int)msec;
91
          return ret < 0 ? 0 : ret;
92
      }
93
      return max_msec;
94
    }
95
}
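
For illustration, double2msec above maps a negative timeout to -1 (block in epoll_wait until an event), 0.25 seconds to 250 ms, and anything at or above 2100 seconds to the 2,100,000 ms (35 minute) clamp described in the comment.
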
96

  
97
/* we can avoid branches when mapping RB_WAITFD_* bits to EPOLL* bits */
98
STATIC_ASSERT(epbits_matches_waitfd_bits,
99
    RB_WAITFD_IN == EPOLLIN && RB_WAITFD_OUT == EPOLLOUT &&
100
    RB_WAITFD_PRI == EPOLLPRI);
101

  
102
/* what goes into epoll_ctl... */
103
static int
104
rb_events2ep(int events)
105
{
106
    return EPOLLONESHOT | events;
107
}
108

  
109
/* ...what comes out of epoll_wait */
110
static short
111
rb_ep2revents(int revents)
112
{
113
    return (short)(revents & (EPOLLIN|EPOLLOUT|EPOLLPRI));
114
}
115

  
116
/* lazily create epoll FD, since not everybody waits on I/O */
117
static int
118
iom_epfd(rb_iom_t *iom)
119
{
120
    if (iom->epoll_fd < 0) {
121
#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1)
122
	iom->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
123
	if (iom->epoll_fd < 0) {
124
	    int err = errno;
125
	    if (rb_gc_for_fd(err)) {
126
		iom->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
127
		if (iom->epoll_fd < 0) {
128
		    rb_sys_fail("epoll_create1");
129
		}
130
	    }
131
	    else if (err != ENOSYS) {
132
		rb_syserr_fail(err, "epoll_create1");
133
	    }
134
	    else { /* libc >= kernel || build-env > run-env */
135
#endif /* HAVE_EPOLL_CREATE1 */
136
		iom->epoll_fd = epoll_create(1);
137
		if (iom->epoll_fd < 0) {
138
		    if (rb_gc_for_fd(errno)) {
139
			iom->epoll_fd = epoll_create(1);
140
		    }
141
		}
142
		if (iom->epoll_fd < 0) {
143
		    rb_sys_fail("epoll_create");
144
		}
145
		rb_maygvl_fd_fix_cloexec(iom->epoll_fd);
146
#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1)
147
	    }
148
	}
149
#endif /* HAVE_EPOLL_CREATE1 */
150
	rb_update_max_fd(iom->epoll_fd);
151
    }
152
    return iom->epoll_fd;
153
}
154

  
155
static void
156
rb_iom_init(rb_iom_t *iom)
157
{
158
    list_head_init(&iom->timers);
159
    list_head_init(&iom->epws);
160
    list_head_init(&iom->pids);
161
    list_head_init(&iom->blockers);
162
    iom->maxevents = 8;
163
    iom->epoll_fd = -1;
164
    rb_iom_fdmap_init(&iom->fdmap);
165
}
166

  
167
static void
168
check_epoll_wait(rb_thread_t *th, int nr, struct epoll_event *ev)
169
{
170
    if (nr >= 0) {
171
	int i;
172

  
173
	for (i = 0; i < nr; i++) {
174
	    struct rb_iom_fd *fdh = ev[i].data.ptr;
175
	    struct epw *epw = 0, *next = 0;
176
	    short revents = rb_ep2revents(ev[i].events);
177

  
178
	    /*
179
	     * Typical list size is 1; only multiple fibers waiting
180
	     * on the same FD increases fdh list size
181
	     */
182
	    list_for_each_safe(&fdh->fdhead, epw, next, as.fdnode) {
183
		epw->revents = epw->events & revents;
184
		list_del_init(&epw->as.fdnode);
185
		rb_iom_waiter_ready(&epw->w);
186
	    }
187
	}
188

  
189
	/* notify waiter threads in case we enqueued fibers for them */
190
	if (nr > 0) {
191
	    rb_iom_blockers_notify(th->vm->iom, -1);
192
	}
193
    }
194
    else {
195
	int err = errno;
196
	if (err != EINTR) {
197
	    rb_syserr_fail(err, "epoll_wait");
198
	}
199
    }
200
    rb_iom_timer_check(th);
201
    RUBY_VM_CHECK_INTS_BLOCKING(th);
202
}
203

  
204
/* perform a non-blocking epoll_wait while holding GVL */
205
static void
206
ping_events(rb_thread_t *th)
207
{
208
    rb_iom_t *iom = th->vm->iom;
209
    int epfd = iom ? iom->epoll_fd : -1;
210

  
211
    if (epfd >= 0) {
212
	VALUE v;
213
	int nr;
214
	int maxevents = iom->maxevents;
215
	struct epoll_event *ev = ALLOCV_N(struct epoll_event, v, maxevents);
216
	int retries = 0;
217

  
218
	do {
219
	    nr = epoll_wait(epfd, ev, maxevents, 0);
220
	    check_epoll_wait(th, nr, ev);
221
	} while (nr == maxevents && ++retries);
222
	if (v) {
223
	    ALLOCV_END(v);
224
	}
225
	increase_maxevents(iom, retries);
226
    }
227
}
228

  
229
/* for iom_pingable_common.h */
230
static void
231
rb_iom_do_wait(rb_thread_t *th, rb_iom_t *iom)
232
{
233
    int maxevents = iom->maxevents;
234
    int nr = maxevents;
235
    double timeout;
236

  
237
    RUBY_VM_CHECK_INTS_BLOCKING(th);
238
    timeout = rb_iom_next_timeout(&iom->timers);
239

  
240
    if (timeout != 0 && (!list_empty(&iom->epws) || !list_empty(&iom->pids))) {
241
	VALUE v;
242
	int epfd = iom_epfd(th->vm->iom); /* may raise */
243
	struct epoll_event *ev = ALLOCV_N(struct epoll_event, v, maxevents);
244
	int msec = double2msec(timeout);
245
	struct rb_iom_blocker cur;
246

  
247
	VM_ASSERT(epfd >= 0);
248
	cur.th = th;
249
	list_add_tail(&iom->blockers, &cur.bnode);
250
	BLOCKING_REGION({
251
	    nr = epoll_wait(epfd, ev, maxevents, msec);
252
	}, ubf_select, th, FALSE);
253
	list_del(&cur.bnode);
254
	check_epoll_wait(th, nr, ev);
255
	if (v) {
256
	    ALLOCV_END(v);
257
	}
258
    }
259
    if (nr == maxevents) { /* || timeout == 0 */
260
	ping_events(th);
261
    }
262
}
263

  
264
static void
265
epoll_ctl_or_raise(rb_thread_t *th, struct epw *epw)
266
{
267
    int e;
268
    int epfd;
269
    struct epoll_event ev;
270

  
271
    /* we cannot raise until list_add: */
272
    {
273
	struct rb_iom_fd *fdh = epw->as.pre_ctl.fdh;
274

  
275
	ev.data.ptr = fdh;
276
	ev.events = rb_events2ep(epw->events);
277
	/*
278
	 * merge events from other threads/fibers waiting on the same
279
	 * [ descriptor (int fd), description (struct file *) ] tuple
280
	 */
281
	if (!list_empty(&fdh->fdhead)) { /* uncommon, I hope... */
282
	    struct epw *cur;
283
	    list_for_each(&fdh->fdhead, cur, as.fdnode) {
284
		ev.events |= rb_events2ep(cur->events);
285
	    }
286
	}
287
	list_add(&fdh->fdhead, &epw->as.fdnode);
288
    }
289

  
290
    epfd = iom_epfd(th->vm->iom); /* may raise */
291

  
292
    /* we want to track if an FD is already being watched ourselves */
293
    if (epw->flags) {
294
	if (*epw->flags & FMODE_IOM_ADDED) { /* ideal situation */
295
	    e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev);
296
	}
297
	else {
298
	    e = epoll_ctl(epfd, EPOLL_CTL_ADD, epw->fd, &ev);
299
	    if (e == 0) {
300
		*epw->flags |= FMODE_IOM_ADDED;
301
	    }
302
	    else if (e < 0 && errno == EEXIST) {
303
		/*
304
		 * possible EEXIST if several fptrs point to the same FD:
305
		 *   f1 = Fiber.start { io1.read(1) }
306
		 *   io2 = IO.for_fd(io1.fileno)
307
		 *   f2 = Fiber.start { io2.read(1) }
308
		 */
309
		*epw->flags |= FMODE_IOM_ADDED;
310
		e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev);
311
	    }
312
	}
313
    }
314
    else { /* don't know if added or not, fall back to add on ENOENT */
315
	e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev);
316
	if (e < 0 && errno == ENOENT) {
317
	    e = epoll_ctl(epfd, EPOLL_CTL_ADD, epw->fd, &ev);
318
	}
319
    }
320
    if (e < 0) {
321
	rb_sys_fail("epoll_ctl");
322
    }
323
}
324

  
325
static VALUE
326
epmod_yield(VALUE ptr)
327
{
328
    /* we must have no possibility of raising until list_add: */
329
    struct epw *epw = (struct epw *)ptr;
330
    rb_thread_t *th = epw->as.pre_ctl.th;
331
    epoll_ctl_or_raise(th, epw);
332
    ping_events(th);
333
    (void)rb_fiber_auto_do_yield_p(th);
334
    return rb_fiber_yield(0, 0);
335
}
336

  
337
static VALUE
338
epw_done(VALUE ptr)
339
{
340
    struct epw *epw = (struct epw *)ptr;
341
    list_del(&epw->as.fdnode);
342
    return rb_iom_waiter_done((VALUE)&epw->w);
343
}
344

  
345
static int
346
iom_waitfd(rb_thread_t *th, int fd, int *flags, int events, double *timeout)
347
{
348
    rb_iom_t *iom = rb_iom_get(th);
349
    struct epw epw;
350

  
351
    /* unlike kqueue or select, we never need to reread fd */
352
    epw.fd = fd;
353
    if (epw.fd < 0) { /* TODO: behave like poll(2) and sleep? */
354
	return 0;
355
    }
356

  
357
    /* may raise on OOM: */
358
    epw.as.pre_ctl.fdh = rb_iom_fd_get(&iom->fdmap, epw.fd);
359
    epw.as.pre_ctl.th = th;
360
    epw.flags = flags;
361
    /*
362
     * if we did not have GVL, revents may be set immediately
363
     * upon epoll_ctl by another thread running epoll_wait,
364
     * so we must initialize it before epoll_ctl:
365
     */
366
    epw.revents = 0;
367
    epw.events = (short)events;
368

  
369
    list_add(&iom->epws, &epw.w.wnode);
370
    rb_iom_timer_add(th, &epw.w.timer, timeout, IOM_FIB|IOM_WAIT);
371
    rb_ensure(epmod_yield, (VALUE)&epw, epw_done, (VALUE)&epw);
372

  
373
    return (int)epw.revents; /* may be zero if timed out */
374
}
375

  
376
int
377
rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, double *timeout)
378
{
379
    return iom_waitfd(th, fptr->fd, &fptr->mode, events, timeout);
380
}
381

  
382
int
383
rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
384
{
385
    return iom_waitfd(th, *fdp, 0, events, timeout);
386
}
387

  
388
void
389
rb_iom_destroy(rb_vm_t *vm)
390
{
391
    rb_iom_t *iom = vm->iom;
392
    vm->iom = 0;
393
    if (iom) {
394
	/*
395
	 * it's possible; but crazy to share epoll FDs across processes
396
	 * (kqueue has a rather unique close-on-fork behavior)
397
	 */
398
	if (iom->epoll_fd >= 0) {
399
	    close(iom->epoll_fd);
400
	}
401
	rb_iom_fdmap_destroy(&iom->fdmap);
402
	xfree(iom);
403
    }
404
}
405

  
406
/* used by thread.c::rb_thread_atfork */
407
static void
408
rb_iom_atfork_child(rb_thread_t *th)
409
{
410
    rb_iom_destroy(th->vm);
411
}
412

  
413
/* used by thread_pthread.c */
414
static int
415
rb_iom_reserved_fd(int fd)
416
{
417
    rb_iom_t *iom = GET_VM()->iom;
418

  
419
    return iom && fd == iom->epoll_fd;
420
}
421

  
422
#include "iom_pingable_common.h"
423
#include "iom_common.h"
iom_internal.h
1
#ifndef RB_IOM_COMMON_H
2
#define RB_IOM_COMMON_H
3

  
4
#include "internal.h"
5
#include "iom.h"
6

  
7
/* cont.c */
8
void rb_fiber_auto_enqueue(VALUE fibval);
9

  
10
#define FMODE_IOM_PRIVATE1          0x01000000
11
#define FMODE_IOM_PRIVATE2          0x02000000
12

  
13
#define IOM_FIBMASK ((VALUE)0x1)
14
#define IOM_FIB     (0x2)
15
#define IOM_WAIT    (0x1) /* container_of(..., struct rb_iom_waiter, timer) */
16

  
17
/*
18
 * fdmap is a singleton.
19
 *
20
 * It makes zero sense to have multiple fdmaps per-process; even less so
21
 * than multiple ioms.  The file descriptor table in POSIX is per-process;
22
 * and POSIX requires syscalls to allocate the lowest available FD.
23
 * This is also why we use an array instead of a hash table, as there will
24
 * be no holes for big processes.
25
 *
26
 * If contention becomes a problem, we can pad (struct rb_iom_fd) to
27
 * 64-bytes for cache alignment.
28
 *
29
 * Currently we use fdmap to deal with FD aliasing with epoll
30
 * and kqueue interfaces.  FD aliasing happens when multiple
31
 * Fibers wait on the same FD; but epoll/kqueue APIs only allow
32
 * registering a single data pointer per FD.
33
 *
34
 * In the future, we may implement rb_notify_fd_close using fdmap.
35
 */
36

  
37
/* linear growth based on power-of-two */
38
#define RB_IOM_FD_PER_HEAP 64
39
/* on-heap and persistent for process lifetime; keep as small as possible. */
40
struct rb_iom_fd {
41
    struct list_head fdhead; /* -kev.(rfdnode|wfdnode), epw.fdnode */
42
};
43

  
44
/* singleton (per-rb_iom_t, or per process, if we ever need > 1 iom) */
45
struct rb_iom_fdmap {
46
    struct rb_iom_fd **map;
47
    unsigned int heaps;
48
    int max_fd;
49
};
50

  
51
/* allocated on stack */
52
/* Every auto-yielded fiber has this on stack */
53
struct rb_iom_timer {
54
    union {
55
	struct list_node rnode; /* <=> rb_thread_t.afrunq */
56
	struct list_node tnode; /* <=> rb_iom_struct.timers */
57
    } n;
58
    double expires_at; /* absolute monotonic time */
59
    VALUE _fibval;
60
};
61

  
62
/* common waiter struct for waiting fds and pids */
63
struct rb_iom_waiter {
64
    struct rb_iom_timer timer;
65
    struct list_node wnode; /* <=> rb_iom_struct.(fds|pids) */
66
};
67

  
68
struct rb_iom_fd_waiter {
69
    struct rb_iom_waiter w; /* w.wnode - iom->fds */
70
    int *fdp; /* (ideally) a pointer to fptr->fd, to detect closed FDs */
71
    short events; /* requested events, like poll(2) */
72
    short revents; /* returned events, like poll(2) */
73
};
74

  
75
struct rb_iom_pid_waiter {
76
    struct rb_iom_waiter w; /* w.wnode - iom->pids */
77
    rb_thread_t *th;
78
    /* pid, status, options: same as waitpid(2) */
79
    rb_pid_t pid;
80
    int status;
81
    int options;
82
    int errnum;
83
};
84

  
85
/* threads sleeping in select, epoll_wait or kevent w/o GVL; on stack */
86
struct rb_iom_blocker {
87
    rb_thread_t *th;
88
    struct list_node bnode; /* -iom->blockers */
89
};
90

  
91
#if (RUBYVM_IOM == IOM_KQUEUE || RUBYVM_IOM == IOM_EPOLL)
92
/* TODO: IOM_SELECT may use this for rb_notify_fd_close */
93
static struct rb_iom_fd *
94
iom_fdhead_aref(struct rb_iom_fdmap *fdmap, int fd)
95
{
96
    VM_ASSERT(fd >= 0);
97
    return &fdmap->map[fd / RB_IOM_FD_PER_HEAP][fd % RB_IOM_FD_PER_HEAP];
98
}
99

  
100
static struct rb_iom_fd *
101
rb_iom_fd_get(struct rb_iom_fdmap *fdmap, int fd)
102
{
103
    if (fd >= fdmap->max_fd) {
104
	struct rb_iom_fd *base, *h;
105
	unsigned n = fdmap->heaps + 1;
106
	unsigned i;
107

  
108
	fdmap->map = xrealloc2(fdmap->map, n, sizeof(struct rb_iom_fd *));
109
	base = h = ALLOC_N(struct rb_iom_fd, RB_IOM_FD_PER_HEAP);
110
	for (i = 0; i < RB_IOM_FD_PER_HEAP; i++) {
111
	    list_head_init(&h->fdhead);
112
	    h++;
113
	}
114
	fdmap->map[fdmap->heaps] = base;
115
	fdmap->max_fd += RB_IOM_FD_PER_HEAP;
116
    }
117
    return iom_fdhead_aref(fdmap, fd);
118
}
119

  
120
static void
121
rb_iom_fdmap_init(struct rb_iom_fdmap *fdmap)
122
{
123
    fdmap->max_fd = 0;
124
    fdmap->heaps = 0;
125
    fdmap->map = 0;
126
}
127

  
128
static void
129
rb_iom_fdmap_destroy(struct rb_iom_fdmap *fdmap)
130
{
131
    unsigned n;
132

  
133
    for (n = 0; n < fdmap->heaps; n++) {
134
	xfree(fdmap->map[n]);
135
    }
136
    xfree(fdmap->map);
137
    rb_iom_fdmap_init(fdmap);
138
}
139
#endif /* (RUBYVM_IOM == IOM_KQUEUE || RUBYVM_IOM == IOM_EPOLL) */
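
Worked example of the indexing in iom_fdhead_aref above, for illustration: with RB_IOM_FD_PER_HEAP defined as 64, fd 130 resolves to map[130 / 64][130 % 64], i.e. map[2][2], the third slot of the third 64-entry heap.
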
140

  
141
static VALUE
142
rb_iom_timer_fibval(const struct rb_iom_timer *t)
143
{
144
    return t->_fibval & ~IOM_FIBMASK;
145
}
146

  
147
static struct rb_iom_waiter *
148
rb_iom_waiter_of(struct rb_iom_timer *t)
149
{
150
    if (t->_fibval & IOM_FIBMASK) {
151
	return 0;
152
    }
153
    return container_of(t, struct rb_iom_waiter, timer);
154
}
155

  
156
static double
157
rb_iom_next_timeout(struct list_head *timers)
158
{
159
    struct rb_iom_timer *t = list_top(timers, struct rb_iom_timer, n.tnode);
160

  
161
    if (t) {
162
	double diff = t->expires_at - timeofday();
163
	return diff <= 0.0 ? 0 : diff;
164
    }
165
    else {
166
	return -1;
167
    }
168
}
169

  
170
static void rb_iom_timer_check(const rb_thread_t *);
171
static void rb_iom_timer_add(rb_thread_t *, struct rb_iom_timer *,
172
			     const double *timeout, int flags);
173

  
174
static VALUE
175
rb_iom_timer_done(VALUE ptr)
176
{
177
    struct rb_iom_timer *t = (struct rb_iom_timer *)ptr;
178
    list_del(&t->n.tnode);
179
    return Qfalse;
180
}
181

  
182
static void
183
rb_iom_waiter_ready(struct rb_iom_waiter *w)
184
{
185
    VALUE fibval = rb_iom_timer_fibval(&w->timer);
186

  
187
    list_del_init(&w->wnode);
188
    list_del_init(&w->timer.n.tnode);
189
    if (fibval != Qfalse) {
190
	rb_thread_t *owner = rb_fiber_owner_thread(fibval);
191
	list_add_tail(&owner->afrunq, &w->timer.n.rnode);
192
    }
193
}
194

  
195
static VALUE
196
rb_iom_waiter_done(VALUE ptr)
197
{
198
    struct rb_iom_waiter *w = (struct rb_iom_waiter *)ptr;
199
    list_del(&w->timer.n.tnode);
200
    list_del(&w->wnode);
201
    return Qfalse;
202
}
203

  
204
/* cont.c */
205
int rb_fiber_resumable_p(const rb_thread_t *, const rb_fiber_t *);
206

  
207
/*
208
 * resume all "ready" fibers belonging to a given thread
209
 * stop when a fiber has not yielded yet.
210
 */
211
static int
212
rb_fiber_auto_do_yield_p(rb_thread_t *th)
213
{
214
    rb_fiber_t *current_auto = rb_fiber_auto_sched_p(th) ? th->fiber : 0;
215
    struct rb_iom_timer *t = 0, *next = 0;
216
    LIST_HEAD(tmp);
217

  
218
    /*
219
     * do not infinite loop as new fibers get added to
220
     * th->afrunq, only work off a temporary list:
221
     */
222
    list_append_list(&tmp, &th->afrunq);
223
    list_for_each_safe(&tmp, t, next, n.rnode) {
224
	VALUE fibval = rb_iom_timer_fibval(t);
225
	rb_fiber_t *fib = RTYPEDDATA_DATA(fibval);
226

  
227
	if (fib == current_auto || !rb_fiber_resumable_p(th, fib)) {
228
	    /* tell the caller to yield */
229
	    list_prepend_list(&th->afrunq, &tmp);
230
	    return 1;
231
	}
232
	rb_fiber_resume(fibval, 0, 0);
233
    }
234
    return 0;
235
}
236

  
237
/* XXX: is this necessary? */
238
void
239
rb_iom_mark_runq(const rb_thread_t *th)
240
{
241
    struct rb_iom_timer *t = 0;
242

  
243
    list_for_each(&th->afrunq, t, n.rnode) {
244
	rb_gc_mark(rb_iom_timer_fibval(t));
245
    }
246
}
247

  
248
static rb_iom_t *rb_iom_get(rb_thread_t *);
249
static void rb_iom_blockers_notify(rb_iom_t *, int max);
250

  
251
#endif /* RB_IOM_COMMON_H */
iom_kqueue.h
1
/*
2
 * kqueue-based implementation of I/O Manager for RubyVM on *BSD
3
 *
4
 * The kevent API has an advantage over epoll_ctl+epoll_wait since
5
 * it can simultaneously add filters and check for events with one
6
 * syscall.  It also allows EV_ADD to be used idempotently for
7
 * enabling filters, whereas epoll_ctl requires separate ADD and
8
 * MOD operations.
9
 *
10
 * These are advantages in the common case...
11
 *
12
 * The epoll API has advantages in more esoteric cases:
13
 *
14
 *   epoll has the advantage over kqueue when watching for multiple
15
 *   events (POLLIN|POLLOUT|POLLPRI) (which is rare).  We also have
16
 *   to install two kevent filters to watch POLLIN|POLLOUT simultaneously.
17
 *   See udata_set/udata_get functions below for more on this.
18
 *
19
 *   Also, kevent does not support POLLPRI directly, so we need to use
20
 *   select() (or perhaps poll() on some platforms) with a zero
21
 *   timeout to check for POLLPRI after EVFILT_READ returns.
22
 *
23
 * Finally, several *BSDs implement kqueue; and the quality of each
24
 * implementation may vary.  Anecdotally, *BSDs are not known to even
25
 * support poll() consistently across different types of files.
26
 * We will need to be selective and careful about injecting them into
27
 * kevent().
28
 */
29
#include "iom_internal.h"
30

  
31
/* LIST_HEAD (via ccan/list) conflicts with sys/queue.h (via sys/event.h) */
32
#undef LIST_HEAD
33
#include <sys/types.h>
34
#include <sys/event.h>
35
#include <sys/time.h>
36

  
37
/* We need to use EVFILT_READ to watch RB_WAITFD_PRI */
38
#define WAITFD_READ (RB_WAITFD_IN|RB_WAITFD_PRI)
39

  
40
/* allocated on heap (rb_vm_t.iom) */
41
struct rb_iom_struct {
42
    /*
43
     * Everything here is protected by GVL at this time,
44
     * URCU lists (LGPL-2.1+) may be used in the future
45
     */
46

  
47
    /* we NEVER need to scan kevs, only insert + delete + empty check */
48
    struct list_head kevs; /* -kev.fdw.w.wnode, order agnostic */
49

  
50
    struct list_head timers; /* -rb_iom_timer.n.tnode, sort by expire_at */
51
    struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
52
    struct rb_iom_fdmap rfdmap; /* holds fdh for EVFILT_READ */
53
    struct rb_iom_fdmap wfdmap; /* holds fdh for EVFILT_WRITE */
54

  
55
    int kqueue_fd;
56
    int nevents; /* auto-increases */
57
    struct list_head blockers; /* -rb_iom_blocker.bnode */
58
};
59

  
60
/* allocated on stack */
61
struct kev {
62
    /* fdw.w.wnode is overloaded for checking RB_WAITFD_PRI (see check_pri) */
63
    struct rb_iom_fd_waiter fdw;
64
    rb_thread_t *th;
65

  
66
    /*
67
     * both rfdnode and wfdnode are overloaded for deleting paired
68
     * filters when watching both EVFILT_READ and EVFILT_WRITE on
69
     * a single FD
70
     */
71
    struct list_node rfdnode; /* -(ev.udata==fdh)->fdhead (EVFILT_READ) */
72
    struct list_node wfdnode; /* -(ev.udata==fdh)->fdhead (EVFILT_WRITE) */
73
};
74

  
75
/*
76
 * like our epoll implementation, we "ping" using kevent with zero-timeout
77
 * and can do so on any thread.
78
 */
... This diff was truncated because it exceeds the maximum size that can be displayed.