Feature #13697 » 0001-thread-futex-based-thread-primitives-another-take.patch

normalperson (Eric Wong), 06/29/2017 03:26 AM

common.mk
thread.$(OBJEXT): {$(VPATH)}thread.h
thread.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).c
thread.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).h
thread.$(OBJEXT): {$(VPATH)}thread_futex.h
thread.$(OBJEXT): {$(VPATH)}thread_native.h
thread.$(OBJEXT): {$(VPATH)}thread_sync.c
thread.$(OBJEXT): {$(VPATH)}timev.h
configure.in
AC_CHECK_HEADERS(intrinsics.h)
AC_CHECK_HEADERS(langinfo.h)
AC_CHECK_HEADERS(limits.h)
AC_CHECK_HEADERS(linux/futex.h)
AC_CHECK_HEADERS(locale.h)
AC_CHECK_HEADERS(malloc.h)
AC_CHECK_HEADERS(malloc/malloc.h)
include/ruby/thread_native.h
#elif defined(HAVE_PTHREAD_H)
#include <pthread.h>
# if defined(HAVE_LINUX_FUTEX_H) && defined(HAVE_SYS_SYSCALL_H)
# define RB_PTHREAD_USE_FUTEX (1)
# else
# define RB_PTHREAD_USE_FUTEX (0)
# endif
typedef pthread_t rb_nativethread_id_t;
# if RB_PTHREAD_USE_FUTEX
typedef struct rb_futex_lock_struct {
int ftx;
} rb_nativethread_lock_t;
# else
typedef pthread_mutex_t rb_nativethread_lock_t;
# endif
#else
#error "unsupported thread type"
thread_futex.h
/*
* futex-based locks and condvars for Linux 2.6 and later
*/
#ifndef RB_THREAD_FUTEX_H
#define RB_THREAD_FUTEX_H
#include "ruby_atomic.h"
#include <linux/futex.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <limits.h>
/*
* errnos are positive on Linux, so we can use negated
* errnos (e.g. -EAGAIN) as return values.
*/
STATIC_ASSERT(eagain_positive, EAGAIN > 0);
STATIC_ASSERT(etimedout_positive, ETIMEDOUT > 0);
STATIC_ASSERT(eintr_positive, EINTR > 0);
#define KVER(a,b,c,d) ((a << 24) | (b << 16) | (c << 8) | d)
#ifndef MINIMUM_KVER
# define MINIMUM_KVER KVER(2,6,18,0) /* 2.6.18 for RHEL/CentOS 5.x */
#endif
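/*
 * Worked example (added note, not part of the patch): KVER() packs a
 * kernel version into one comparable integer, so version checks become
 * plain integer comparisons, e.g.:
 *
 *   KVER(2,6,22,0) == (2 << 24) | (6 << 16) | (22 << 8) | 0 == 0x02061600
 */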
#undef RB_FTX_PRIVATE
/* optimization for Linux 2.6.22+ */
#if defined(FUTEX_PRIVATE_FLAG)
# if MINIMUM_KVER >= KVER(2, 6, 22, 0)
# define RB_FTX_PRIVATE FUTEX_PRIVATE_FLAG
# else
static int RB_FTX_PRIVATE = FUTEX_PRIVATE_FLAG;
# endif
#else /* no FUTEX_PRIVATE_FLAG in headers, assume kernel is old, too */
static int RB_FTX_PRIVATE;
#endif
/* prevent the compiler from reordering access */
#define RB_ACCESS_ONCE(type,x) (*((volatile type *)&(x)))
static int
rb_futex(int *uaddr, int futex_op, int val,
const struct timespec *timeout, /* or uint32_t val2 */
int *uaddr2, int val3)
{
return syscall(__NR_futex, uaddr, futex_op, val, timeout, uaddr2, val3);
}
static void
native_thread_sysinit(void)
{
#ifndef RB_FTX_PRIVATE /* macro if MINIMUM_KVER >= 2.6.22 */
/*
* Linux <2.6.22 does not support the FUTEX_PRIVATE_FLAG,
* however Ruby may be compiled with newer glibc headers or
* newer system entirely and then run on an old kernel,
* so we must detect at startup
*/
int op = FUTEX_WAKE | RB_FTX_PRIVATE;
int val = 1;
int rc = rb_futex(&val, op, 1, 0, 0, 0);
if (rc < 0) {
RB_FTX_PRIVATE = 0;
}
#endif
}
static int
futex_wait(int *addr, int val, const struct timespec *ts)
{
int op = FUTEX_WAIT | RB_FTX_PRIVATE;
int rc = rb_futex(addr, op, val, ts, 0, 0);
if (rc == 0) return 0; /* successfully woken */
rc = errno;
if (rc != EAGAIN && rc != ETIMEDOUT && rc != EINTR) {
rb_bug_errno("FUTEX_WAIT", rc);
}
return -rc;
}
/* returns number of threads woken */
static int
futex_wake(int *addr, int nwake)
{
int op = FUTEX_WAKE | RB_FTX_PRIVATE;
int rc = rb_futex(addr, op, nwake, 0, 0, 0);
if (rc >= 0) return rc;
rb_bug_errno("FUTEX_WAKE", errno);
}
static void
native_mutex_lock_contended(rb_nativethread_lock_t *lock)
{
/* mark the lock contended so the next unlock knows to wake a waiter */
while (ATOMIC_EXCHANGE(lock->ftx, 2)) {
futex_wait(&lock->ftx, 2, 0);
}
}
static void
native_mutex_lock(rb_nativethread_lock_t *lock)
{
int old = ATOMIC_CAS(lock->ftx, 0, 1);
if (LIKELY(old == 0)) return; /* uncontended fast path */
native_mutex_lock_contended(lock);
}
static void
native_mutex_unlock(rb_nativethread_lock_t *lock)
{
int old = ATOMIC_EXCHANGE(lock->ftx, 0);
if (LIKELY(old == 1)) return; /* uncontended fast path */
VM_ASSERT(old != 0);
futex_wake(&lock->ftx, 1);
}
static int
native_mutex_trylock(rb_nativethread_lock_t *lock)
{
int old = ATOMIC_CAS(lock->ftx, 0, 1);
return old ? EBUSY : 0;
}
static void
native_mutex_initialize(rb_nativethread_lock_t *lock)
{
lock->ftx = 0;
}
static void
native_mutex_destroy(rb_nativethread_lock_t *lock)
{
if (lock->ftx) rb_bug_errno("mutex destroy busy", EBUSY);
native_mutex_initialize(lock);
}
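/*
 * Reviewer note (not part of the patch): the mutex above follows the
 * classic three-state futex protocol: ftx == 0 is unlocked, 1 is locked
 * with no waiters, 2 is locked with possible waiters. Lock attempts a
 * 0 -> 1 CAS; on failure it forces the state to 2 and sleeps in
 * FUTEX_WAIT. Unlock stores 0 and only issues FUTEX_WAKE when the old
 * state was 2, keeping the uncontended paths free of syscalls.
 */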
static void
native_cond_initialize(rb_nativethread_cond_t *cond, int flags)
{
cond->lock = 0;
cond->seq = 0;
cond->clockid = CLOCK_REALTIME;
if (flags & RB_CONDATTR_CLOCK_MONOTONIC) {
cond->clockid = CLOCK_MONOTONIC;
}
}
static void
native_cond_destroy(rb_nativethread_cond_t *cond)
{
native_cond_initialize(cond, 0);
}
/*
* n.b.: native_cond_signal/native_cond_broadcast do not check for the
* existence of current waiters. For the Ruby mutex part, we rely on
* the rb_mutex_t->cond_waiting counter to do that.
*/
static void
native_cond_signal(rb_nativethread_cond_t *cond)
{
ATOMIC_INC(cond->seq);
futex_wake(&cond->seq, 1);
}
#if 0
static void
native_cond_broadcast(rb_nativethread_cond_t *cond)
{
/* we may use FUTEX_CMP_REQUEUE if we ever use this */
ATOMIC_INC(cond->seq);
futex_wake(&cond->seq, INT_MAX);
}
#endif
static void
native_cond_gettime(rb_nativethread_cond_t *cond, struct timespec *ts)
{
int rc = clock_gettime(cond->clockid, ts);
if (rc != 0) {
rb_bug_errno("clock_gettime()", errno);
}
}
/*
* returns 0 if `end' is in the future
* returns 1 if `end' is past (only triggered on EINTR)
*/
static int
ts_expired_p(rb_nativethread_cond_t *cond, struct timespec *end,
struct timespec *rel)
{
struct timespec now;
native_cond_gettime(cond, &now);
rel->tv_sec = end->tv_sec - now.tv_sec;
rel->tv_nsec = end->tv_nsec - now.tv_nsec;
if (rel->tv_nsec < 0) {
rel->tv_nsec += 1000000000;
rel->tv_sec--;
}
if (rel->tv_sec < 0) {
return 1;
}
return 0;
}
static int
native_cond_timedwait(rb_nativethread_cond_t *cond,
rb_nativethread_lock_t *lock, const struct timespec *ts)
{
int val;
int rc;
struct timespec *end;
struct timespec tend;
struct timespec rel;
/*
* native_cond_timedwait is normally a wrapper around
* pthread_cond_timedwait (which takes an absolute timestamp).
* FUTEX_WAIT takes a relative timeout instead, so `ts' arrives here
* as a relative value; we record the absolute end time so we can
* recompute the remaining timeout after EINTR or spurious wakeups.
*/
if (ts) { /* in case of EINTR */
native_cond_gettime(cond, &tend);
tend.tv_nsec += ts->tv_nsec;
tend.tv_sec += ts->tv_sec;
if (tend.tv_nsec >= 1000000000) {
tend.tv_nsec -= 1000000000;
tend.tv_sec++;
}
end = &tend;
rel = *ts;
ts = &rel;
}
if (cond->lock != lock) {
if (cond->lock) {
rb_bug_errno("futex native_cond_timedwait owner mismatch", EINVAL);
}
if (ATOMIC_PTR_EXCHANGE(cond->lock, lock) != 0) {
/* condvar is broken at this point */
rb_bug_errno("futex native_cond_timedwait owner race", EINVAL);
}
}
val = cond->seq;
native_mutex_unlock(lock);
do {
rc = futex_wait(&cond->seq, val, ts);
} while ((cond->seq == val) && (!ts || !ts_expired_p(cond, end, &rel)));
/*
* lock the mutex assuming contention, since the cond signal/broadcast
* thread currently has the futex
*/
native_mutex_lock_contended(lock);
/* we can return ETIMEDOUT like pthread_cond_timedwait does */
return rc == -ETIMEDOUT ? ETIMEDOUT : 0;
}
static void
native_cond_wait(rb_nativethread_cond_t *cond, rb_nativethread_lock_t *lock)
{
(void)native_cond_timedwait(cond, lock, 0);
}
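/*
 * Reviewer note (not part of the patch): the condvar is a sequence
 * counter. A waiter snapshots cond->seq, drops the mutex, then sleeps
 * in FUTEX_WAIT only while seq still equals the snapshot;
 * native_cond_signal increments seq before FUTEX_WAKE, so a signal
 * that lands between the unlock and the FUTEX_WAIT is not lost: the
 * kernel re-checks the futex value before putting the thread to sleep.
 */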
static struct timespec
native_cond_timeout(rb_nativethread_cond_t *cond, struct timespec timeout_rel)
{
/* XXX: this is dirty and hides bugs in our callers */
if (timeout_rel.tv_nsec >= 1000000000) {
timeout_rel.tv_nsec -= 1000000000;
timeout_rel.tv_sec++;
}
return timeout_rel; /* Futexes use relative timeouts internally */
}
static int
gvl_waiting_p(const rb_global_vm_lock_t *gvl)
{
int nr = RB_ACCESS_ONCE(int, gvl->ftx);
return nr > 1;
}
static void
gvl_acquire_contended(rb_vm_t *vm, rb_thread_t *th)
{
while (ATOMIC_EXCHANGE(vm->gvl.ftx, 2)) {
/*
* reduce wakeups by only signaling the timer thread
* for the first waiter we get
*/
if (ATOMIC_INC(vm->gvl.waiting) == 0) {
rb_thread_wakeup_timer_thread_low();
}
futex_wait(&vm->gvl.ftx, 2, NULL);
ATOMIC_DEC(vm->gvl.waiting);
}
if (vm->gvl.need_yield) {
vm->gvl.need_yield = 0;
futex_wake(&vm->gvl.need_yield, 1);
}
}
static void
gvl_acquire(rb_vm_t *vm, rb_thread_t *th)
{
if (ATOMIC_CAS(vm->gvl.ftx, 0, 1)) {
gvl_acquire_contended(vm, th);
}
}
/* returns 1 if another thread is woken */
static int
gvl_release(rb_vm_t *vm)
{
int n = ATOMIC_EXCHANGE(vm->gvl.ftx, 0);
if (n == 1) return 0; /* fast path, no waiters */
assert(n != 0);
return futex_wake(&vm->gvl.ftx, 1);
}
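/*
 * Reviewer note (not part of the patch): gvl.ftx reuses the same
 * 0/1/2 states as the native mutex above, so gvl_waiting_p() simply
 * checks for the contended state (ftx > 1). gvl.waiting counts the
 * threads sleeping in gvl_acquire_contended so that only the first
 * waiter signals the timer thread via rb_thread_wakeup_timer_thread_low().
 */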
static void
gvl_yield(rb_vm_t *vm, rb_thread_t *th)
{
int yielding = ATOMIC_EXCHANGE(vm->gvl.wait_yield, 1);
if (yielding) { /* another thread is inside gvl_yield */
if (gvl_release(vm)) {
sched_yield();
}
else {
futex_wait(&vm->gvl.wait_yield, 1, NULL);
}
gvl_acquire_contended(vm, th);
}
else {
int maybe_more = 0;
/*
* no need for atomics while holding the GVL; gvl_acquire_contended
* cannot observe the need_yield change until we call gvl_release()
*/
vm->gvl.need_yield = 1;
if (gvl_release(vm)) {
/*
* we had active waiters, wait for them to wake us in
* gvl_acquire_contended
*/
do {
futex_wait(&vm->gvl.need_yield, 1, NULL);
} while (vm->gvl.need_yield);
ATOMIC_EXCHANGE(vm->gvl.wait_yield, 0);
/*
* we've waited ourselves, and have no idea who else is inside
* this function, so we must assume they exist and only wake up
* the rest AFTER we have the GVL
*/
maybe_more = 1;
}
else { /* nobody woken up when we released GVL */
/* we have no GVL, and no idea how to be fair here... */
vm->gvl.need_yield = 0;
ATOMIC_EXCHANGE(vm->gvl.wait_yield, 0);
/* first, try to avoid a thundering herd w/o GVL */
if (futex_wake(&vm->gvl.wait_yield, 1) > 0) {
maybe_more = 1;
}
/* either way, hope other threads get some work done... */
sched_yield();
}
if (maybe_more) {
gvl_acquire_contended(vm, th);
/*
* wake up the thundering herd now that we have
* the GVL. We cannot use FUTEX_REQUEUE here to
* migrate from gvl.wait_yield => gvl.ftx, since
* the gvl.waiting counter must be kept up-to-date,
* and maintaining a correct gvl.waiting count would
* be complex (or even impossible) for threads
* waiting on gvl.wait_yield.
*/
futex_wake(&vm->gvl.wait_yield, INT_MAX);
}
else {
gvl_acquire(vm, th);
}
}
}
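/*
 * Reviewer note (not part of the patch): gvl_yield has two roles. The
 * first thread to flip wait_yield becomes the primary yielder: it sets
 * need_yield, releases the GVL, and, if a waiter was woken, sleeps on
 * need_yield until gvl_acquire_contended clears it, then reacquires the
 * GVL and broadcasts wait_yield to release any other yielders. If no
 * waiter was woken, it clears both flags, wakes at most one other
 * yielder, and sched_yield()s before reacquiring. Threads that arrive
 * while wait_yield is already set either sched_yield() or sleep on
 * wait_yield, then reacquire via the contended path.
 */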
static void
gvl_init(rb_vm_t *vm)
{
memset(&vm->gvl, 0, sizeof(rb_global_vm_lock_t));
}
RUBY_ALIAS_FUNCTION(gvl_destroy(rb_vm_t *vm), gvl_init, (vm));
#endif /* RB_THREAD_FUTEX_H */
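For readers unfamiliar with the underlying primitive, the following standalone sketch (not part of the patch; it assumes Linux and glibc's syscall() wrapper) shows the bare FUTEX_WAIT/FUTEX_WAKE round trip that thread_futex.h builds on. Compile with `cc -pthread`.

#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>

static int word;        /* futex word: 0 = "not ready", 1 = "ready" */

static long futex(int *uaddr, int op, int val)
{
    return syscall(SYS_futex, uaddr, op, val, NULL, NULL, 0);
}

static void *waiter(void *unused)
{
    /* FUTEX_WAIT sleeps only if *uaddr still equals the expected value,
     * so a wake (or store) that races with this call is never missed */
    while (__atomic_load_n(&word, __ATOMIC_ACQUIRE) == 0)
        futex(&word, FUTEX_WAIT, 0);
    puts("woken");
    return NULL;
}

int main(void)
{
    pthread_t t;
    pthread_create(&t, NULL, waiter, NULL);
    sleep(1);
    __atomic_store_n(&word, 1, __ATOMIC_RELEASE);
    futex(&word, FUTEX_WAKE, 1);   /* wake at most one waiter */
    pthread_join(t, NULL);
    return 0;
}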
thread_pthread.c
static void native_mutex_initialize(rb_nativethread_lock_t *lock);
static void native_mutex_destroy(rb_nativethread_lock_t *lock);
static void native_cond_signal(rb_nativethread_cond_t *cond);
#if !RB_PTHREAD_USE_FUTEX
static void native_cond_broadcast(rb_nativethread_cond_t *cond);
#endif
static void native_cond_wait(rb_nativethread_cond_t *cond, rb_nativethread_lock_t *mutex);
static void native_cond_initialize(rb_nativethread_cond_t *cond, int flags);
static void native_cond_destroy(rb_nativethread_cond_t *cond);
......
# define USE_SLEEPY_TIMER_THREAD 0
#endif
#if RB_PTHREAD_USE_FUTEX
# include "thread_futex.h"
#else /* RB_PTHREAD_USE_FUTEX == 0 */
# if USE_SLEEPY_TIMER_THREAD
static int
gvl_waiting_p(const rb_global_vm_lock_t *gvl)
{
return gvl->waiting > 0;
}
# endif
static void
gvl_acquire_common(rb_vm_t *vm)
{
......
native_mutex_destroy(&vm->gvl.lock);
}
#endif /* !RB_PTHREAD_USE_FUTEX */
#if defined(HAVE_WORKING_FORK)
static void
gvl_atfork(rb_vm_t *vm)
......
#define NATIVE_MUTEX_LOCK_DEBUG 0
#if !RB_PTHREAD_USE_FUTEX
static inline void native_thread_sysinit(void) { /* no-op */ }
static void
mutex_debug(const char *msg, void *lock)
{
......
return timeout;
}
#endif /* !RB_PTHREAD_USE_FUTEX */
#define native_cleanup_push pthread_cleanup_push
#define native_cleanup_pop pthread_cleanup_pop
......
{
rb_thread_t *th = GET_THREAD();
native_thread_sysinit();
pthread_key_create(&ruby_native_thread_key, NULL);
th->thread_id = pthread_self();
fill_thread_id_str(th);
......
need_polling = !ubf_threads_empty();
if (gvl->waiting > 0 || need_polling) {
if (gvl_waiting_p(gvl) || need_polling) {
/* polling (TIME_QUANTUM_USEC usec) */
result = poll(pollfds, 1, TIME_QUANTUM_USEC/1000);
}
thread_pthread.h
#include <pthread_np.h>
#endif
#define RB_NATIVETHREAD_LOCK_INIT PTHREAD_MUTEX_INITIALIZER
#define RB_NATIVETHREAD_COND_INIT { PTHREAD_COND_INITIALIZER, }
#if RB_PTHREAD_USE_FUTEX /* defined in include/ruby/thread_native.h */
# define RB_NATIVETHREAD_LOCK_INIT { 0 }
# define RB_NATIVETHREAD_COND_INIT { 0, 0, }
#else /* really using pthreads */
# define RB_NATIVETHREAD_LOCK_INIT PTHREAD_MUTEX_INITIALIZER
# define RB_NATIVETHREAD_COND_INIT { PTHREAD_COND_INITIALIZER, }
#endif
typedef struct rb_thread_cond_struct {
#if RB_PTHREAD_USE_FUTEX
rb_nativethread_lock_t *lock;
int seq;
#else
pthread_cond_t cond;
#endif
#ifdef HAVE_CLOCKID_T
clockid_t clockid;
#endif
......
#undef finally
typedef struct rb_global_vm_lock_struct {
#if RB_PTHREAD_USE_FUTEX
int ftx;
int waiting;
#else /* RB_PTHREAD_USE_FUTEX == 0 */
/* fast path */
unsigned long acquired;
rb_nativethread_lock_t lock;
......
/* yield */
rb_nativethread_cond_t switch_cond;
rb_nativethread_cond_t switch_wait_cond;
#endif /* RB_PTHREAD_USE_FUTEX == 0 */
int need_yield;
int wait_yield;
} rb_global_vm_lock_t;