
Feature #3176 » thread-priorities-try3.diff

coatl (caleb clausen), 05/18/2010 02:41 AM


eval.c
void rb_clear_trace_func(void);
void rb_thread_stop_timer_thread(void);
extern void rb_threadptr_interrupt(rb_thread_t *th);
void rb_call_inits(void);
void Init_heap(void);
......
ruby_finalize_1();
}
void rb_thread_stop_timer_thread(void);
int
ruby_cleanup(volatile int ex)
{
signal.c
{
int i, sig = 0;
/*this function could be made much faster by use of a bitmask and ffs() */
for (i=1; i<RUBY_NSIG; i++) {
if (signal_buff.cnt[i] > 0) {
rb_disable_interrupt();
thread.c
#include "eval_intern.h"
#include "gc.h"
#ifndef USE_NATIVE_THREAD_PRIORITY
#define USE_NATIVE_THREAD_PRIORITY 0
#define RUBY_THREAD_PRIORITY_MAX 3
#define RUBY_THREAD_PRIORITY_MIN -3
#endif
#ifndef THREAD_DEBUG
#define THREAD_DEBUG 0
#endif
......
static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region);
static void rb_pqueue_enqueue(pqueue_t *pqueue, rb_thread_t *th, unsigned priority);
static rb_thread_t *rb_pqueue_dequeue(pqueue_t *pqueue);
static rb_thread_t *rb_pqueue_dequeue_starting_at(pqueue_t *pqueue, unsigned start_from, unsigned *found_at);
void rb_threadptr_interrupt(rb_thread_t *th);
#define RB_GC_SAVE_MACHINE_CONTEXT(th) \
do { \
rb_gc_save_machine_context(th); \
SET_MACHINE_STACK_END(&(th)->machine_stack_end); \
} while (0)
#define GVL_TAKE(th) \
while (0!=native_mutex_trylock(&(th)->vm->global_vm_lock)) { \
thread_debug("waiting for gvl\n"); \
/*might be good to check RUBY_VM_INTERRUPTED here*/ \
rb_pqueue_enqueue(&(th)->vm->ready_to_run_list, \
(th), \
RUBY_THREAD_PRIORITY_MAX-(th)->priority \
); \
rb_doze((th)); \
}
#define GVL_GIVE(th) \
do { \
rb_thread_t *th2; \
native_mutex_unlock(&(th)->vm->global_vm_lock); \
th2=rb_pqueue_dequeue(&(th)->vm->ready_to_run_list); \
thread_debug("giving up gvl to %p\n", th2); \
if (th2) rb_undoze(th2); \
} while(0)
#define GVL_UNLOCK_BEGIN() do { \
rb_thread_t *_th_stored = GET_THREAD(); \
RB_GC_SAVE_MACHINE_CONTEXT(_th_stored); \
native_mutex_unlock(&_th_stored->vm->global_vm_lock)
GVL_GIVE(_th_stored)
#define GVL_UNLOCK_END() \
native_mutex_lock(&_th_stored->vm->global_vm_lock); \
GVL_TAKE(_th_stored); \
rb_thread_set_current(_th_stored); \
} while(0)
......
(th)->status = THREAD_STOPPED; \
thread_debug("enter blocking region (%p)\n", (void *)(th)); \
RB_GC_SAVE_MACHINE_CONTEXT(th); \
native_mutex_unlock(&(th)->vm->global_vm_lock); \
GVL_GIVE(th); \
} while (0)
#define BLOCKING_REGION(exec, ubf, ubfarg) do { \
......
}
static void
rb_pqueue_flush(pqueue_t *pqueue)
{
memset(pqueue,0,sizeof(*pqueue));
native_mutex_initialize(&pqueue->lock);
}
static void
rb_pqueue_initialize(pqueue_t *pqueue)
{
rb_pqueue_flush(pqueue);
if (sizeof(pqueue->mask)*CHAR_BIT<RUBY_NUM_PRIORITIES)
rb_fatal("pqueue_t.mask smaller than %d bits!", RUBY_NUM_PRIORITIES);
if (!getenv("THREAD_PRIOS_WARN")) {
rb_warn("need benchmarks");
rb_warn("need to test thread priorities more");
ruby_setenv("THREAD_PRIOS_WARN","1");
}
}
void
rb_pqueue_destroy(pqueue_t *pqueue)
{
native_mutex_destroy(&pqueue->lock);
memset(pqueue,0,sizeof(*pqueue));
}
static void
rb_pqueue_enqueue(pqueue_t *pqueue, rb_thread_t *th, unsigned priority)
{
rb_thread_t *queue;
if (priority>=RUBY_NUM_PRIORITIES) priority=RUBY_NUM_PRIORITIES-1;
/*th->next should be NULL here*/
native_mutex_lock(&pqueue->lock);
pqueue->mask |= 1<<priority;
queue=pqueue->queues[priority];
if (queue==NULL) {
th->next=th;
} else {
th->next=queue->next;
queue->next=th;
}
pqueue->queues[priority]=th;
native_mutex_unlock(&pqueue->lock);
}
static rb_thread_t *
rb_pqueue_dequeue(pqueue_t *pqueue)
{
int i;
rb_thread_t *result;
unsigned mask;
native_mutex_lock(&pqueue->lock);
mask = pqueue->mask;
i=ffs(mask)-1;
if (i==-1) {
result=NULL;
} else {
rb_thread_t *queue=pqueue->queues[i];
/*queue should be non-NULL here*/
result=queue->next;
if (result==queue) { /*last item in this queue?*/
pqueue->queues[i]=NULL;
pqueue->mask &= ~(1<<i);
} else {
queue->next=result->next;
}
result->next=NULL;
}
native_mutex_unlock(&pqueue->lock);
return result;
}
static rb_thread_t *
rb_pqueue_dequeue_starting_at(pqueue_t *pqueue, unsigned start_from, unsigned *found_at)
{
int i;
rb_thread_t *result;
unsigned mask;
mask=(1<<start_from)-1;
mask=~mask;
native_mutex_lock(&pqueue->lock);
mask &= pqueue->mask;
i=ffs(mask)-1;
if (i==-1) {
result=NULL;
*found_at=-1;
} else {
rb_thread_t *queue=pqueue->queues[i];
/*queue should be non-NULL here*/
*found_at=i;
result=queue->next;
if (result==queue) { /*last item in this queue?*/
pqueue->queues[i]=NULL;
pqueue->mask &= ~(1<<i);
} else {
queue->next=result->next;
}
result->next=NULL;
}
native_mutex_unlock(&pqueue->lock);
return result;
}
/*returns the priority of the highest priority item in the queue.
returns -1 if the queue is empty.
note: this returns a queue-relative priority (0..31, with 0==highest prio),
rather than a ruby-level priority (-16..15, with 15==highest prio).
*/
static int
rb_pqueue_highest_priority(pqueue_t *pqueue)
{
return ffs(pqueue->mask)-1;
}
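The queue-relative/ruby-level conversion mentioned in the comment above is the same RUBY_THREAD_PRIORITY_MAX - priority arithmetic used in GVL_TAKE and rb_there_are_equal_or_higher_priority_threads. A minimal standalone sketch of that mapping (an illustration only, not part of the patch; constants copied from vm_core.h):

#include <assert.h>

#define RUBY_THREAD_PRIORITY_MAX 15
#define RUBY_THREAD_PRIORITY_MIN -16

/* ruby-level priority 15..-16 maps onto queue index 0..31 (0 == highest) */
static unsigned queue_prio_of(int ruby_prio) { return (unsigned)(RUBY_THREAD_PRIORITY_MAX - ruby_prio); }
static int ruby_prio_of(unsigned queue_prio) { return RUBY_THREAD_PRIORITY_MAX - (int)queue_prio; }

int
main(void)
{
    assert(queue_prio_of(RUBY_THREAD_PRIORITY_MAX) == 0);    /* highest-priority thread */
    assert(queue_prio_of(0) == 15);                          /* default priority */
    assert(queue_prio_of(RUBY_THREAD_PRIORITY_MIN) == 31);   /* lowest priority */
    assert(ruby_prio_of(0) == RUBY_THREAD_PRIORITY_MAX);
    return 0;
}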
static void
rb_pqueue_rotate(pqueue_t *pqueue)
{
unsigned i=pqueue->next_promote_index;
if (i){
rb_thread_t *promoting;
unsigned found_at;
promoting=rb_pqueue_dequeue_starting_at(pqueue,i,&found_at);
if (!promoting) promoting=rb_pqueue_dequeue_starting_at(pqueue,0,&found_at);
if (promoting) rb_pqueue_enqueue(pqueue,promoting,found_at-1);
}
if (++pqueue->next_promote_index>=RUBY_NUM_PRIORITIES) pqueue->next_promote_index=0;
}
static void
set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg,
struct rb_unblock_callback *old)
{
......
native_mutex_unlock(&th->interrupt_lock);
}
/*notify a thread that it should stop waiting and call the thread's
unblocking function. see rb_thread_blocking_region for a
description of blocking regions and unblocking functions. Typically,
th->unblock.func is set to one of these:
ubf_handle (win32)
ubf_pthread_cond_signal (pthreads)
ubf_select
lock_interrupt
rb_big_stop
and th->unblock.arg is set to th. However, they might be different if
an extension used rb_thread_blocking_region or rb_thread_call_without_gvl
to define a custom blocking region.
*/
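As a rough illustration of that last point (not part of the patch, and using invented names like ext_wait/ext_unblock), a C extension built against the 1.9 API could install its own unblocking function roughly as follows; while the call is in progress, th->unblock.func and th->unblock.arg point at the extension's callback and argument instead of the defaults listed above:

#include "ruby.h"

/* Hypothetical extension code, for illustration only. */
struct ext_wait_arg { int fd; volatile int cancelled; };

static VALUE
ext_blocking_wait(void *p)      /* runs with the GVL released */
{
    struct ext_wait_arg *arg = p;
    while (!arg->cancelled) {
        /* ... block on arg->fd ... */
    }
    return Qnil;
}

static void
ext_unblock(void *p)            /* the custom unblocking function */
{
    struct ext_wait_arg *arg = p;
    arg->cancelled = 1;         /* asked to stop waiting, e.g. via rb_threadptr_interrupt */
}

static VALUE
ext_wait(VALUE self, VALUE fd)
{
    struct ext_wait_arg arg = { FIX2INT(fd), 0 };
    return rb_thread_blocking_region(ext_blocking_wait, &arg, ext_unblock, &arg);
}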
void
rb_threadptr_interrupt(rb_thread_t *th)
{
......
#endif
thread_debug("thread start: %p\n", (void *)th);
native_mutex_lock(&th->vm->global_vm_lock);
GVL_TAKE(th);
{
thread_debug("thread start (get lock): %p\n", (void *)th);
rb_thread_set_current(th);
......
thread_unlock_all_locking_mutexes(th);
if (th != main_th) rb_check_deadlock(th->vm);
if (th->vm->main_thread == th) {
/*ending main thread; interpreter will exit*/
ruby_cleanup(state);
}
else {
thread_cleanup_func(th);
native_mutex_unlock(&th->vm->global_vm_lock);
GVL_GIVE(th);
}
return 0;
......
rb_thread_wait_for(rb_time_timeval(INT2FIX(sec)));
}
static int
rb_there_are_equal_or_higher_priority_threads(rb_thread_t *th)
{
int highest_waiting=rb_pqueue_highest_priority(&th->vm->ready_to_run_list);
if (highest_waiting==-1) return 0;
highest_waiting=RUBY_THREAD_PRIORITY_MAX-highest_waiting;
return(highest_waiting>=th->priority);
}
static void rb_threadptr_execute_interrupts_rec(rb_thread_t *, int);
#define TICKS_PER_ROTATION 4
static void
rb_thread_schedule_rec(int sched_depth)
{
static int ticks_til_rotate=TICKS_PER_ROTATION;
thread_debug("rb_thread_schedule\n");
if (!rb_thread_alone()) {
rb_thread_t *th = GET_THREAD();
if (!sched_depth || rb_there_are_equal_or_higher_priority_threads(th)) {
thread_debug("rb_thread_schedule/switch start\n");
RB_GC_SAVE_MACHINE_CONTEXT(th);
GVL_GIVE(th);
GVL_TAKE(th);
RB_GC_SAVE_MACHINE_CONTEXT(th);
native_mutex_unlock(&th->vm->global_vm_lock);
{
native_thread_yield();
rb_thread_set_current(th);
thread_debug("rb_thread_schedule/switch done\n");
}
native_mutex_lock(&th->vm->global_vm_lock);
rb_thread_set_current(th);
thread_debug("rb_thread_schedule/switch done\n");
if (sched_depth){
if (ticks_til_rotate) {
--ticks_til_rotate;
} else {
ticks_til_rotate=TICKS_PER_ROTATION;
rb_pqueue_rotate(&th->vm->ready_to_run_list);
}
}
if (!sched_depth && UNLIKELY(GET_THREAD()->interrupt_flag)) {
rb_threadptr_execute_interrupts_rec(GET_THREAD(), sched_depth+1);
}
}
}
......
static inline void
blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
{
native_mutex_lock(&th->vm->global_vm_lock);
GVL_TAKE(th);
rb_thread_set_current(th);
thread_debug("leave blocking region (%p)\n", (void *)th);
remove_signal_thread_list(th);
......
return Qnil;
}
/* check the current thread for 'interrupts' (asynchronous events sent by other
* threads or the system) and handle them if present. Here are the types of
* 'interrupt':
* a signal
* an exception sent asynchronously (via Thread#raise)
* c-level finalizers which are run as a result of garbage collection
* the thread's time slice has expired so it must give up time to other threads
*
* this method and rb_thread_schedule_rec are mutually recursive; however,
* the sched_depth counter prevents re-entry into the time slice expiry logic.
* (so this method should never be recursed into more than twice, and never
* more than once in the time slice expiry logic.)
*/
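A schematic model of the recursion bound described above (invented names, no Ruby internals; the real functions are rb_threadptr_execute_interrupts_rec and rb_thread_schedule_rec in this file):

#include <stdio.h>

/* Schematic only: the time-slice path re-enters the scheduler with a nonzero
   depth, and the scheduler only calls back into interrupt handling when the
   depth is zero, so the mutual recursion stops after two levels. */
static void schedule(int sched_depth);

static void
execute_interrupts(int sched_depth)
{
    printf("interrupts, depth %d\n", sched_depth);
    /* signals, async exceptions, finalizers would be handled here */
    schedule(sched_depth + 1);             /* time-slice expiry path */
}

static void
schedule(int sched_depth)
{
    printf("schedule, depth %d\n", sched_depth);
    /* the thread switch would happen here */
    if (!sched_depth)                      /* only the outermost call recurses */
        execute_interrupts(sched_depth + 1);
}

int
main(void)
{
    schedule(0);    /* schedule(0) -> execute_interrupts(1) -> schedule(2) -> done */
    return 0;
}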
static void
......
sched_depth++;
EXEC_EVENT_HOOK(th, RUBY_EVENT_SWITCH, th->cfp->self, 0, 0);
if (th->slice > 0) {
th->slice--;
}
else {
reschedule:
rb_thread_schedule_rec(sched_depth+1);
if (th->slice < 0) {
th->slice++;
goto reschedule;
}
else {
th->slice = th->priority;
}
}
rb_thread_schedule_rec(sched_depth+1);
}
}
}
......
/*****************************************************/
/*just an alias for rb_threadptr_interrupt, apparently... so why is it needed?*/
static void
rb_threadptr_ready(rb_thread_t *th)
{
......
* will run more frequently than lower-priority threads (but lower-priority
* threads can also run).
*
* This is just a hint for the Ruby thread scheduler. It may be ignored on some
* platforms.
*
* count1 = count2 = 0
* a = Thread.new do
* loop { count1 += 1 }
......
priority = RUBY_THREAD_PRIORITY_MIN;
}
th->priority = priority;
th->slice = priority;
#endif
return INT2NUM(th->priority);
}
......
vm->main_thread = th;
native_mutex_reinitialize_atfork(&th->vm->global_vm_lock);
rb_pqueue_flush(&vm->ready_to_run_list);
st_foreach(vm->living_threads, atfork, (st_data_t)th);
st_clear(vm->living_threads);
st_insert(vm->living_threads, thval, (st_data_t)th->thread_id);
......
rb_thread_lock_t *lp = &GET_THREAD()->vm->global_vm_lock;
native_mutex_initialize(lp);
native_mutex_lock(lp);
rb_pqueue_initialize(&GET_THREAD()->vm->ready_to_run_list);
native_mutex_initialize(&GET_THREAD()->interrupt_lock);
}
}
thread_pthread.c
return pthread_setspecific(ruby_native_thread_key, th) == 0;
}
/*called once to initialize the main thread*/
static void
Init_native_thread(void)
{
......
#endif
CHECK_ERR(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED));
pthread_cond_init(&th->native_thread_data.sleep_cond, 0);
err = pthread_create(&th->thread_id, &attr, thread_start_func_1, th);
thread_debug("create: %p (%d)", (void *)th, err);
thread_debug("create: %p (%d)\n", (void *)th, err);
CHECK_ERR(pthread_attr_destroy(&attr));
if (!err) {
pthread_cond_init(&th->native_thread_data.sleep_cond, 0);
if (err) {
pthread_cond_destroy(&th->native_thread_data.sleep_cond);
}
}
return err;
......
#define PER_NANO 1000000000
/*go into a 'light sleep' while waiting for the GVL
to become available. To be called by ready threads
that are waiting to run.
*/
static void
rb_doze(rb_thread_t *th)
{
int r;
pthread_mutex_lock(&th->interrupt_lock);
thread_debug("doze: pthread_cond_wait start\n");
r = pthread_cond_wait(&th->native_thread_data.sleep_cond,
&th->interrupt_lock);
thread_debug("doze: pthread_cond_wait end\n");
if (r) rb_bug_errno("pthread_cond_wait", r);
pthread_mutex_unlock(&th->interrupt_lock);
}
static void
rb_undoze(rb_thread_t *th)
{
pthread_cond_signal(&th->native_thread_data.sleep_cond);
}
static void
native_sleep(rb_thread_t *th, struct timeval *tv)
{
thread_win32.c
return TlsSetValue(ruby_native_thread_key, th);
}
/*called once to initialize the main thread*/
static void
Init_native_thread(void)
{
......
thread_debug(" w32_wait_events events:%p, count:%d, timeout:%ld, th:%p\n",
events, count, timeout, th);
if (th && (intr = th->native_thread_data.interrupt_event)) {
native_mutex_lock(&th->vm->global_vm_lock);
GVL_TAKE(th);
if (intr == th->native_thread_data.interrupt_event) {
w32_reset_event(intr);
if (RUBY_VM_INTERRUPTED(th)) {
......
targets[count++] = intr;
thread_debug(" * handle: %p (count: %d, intr)\n", intr, count);
}
native_mutex_unlock(&th->vm->global_vm_lock);
GVL_GIVE(th);
}
thread_debug(" WaitForMultipleObjects start (count: %d)\n", count);
......
return ret;
}
/*go into a 'light sleep' while waiting for the GVL
to become available. To be called by ready threads
that are waiting to run.
*/
static void
rb_doze(rb_thread_t *th)
{
DWORD ret;
thread_debug("doze start\n");
ret=WaitForSingleObject(th->interrupt_event, INFINITE);
thread_debug("doze done (%lu)\n", ret);
if (WAIT_OBJECT_0 != ret) w32_error("WaitForSingleObject in doze");
}
static void
rb_undoze(rb_thread_t *th)
{
w32_set_event(th->native_thread_data.interrupt_event);
}
static void
native_sleep(rb_thread_t *th, struct timeval *tv)
{
......
thread_debug("native_sleep start (%lu)\n", msec);
ret = w32_wait_events(0, 0, msec, th);
thread_debug("native_sleep done (%lu)\n", ret);
/*should check for an error here and call rb_bug if there was one*/
}
native_mutex_lock(&th->interrupt_lock);
vm.c
void vm_analysis_register(int reg, int isset);
void vm_analysis_insn(int insn);
extern void rb_pqueue_destroy(pqueue_t *pqueue);
void
rb_vm_change_state(void)
{
......
}
rb_thread_lock_unlock(&vm->global_vm_lock);
rb_thread_lock_destroy(&vm->global_vm_lock);
rb_pqueue_destroy(&vm->ready_to_run_list);
ruby_xfree(vm);
ruby_current_vm = 0;
#if defined(ENABLE_VM_OBJSPACE) && ENABLE_VM_OBJSPACE
vm_core.h
#include <setjmp.h>
#include <signal.h>
#ifndef USE_NATIVE_THREAD_PRIORITY
#define USE_NATIVE_THREAD_PRIORITY 0
#define RUBY_THREAD_PRIORITY_MAX 15
#define RUBY_THREAD_PRIORITY_MIN -16
#define RUBY_NUM_PRIORITIES (1+RUBY_THREAD_PRIORITY_MAX-RUBY_THREAD_PRIORITY_MIN)
#endif
#ifndef NSIG
# define NSIG (_SIGMAX + 1) /* For QNX */
#endif
......
void rb_objspace_free(struct rb_objspace *);
#endif
struct rb_thread_struct;
typedef struct priority_queue {
/*elements in queues are circularly linked lists of rb_thread_t,
and queues[i] points to the _tail_ of the queue. in this way,
both the head and tail of the queue are easily accessible (O(1))
but only one word is required to hold a pointer to the queue.
*/
struct rb_thread_struct *queues[RUBY_NUM_PRIORITIES];
/*queues[0]==highest prio, queues[RUBY_NUM_PRIORITIES-1]==lowest prio*/
/*mask holds an index of which elements in queues are nonempty.
if queues[i]!=NULL, then mask&(1<<i) is set.
*/
unsigned mask; /*must be at least RUBY_NUM_PRIORITIES bits*/
unsigned next_promote_index; /*makes this into a fair priority queue*/
rb_thread_lock_t lock;
} pqueue_t;
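The tail-pointer representation described in the comment can be shown in isolation; a self-contained sketch (plain ints instead of rb_thread_t, invented names, not part of the patch):

#include <stdio.h>

/* Sketch of the queues[i] representation: a circular singly-linked list
   addressed only through its tail, so one word gives O(1) access to both
   ends (tail == the pointer itself, head == tail->next). */
struct node { int value; struct node *next; };

static void
enqueue(struct node **tail, struct node *n)   /* push at the tail */
{
    if (*tail == NULL) {
        n->next = n;                 /* first element: points at itself */
    } else {
        n->next = (*tail)->next;     /* new tail points at the old head */
        (*tail)->next = n;
    }
    *tail = n;
}

static struct node *
dequeue(struct node **tail)                   /* pop from the head */
{
    struct node *head;
    if (*tail == NULL) return NULL;
    head = (*tail)->next;
    if (head == *tail) *tail = NULL;          /* last element removed */
    else (*tail)->next = head->next;
    head->next = NULL;
    return head;
}

int
main(void)
{
    struct node a = {1, NULL}, b = {2, NULL}, c = {3, NULL};
    struct node *q = NULL;
    enqueue(&q, &a); enqueue(&q, &b); enqueue(&q, &c);
    while (q) printf("%d\n", dequeue(&q)->value);   /* prints 1 2 3: FIFO order */
    return 0;
}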
typedef struct rb_vm_struct {
VALUE self;
rb_thread_lock_t global_vm_lock;
pqueue_t ready_to_run_list;
struct rb_thread_struct *main_thread;
struct rb_thread_struct *running_thread;
......
rb_thread_id_t thread_id;
enum rb_thread_status status;
int priority;
int slice;
native_thread_data_t native_thread_data;
void *blocking_region_buffer;
......
/* misc */
int method_missing_reason;
int abort_on_exception;
struct rb_thread_struct *next;
#ifdef USE_SIGALTSTACK
void *altstack;
#endif