Feature #13697 » 0001-thread-futex-based-thread-primitives-another-take.patch
common.mk

thread.$(OBJEXT): {$(VPATH)}thread.h
thread.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).c
thread.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).h
thread.$(OBJEXT): {$(VPATH)}thread_futex.h
thread.$(OBJEXT): {$(VPATH)}thread_native.h
thread.$(OBJEXT): {$(VPATH)}thread_sync.c
thread.$(OBJEXT): {$(VPATH)}timev.h
configure.in

AC_CHECK_HEADERS(intrinsics.h)
AC_CHECK_HEADERS(langinfo.h)
AC_CHECK_HEADERS(limits.h)
AC_CHECK_HEADERS(linux/futex.h)
AC_CHECK_HEADERS(locale.h)
AC_CHECK_HEADERS(malloc.h)
AC_CHECK_HEADERS(malloc/malloc.h)
include/ruby/thread_native.h

#elif defined(HAVE_PTHREAD_H)
#include <pthread.h>
# if defined(HAVE_LINUX_FUTEX_H) && defined(HAVE_SYS_SYSCALL_H)
# define RB_PTHREAD_USE_FUTEX (1)
# else
# define RB_PTHREAD_USE_FUTEX (0)
# endif
typedef pthread_t rb_nativethread_id_t;
# if RB_PTHREAD_USE_FUTEX
typedef struct rb_futex_lock_struct {
    int ftx;
} rb_nativethread_lock_t;
# else
typedef pthread_mutex_t rb_nativethread_lock_t;
# endif
#else
#error "unsupported thread type"
thread_futex.h

/*
 * futex-based locks and condvars for Linux 2.6 and later
 */
#ifndef RB_THREAD_FUTEX_H
#define RB_THREAD_FUTEX_H
#include "ruby_atomic.h"
#include <linux/futex.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <limits.h>

/*
 * errnos are positive on Linux; this allows us to use negative
 * errnos (e.g. -EAGAIN) as return values.
 */
STATIC_ASSERT(eagain_positive, EAGAIN > 0);
STATIC_ASSERT(etimedout_positive, ETIMEDOUT > 0);
STATIC_ASSERT(eintr_positive, EINTR > 0);

#define KVER(a,b,c,d) ((a << 24) | (b << 16) | (c << 8) | d)
#ifndef MINIMUM_KVER
# define MINIMUM_KVER KVER(2,6,18,0) /* 2.6.18 for RHEL/CentOS 5.x */
#endif

#undef RB_FTX_PRIVATE
/* optimization for Linux 2.6.22+ */
#if defined(FUTEX_PRIVATE_FLAG)
# if MINIMUM_KVER >= KVER(2, 6, 22, 0)
#  define RB_FTX_PRIVATE FUTEX_PRIVATE_FLAG
# else
static int RB_FTX_PRIVATE = FUTEX_PRIVATE_FLAG;
# endif
#else /* no FUTEX_PRIVATE_FLAG in headers, assume the kernel is old, too */
static int RB_FTX_PRIVATE;
#endif

/* prevent the compiler from reordering accesses */
#define RB_ACCESS_ONCE(type,x) (*((volatile type *)&(x)))

static int
rb_futex(int *uaddr, int futex_op, int val,
         const struct timespec *timeout, /* or uint32_t val2 */
         int *uaddr2, int val3)
{
    return syscall(__NR_futex, uaddr, futex_op, val, timeout, uaddr2, val3);
}
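The wrapper above is the only place the raw syscall is made. For readers unfamiliar with futexes, here is a minimal standalone sketch (not part of the patch) of how FUTEX_WAIT and FUTEX_WAKE pair up on a plain int; it assumes a Linux system, and the helper names do_futex and waiter are illustrative only:

#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>

static int flag; /* the futex word, initially 0 */

static long
do_futex(int *uaddr, int op, int val)
{
    return syscall(SYS_futex, uaddr, op, val, NULL, NULL, 0);
}

static void *
waiter(void *arg)
{
    /* sleep in the kernel only while the word still holds the expected value */
    while (__atomic_load_n(&flag, __ATOMIC_ACQUIRE) == 0)
        do_futex(&flag, FUTEX_WAIT, 0);
    puts("woken");
    return NULL;
}

int
main(void)
{
    pthread_t t;

    pthread_create(&t, NULL, waiter, NULL);
    sleep(1);
    __atomic_store_n(&flag, 1, __ATOMIC_RELEASE);
    do_futex(&flag, FUTEX_WAKE, 1); /* wake at most one waiter */
    pthread_join(t, NULL);
    return 0;
}

If flag changes before the waiter reaches the kernel, FUTEX_WAIT fails with EAGAIN and the loop simply rechecks, which is why the expected value is passed to the syscall.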
static void
native_thread_sysinit(void)
{
#ifndef RB_FTX_PRIVATE /* macro if MINIMUM_KVER >= 2.6.22 */
    /*
     * Linux <2.6.22 does not support the FUTEX_PRIVATE_FLAG.
     * However, Ruby may be compiled with newer glibc headers (or on a
     * newer system entirely) and then run on an old kernel, so we must
     * detect support at startup.
     */
    int op = FUTEX_WAKE | RB_FTX_PRIVATE;
    int val = 1;
    int rc = rb_futex(&val, op, 1, 0, 0, 0);

    if (rc < 0) {
        RB_FTX_PRIVATE = 0;
    }
#endif
}
static int
futex_wait(int *addr, int val, const struct timespec *ts)
{
    int op = FUTEX_WAIT | RB_FTX_PRIVATE;
    int rc = rb_futex(addr, op, val, ts, 0, 0);

    if (rc == 0) return 0; /* successfully woken */

    rc = errno;
    if (rc != EAGAIN && rc != ETIMEDOUT && rc != EINTR) {
        rb_bug_errno("FUTEX_WAIT", rc);
    }
    return -rc;
}

/* returns number of threads woken */
static int
futex_wake(int *addr, int nwake)
{
    int op = FUTEX_WAKE | RB_FTX_PRIVATE;
    int rc = rb_futex(addr, op, nwake, 0, 0, 0);

    if (rc >= 0) return rc;
    rb_bug_errno("FUTEX_WAKE", errno);
}
static void
native_mutex_lock_contended(rb_nativethread_lock_t *lock)
{
    /* tell waiters we are contended */
    while (ATOMIC_EXCHANGE(lock->ftx, 2)) {
        futex_wait(&lock->ftx, 2, 0);
    }
}

static void
native_mutex_lock(rb_nativethread_lock_t *lock)
{
    int old = ATOMIC_CAS(lock->ftx, 0, 1);

    if (LIKELY(old == 0)) return; /* uncontended fast path */
    native_mutex_lock_contended(lock);
}

static void
native_mutex_unlock(rb_nativethread_lock_t *lock)
{
    int old = ATOMIC_EXCHANGE(lock->ftx, 0);

    if (LIKELY(old == 1)) return; /* uncontended fast path */
    VM_ASSERT(old != 0);
    futex_wake(&lock->ftx, 1);
}

static int
native_mutex_trylock(rb_nativethread_lock_t *lock)
{
    int old = ATOMIC_CAS(lock->ftx, 0, 1);

    return old ? EBUSY : 0;
}

static void
native_mutex_initialize(rb_nativethread_lock_t *lock)
{
    lock->ftx = 0;
}

static void
native_mutex_destroy(rb_nativethread_lock_t *lock)
{
    if (lock->ftx) rb_bug_errno("mutex destroy busy", EBUSY);
    native_mutex_initialize(lock);
}
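The mutex above is the classic three-state futex lock: 0 means unlocked, 1 means locked with no waiters, and 2 means locked and contended, so native_mutex_unlock only pays for a FUTEX_WAKE when a waiter may actually exist. A standalone sketch of the same protocol, with GCC __atomic builtins standing in for Ruby's ATOMIC_CAS/ATOMIC_EXCHANGE and reusing the do_futex helper from the sketch above (sketch_lock/sketch_unlock are illustrative names, not part of the patch):

static void
sketch_lock(int *ftx)
{
    int zero = 0;

    /* fast path: 0 -> 1, we own the lock and nobody is waiting */
    if (__atomic_compare_exchange_n(ftx, &zero, 1, 0,
                                    __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
        return;
    /* slow path: mark the lock contended (2) and sleep while it stays 2 */
    while (__atomic_exchange_n(ftx, 2, __ATOMIC_ACQUIRE) != 0)
        do_futex(ftx, FUTEX_WAIT, 2);
}

static void
sketch_unlock(int *ftx)
{
    /* old value other than 1 means somebody marked us contended: wake one */
    if (__atomic_exchange_n(ftx, 0, __ATOMIC_RELEASE) != 1)
        do_futex(ftx, FUTEX_WAKE, 1);
}

Note that a thread woken on the slow path sets the word back to 2 rather than 1; that is deliberate, since it cannot know whether other waiters remain.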
static void
native_cond_initialize(rb_nativethread_cond_t *cond, int flags)
{
    cond->lock = 0;
    cond->seq = 0;
    cond->clockid = CLOCK_REALTIME;
    if (flags & RB_CONDATTR_CLOCK_MONOTONIC) {
        cond->clockid = CLOCK_MONOTONIC;
    }
}

static void
native_cond_destroy(rb_nativethread_cond_t *cond)
{
    native_cond_initialize(cond, 0);
}

/*
 * n.b.: native_cond_signal/native_cond_broadcast do not check for the
 * existence of current waiters.  For the Ruby mutex part, we rely on
 * the rb_mutex_t->cond_waiting counter to do that.
 */
static void
native_cond_signal(rb_nativethread_cond_t *cond)
{
    ATOMIC_INC(cond->seq);
    futex_wake(&cond->seq, 1);
}

#if 0
static void
native_cond_broadcast(rb_nativethread_cond_t *cond)
{
    /* we may use FUTEX_CMP_REQUEUE if we ever use this */
    ATOMIC_INC(cond->seq);
    futex_wake(&cond->seq, INT_MAX);
}
#endif
static void
native_cond_gettime(rb_nativethread_cond_t *cond, struct timespec *ts)
{
    int rc = clock_gettime(cond->clockid, ts);

    if (rc != 0) {
        rb_bug_errno("clock_gettime()", errno);
    }
}

/*
 * returns 0 if `end' is in the future
 * returns 1 if `end' has passed (only reached after EINTR)
 */
static int
ts_expired_p(rb_nativethread_cond_t *cond, struct timespec *end,
             struct timespec *rel)
{
    struct timespec now;

    native_cond_gettime(cond, &now);
    rel->tv_sec = end->tv_sec - now.tv_sec;
    rel->tv_nsec = end->tv_nsec - now.tv_nsec;
    if (rel->tv_nsec < 0) {
        rel->tv_nsec += 1000000000;
        rel->tv_sec--;
    }
    if (rel->tv_sec < 0) {
        return 1;
    }
    return 0;
}
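A worked example of the borrow step above, with illustrative values (not from the patch):

/*
 *   end = { .tv_sec = 5, .tv_nsec = 100000000 }   (5.1 s)
 *   now = { .tv_sec = 3, .tv_nsec = 900000000 }   (3.9 s)
 *   raw difference:   tv_sec = 2, tv_nsec = -800000000
 *   after the borrow: tv_sec = 1, tv_nsec =  200000000   -> 1.2 s remaining
 */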
static int
native_cond_timedwait(rb_nativethread_cond_t *cond,
                      rb_nativethread_lock_t *lock, const struct timespec *ts)
{
    int val;
    int rc;
    struct timespec *end;
    struct timespec tend;
    struct timespec rel;

    /*
     * native_cond_timedwait is normally a wrapper around
     * pthread_cond_timedwait (which takes an absolute timestamp).
     * FUTEX_WAIT takes a relative timestamp, so we need to convert
     * from absolute to relative.
     */
    if (ts) { /* in case of EINTR */
        native_cond_gettime(cond, &tend);
        tend.tv_nsec += ts->tv_nsec;
        tend.tv_sec += ts->tv_sec;
        if (tend.tv_nsec >= 1000000000) {
            tend.tv_nsec -= 1000000000;
            tend.tv_sec++;
        }
        end = &tend;
        rel = *ts;
        ts = &rel;
    }
    if (cond->lock != lock) {
        if (cond->lock) {
            rb_bug_errno("futex native_cond_timedwait owner mismatch", EINVAL);
        }
        if (ATOMIC_PTR_EXCHANGE(cond->lock, lock) != 0) {
            /* condvar is broken at this point */
            rb_bug_errno("futex native_cond_timedwait owner race", EINVAL);
        }
    }

    val = cond->seq;
    native_mutex_unlock(lock);
    do {
        rc = futex_wait(&cond->seq, val, ts);
    } while ((cond->seq == val) && (!ts || !ts_expired_p(cond, end, &rel)));

    /*
     * lock the mutex assuming contention, since the cond signal/broadcast
     * thread currently has the futex
     */
    native_mutex_lock_contended(lock);

    /* we can return ETIMEDOUT like pthread_cond_timedwait does */
    return rc == -ETIMEDOUT ? ETIMEDOUT : 0;
}

static void
native_cond_wait(rb_nativethread_cond_t *cond, rb_nativethread_lock_t *lock)
{
    (void)native_cond_timedwait(cond, lock, 0);
}
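native_cond_timedwait is a sequence-counter condvar: the waiter snapshots cond->seq, drops the mutex, sleeps only while seq is unchanged, and re-acquires the mutex through the contended path once woken. A standalone sketch of that wait/signal pattern, reusing do_futex and sketch_lock/sketch_unlock from the earlier sketches (seq_wait/seq_signal are illustrative names, not part of the patch, and timeouts are omitted):

static int cond_seq; /* the condvar's futex word */

static void
seq_wait(int *lock_ftx)
{
    int snapshot = __atomic_load_n(&cond_seq, __ATOMIC_ACQUIRE);

    sketch_unlock(lock_ftx);
    /* sleep only while nobody has signaled since our snapshot */
    while (__atomic_load_n(&cond_seq, __ATOMIC_ACQUIRE) == snapshot)
        do_futex(&cond_seq, FUTEX_WAIT, snapshot);
    sketch_lock(lock_ftx); /* re-acquire before returning, as pthreads does */
}

static void
seq_signal(void)
{
    __atomic_add_fetch(&cond_seq, 1, __ATOMIC_RELEASE);
    do_futex(&cond_seq, FUTEX_WAKE, 1);
}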
static struct timespec
native_cond_timeout(rb_nativethread_cond_t *cond, struct timespec timeout_rel)
{
    /* XXX: this is dirty and hides bugs in our callers */
    if (timeout_rel.tv_nsec >= 1000000000) {
        timeout_rel.tv_nsec -= 1000000000;
        timeout_rel.tv_sec++;
    }

    return timeout_rel; /* futexes use relative timeouts internally */
}

static int
gvl_waiting_p(const rb_global_vm_lock_t *gvl)
{
    int nr = RB_ACCESS_ONCE(int, gvl->ftx);

    return nr > 1;
}
static void
gvl_acquire_contended(rb_vm_t *vm, rb_thread_t *th)
{
    while (ATOMIC_EXCHANGE(vm->gvl.ftx, 2)) {
        /*
         * reduce wakeups by only signaling the timer thread
         * for the first waiter we get
         */
        if (ATOMIC_INC(vm->gvl.waiting) == 0) {
            rb_thread_wakeup_timer_thread_low();
        }
        futex_wait(&vm->gvl.ftx, 2, NULL);
        ATOMIC_DEC(vm->gvl.waiting);
    }

    if (vm->gvl.need_yield) {
        vm->gvl.need_yield = 0;
        futex_wake(&vm->gvl.need_yield, 1);
    }
}

static void
gvl_acquire(rb_vm_t *vm, rb_thread_t *th)
{
    if (ATOMIC_CAS(vm->gvl.ftx, 0, 1)) {
        gvl_acquire_contended(vm, th);
    }
}

/* returns 1 if another thread is woken */
static int
gvl_release(rb_vm_t *vm)
{
    int n = ATOMIC_EXCHANGE(vm->gvl.ftx, 0);

    if (n == 1) return 0; /* fast path, no waiters */
    assert(n != 0);
    return futex_wake(&vm->gvl.ftx, 1);
}
static void
gvl_yield(rb_vm_t *vm, rb_thread_t *th)
{
    int yielding = ATOMIC_EXCHANGE(vm->gvl.wait_yield, 1);

    if (yielding) { /* another thread is inside gvl_yield */
        if (gvl_release(vm)) {
            sched_yield();
        }
        else {
            futex_wait(&vm->gvl.wait_yield, 1, NULL);
        }
        gvl_acquire_contended(vm, th);
    }
    else {
        int maybe_more = 0;

        /*
         * no need for atomics while holding the GVL: gvl_acquire_contended
         * cannot see need_yield change until we call gvl_release()
         */
        vm->gvl.need_yield = 1;

        if (gvl_release(vm)) {
            /*
             * we had active waiters, wait for them to wake us in
             * gvl_acquire_contended
             */
            do {
                futex_wait(&vm->gvl.need_yield, 1, NULL);
            } while (vm->gvl.need_yield);

            ATOMIC_EXCHANGE(vm->gvl.wait_yield, 0);

            /*
             * we've waited ourselves, and have no idea who else is inside
             * this function, so we must assume they exist and only wake up
             * the rest AFTER we have the GVL
             */
            maybe_more = 1;
        }
        else { /* nobody was woken when we released the GVL */
            /* we have no GVL, and no idea how to be fair here... */
            vm->gvl.need_yield = 0;
            ATOMIC_EXCHANGE(vm->gvl.wait_yield, 0);

            /* first, try to avoid a thundering herd w/o the GVL */
            if (futex_wake(&vm->gvl.wait_yield, 1) > 0) {
                maybe_more = 1;
            }

            /* either way, hope other threads get some work done... */
            sched_yield();
        }
        if (maybe_more) {
            gvl_acquire_contended(vm, th);
            /*
             * wake up the thundering herd now that we have the GVL.
             * We cannot use FUTEX_REQUEUE here to migrate waiters from
             * gvl.wait_yield to gvl.ftx, since the gvl.waiting counter
             * must be kept up-to-date; maintaining a correct gvl.waiting
             * count would be complex (or even impossible) for threads
             * waiting on gvl.wait_yield.
             */
            futex_wake(&vm->gvl.wait_yield, INT_MAX);
        }
        else {
            gvl_acquire(vm, th);
        }
    }
}

static void
gvl_init(rb_vm_t *vm)
{
    memset(&vm->gvl, 0, sizeof(rb_global_vm_lock_t));
}

RUBY_ALIAS_FUNCTION(gvl_destroy(rb_vm_t *vm), gvl_init, (vm));

#endif /* RB_THREAD_FUTEX_H */
thread_pthread.c

static void native_mutex_initialize(rb_nativethread_lock_t *lock);
static void native_mutex_destroy(rb_nativethread_lock_t *lock);
static void native_cond_signal(rb_nativethread_cond_t *cond);
#if !RB_PTHREAD_USE_FUTEX
static void native_cond_broadcast(rb_nativethread_cond_t *cond);
#endif
static void native_cond_wait(rb_nativethread_cond_t *cond, rb_nativethread_lock_t *mutex);
static void native_cond_initialize(rb_nativethread_cond_t *cond, int flags);
static void native_cond_destroy(rb_nativethread_cond_t *cond);
...
# define USE_SLEEPY_TIMER_THREAD 0
#endif

#if RB_PTHREAD_USE_FUTEX
# include "thread_futex.h"
#else /* RB_PTHREAD_USE_FUTEX == 0 */
# if USE_SLEEPY_TIMER_THREAD
static int
gvl_waiting_p(const rb_global_vm_lock_t *gvl)
{
    return gvl->waiting > 0;
}
# endif

static void
gvl_acquire_common(rb_vm_t *vm)
{
...
    native_mutex_destroy(&vm->gvl.lock);
}
#endif /* !RB_PTHREAD_USE_FUTEX */

#if defined(HAVE_WORKING_FORK)
static void
gvl_atfork(rb_vm_t *vm)
...
#define NATIVE_MUTEX_LOCK_DEBUG 0

#if !RB_PTHREAD_USE_FUTEX
static inline void native_thread_sysinit(void) { /* no-op */ }

static void
mutex_debug(const char *msg, void *lock)
{
...
    return timeout;
}
#endif /* !RB_PTHREAD_USE_FUTEX */

#define native_cleanup_push pthread_cleanup_push
#define native_cleanup_pop pthread_cleanup_pop
...
{
    rb_thread_t *th = GET_THREAD();

    native_thread_sysinit();
    pthread_key_create(&ruby_native_thread_key, NULL);
    th->thread_id = pthread_self();
    fill_thread_id_str(th);
...
    need_polling = !ubf_threads_empty();

    if (gvl->waiting > 0 || need_polling) {
    if (gvl_waiting_p(gvl) || need_polling) {
        /* polling (TIME_QUANTUM_USEC usec) */
        result = poll(pollfds, 1, TIME_QUANTUM_USEC/1000);
    }
thread_pthread.h

#include <pthread_np.h>
#endif

#define RB_NATIVETHREAD_LOCK_INIT PTHREAD_MUTEX_INITIALIZER
#define RB_NATIVETHREAD_COND_INIT { PTHREAD_COND_INITIALIZER, }
#if RB_PTHREAD_USE_FUTEX /* defined in include/ruby/thread_native.h */
# define RB_NATIVETHREAD_LOCK_INIT { 0 }
# define RB_NATIVETHREAD_COND_INIT { 0, 0, }
#else /* really using pthreads */
# define RB_NATIVETHREAD_LOCK_INIT PTHREAD_MUTEX_INITIALIZER
# define RB_NATIVETHREAD_COND_INIT { PTHREAD_COND_INITIALIZER, }
#endif

typedef struct rb_thread_cond_struct {
#if RB_PTHREAD_USE_FUTEX
    rb_nativethread_lock_t *lock;
    int seq;
#else
    pthread_cond_t cond;
#endif
#ifdef HAVE_CLOCKID_T
    clockid_t clockid;
#endif
...
#undef finally

typedef struct rb_global_vm_lock_struct {
#if RB_PTHREAD_USE_FUTEX
    int ftx;
    int waiting;
#else /* RB_PTHREAD_USE_FUTEX == 0 */
    /* fast path */
    unsigned long acquired;
    rb_nativethread_lock_t lock;
...
    /* yield */
    rb_nativethread_cond_t switch_cond;
    rb_nativethread_cond_t switch_wait_cond;
#endif /* RB_PTHREAD_USE_FUTEX == 0 */
    int need_yield;
    int wait_yield;
} rb_global_vm_lock_t;