Feature #17

deadlock detection for 1.9

Added by Yusuke Endoh almost 6 years ago. Updated over 1 year ago.

Status:Closed
Priority:Normal
Assignee:Yusuke Endoh
Category:-
Target version:-

Description

=begin
遠藤です。

# redmine から投稿してみます。

1.9 のスレッドにデッドロック検出を実装してみました。

  • スレッドの状態種別に THREAD_STOPPED_FOREVER を追加。
    無期限の sleep 状態と mutex 解放待ち状態を表す。

  • rb_vm_t に変数 sleeper を追加。
    THREAD_STOPPED_FOREVER 状態のスレッドの数を表す。

  • rb_thread_t に変数 locking_mutex を追加。
    このスレッドが待っている mutex を表す。

  • rb_thread_t に st_table *keeping_mutexes を追加。
    このスレッドがロックしている mutex たちを表す。

  • mutex_t に変数 cond_notified を追加。
    cond_signal されてまだ起動していないスレッドの数を表す。

  • スレッドの終了時にロックしていた mutex をすべて
    解放するようにした。

  • rb_mutex_lock や rb_mutex_unlock で追加した変数たちを
    適宜更新するようにした。

  • 唯一動いていそうなスレッドが native_cond_wait しそうな
    時は、lock_func を抜けてデッドロック検査するようにした。

  • rb_check_deadlock では vm->living_threads を列挙して、

    • 例外状態のスレッド、または
    • locking_mutex のロックに成功したスレッド、または
    • locking_mutex が誰にもロックされていないスレッド、

    のいずれもなければ main_thread にデッドロックを投げる
    ようにした。

    どんなもんでしょう。

    Index: thread_pthread.c

    --- thread_pthread.c (revision 16676)
    +++ thread_pthread.c (working copy)
    @@ -418,7 +418,14 @@
    }
    }
    - th->status = THREADSTOPPED;
    + if (tv) {
    + th->status = THREAD
    STOPPED;
    + }
    + else {
    + th->status = THREADSTOPPEDFOREVER;
    + th->vm->sleeper++;
    + rbcheckdeadlock(th->vm);
    + }

    threaddebug("nativesleep %ld\n", tv ? tv->tvsec : -1);
    GVL
    UNLOCKBEGIN();
    @@ -455,9 +462,10 @@
    th->unblock
    function_arg = 0;

    pthreadmutexunlock(&th->interruptlock);
    - th->status = prev
    status;
    }
    GVLUNLOCKEND();
    + th->status = prevstatus;
    + if (!tv) th->vm->sleeper--;
    RUBY
    VMCHECKINTS();

    threaddebug("nativesleep done\n");

    Index: bootstraptest/test_thread.rb

    --- bootstraptest/test_thread.rb (revision 16676)
    +++ bootstraptest/test_thread.rb (working copy)
    @@ -268,3 +268,66 @@
    at_exit { Fiber.new{}.resume }
    }

    +assertequal 'ok', %q{
    + begin
    + Thread.new { sleep }
    + sleep
    + :ng
    + rescue Exception
    + :ok
    + end
    +}
    +
    +assert
    equal 'ok', %q{
    + begin
    + m1, m2 = Mutex.new, Mutex.new
    + Thread.new { m1.lock; sleep 1; m2.lock }
    + m2.lock; sleep 1; m1.lock
    + sleep
    + :ng
    + rescue Exception
    + :ok
    + end
    +}
    +
    +assertequal 'ok', %q{
    + begin
    + m = Mutex.new
    + Thread.new { m.lock }; m.lock
    + :ok
    + rescue Exception
    + :ng
    + end
    +}
    +
    +assert
    equal 'ok', %q{
    + begin
    + m = Mutex.new
    + Thread.new { m.lock }.join; m.lock
    + :ok
    + rescue Exception
    + :ng
    + end
    +}
    +
    +assertequal 'ok', %q{
    + begin
    + m = Mutex.new
    + Thread.new { m.lock; sleep 2 }
    + sleep 1; m.lock
    + :ok
    + rescue Exception
    + :ng
    + end
    +}
    +
    +assert
    equal 'ok', %q{
    + begin
    + m = Mutex.new
    + Thread.new { m.lock; sleep 2; m.unlock }
    + sleep 1; m.lock
    + :ok
    + rescue Exception
    + :ng
    + end
    +}

    Index: vm_core.h

    --- vm_core.h (revision 16676)
    +++ vm_core.h (working copy)
    @@ -300,6 +300,7 @@
    int running;
    int threadabortonexception;
    unsigned long trace
    flag;
    + volatile int sleeper;

    /* object management */
    VALUE markobjectary;
    @@ -354,6 +355,7 @@
    THREADTOKILL,
    THREADRUNNABLE,
    THREAD
    STOPPED,
    + THREADSTOPPEDFOREVER,
    THREAD_KILLED,
    };

    @@ -421,6 +423,8 @@
    rbunblockfunctiont *unblockfunction;
    void *unblockfunctionarg;
    rbthreadlockt interruptlock;
    + VALUE lockingmutex;
    + st
    table *keeping_mutexes;

    struct rbvmtag *tag;
    struct rbvmtraptag *traptag;

    Index: thread.c

    --- thread.c (revision 16676)
    +++ thread.c (working copy)
    @@ -62,6 +62,9 @@
    struct timeval rbtimeinterval(VALUE);
    static int rbthreaddead(rbthreadt *th);

    +static int unlocki(stdatat key, stdatat val, rbthreadt *th);
    +static void rb
    checkdeadlock(rbvmt *vm);
    +
    void rb
    signalexec(rbthreadt *th, int sig);
    void rb
    disable_interrupt(void);

    @@ -92,13 +95,13 @@
    rbthreadsetcurrent(th_stored); \
    } while(0)

    -#define BLOCKINGREGION(exec, ubf, ubfarg) do { \
    +#define BLOCKING
    REGION(exec, ubf, ubfarg, stopped) do { \
    rbthreadt *th = GETTHREAD(); \
    int _
    prevstatus = _th->status; \
    rbunblockfunction_t *
    oldubf; \
    void *oldubfarg; \
    setunblockfunction(
    th, ubf, ubfarg, &oldubf, &oldubfarg); \
    - th->status = THREADSTOPPED; \
    + if (stopped) _
    th->status = THREADSTOPPED; \
    thread
    debug("enter blocking region (%p)\n", _th); \
    GVL
    UNLOCKBEGIN(); {\
    exec; \
    @@ -107,10 +110,9 @@
    thread
    debug("leave blocking region (%p)\n", _th); \
    remove
    signalthreadlist(
    th); \
    setunblockfunction(th, _oldubf, _oldubfarg, 0, 0); \
    - if (
    th->status == THREADSTOPPED) { \
    + if (stopped && _
    th->status == THREADSTOPPED) { \
    _
    th->status = _prevstatus; \
    } \
    - RUBYVMCHECK_INTS(); \
    } while(0)

    #if THREADDEBUG
    @@ -197,19 +199,11 @@
    set
    unblockfunction(rbthreadt *th, rbunblockfunctiont func, void *arg,
    rbunblockfunction_t *
    oldfunc, void *oldarg)
    {
    - checkints:
    - RUBY
    VMCHECKINTS(); /
    check signal or so */
    nativemutexlock(&th->interruptlock);
    - if (th->interrupt
    flag) {
    - nativemutexunlock(&th->interruptlock);
    - goto check
    ints;
    - }
    - else {
    - if (oldfunc) *oldfunc = th->unblockfunction;
    - if (oldarg) *oldarg = th->unblock
    functionarg;
    - th->unblock
    function = func;
    - th->unblockfunctionarg = arg;
    - }
    + if (oldfunc) *oldfunc = th->unblockfunction;
    + if (oldarg) *oldarg = th->unblock
    functionarg;
    + th->unblock
    function = func;
    + th->unblockfunctionarg = arg;
    nativemutexunlock(&th->interrupt_lock);
    }

    @@ -259,6 +253,11 @@
    threaddebug("rbthreadterminateall (main thread: %p)\n", th);
    stforeach(vm->livingthreads, terminatei, (stdatat)th);
    + /* unlock all locking mutexes */
    + if (th->keeping
    mutexes) {
    + stforeach(th->keepingmutexes, unlocki, 0);
    + }
    +
    while (!rb
    threadalone()) {
    PUSH
    TAG();
    if (EXECTAG() == 0) {
    @@ -354,6 +353,17 @@
    }
    TH
    POPTAG();
    + /* locking
    mutex must be Qfalse /
    + if (th->lockingmutex != Qfalse) {
    + rb
    bug("threadstartfunc2: lockingmutex must be NULL (%p:%p)", th, (void
    )th->lockingmutex);
    + }
    +
    + /* unlock all locking mutexes */
    + if (th->keeping
    mutexes) {
    + stforeach(th->keepingmutexes, unlocki, 0);
    + }
    +
    + /* delete self from living
    threads */
    stdeletewrap(th->vm->living_threads, th->self);

    /* wake up joinning threads */
    @@ -363,7 +373,7 @@
    rbthreadinterrupt(jointh);
    join
    th = jointh->joinlistnext;
    }
    - st
    deletewrap(th->vm->livingthreads, th->self);
    + rbcheckdeadlock(th->vm);

    if (!th->rootfiber) {
    rb
    threadrecyclestack_release(th->stack);
    @@ -775,7 +785,8 @@

    BLOCKINGREGION({
    val = func(data1);
    - }, ubf, data2);
    + }, ubf, data2, 1);
    + RUBY
    VMCHECKINTS();

    return val;
    }
    @@ -1135,6 +1146,7 @@
    switch (th->status) {
    case THREADRUNNABLE:
    case THREAD
    STOPPED:
    + case THREADSTOPPEDFOREVER:
    case THREADTOKILL:
    rbarypush(ary, th->self);
    default:
    @@ -1329,6 +1341,7 @@
    case THREADRUNNABLE:
    return "run";
    case THREAD
    STOPPED:
    + case THREADSTOPPEDFOREVER:
    return "sleep";
    case THREADTOKILL:
    return "aborting";
    @@ -1428,7 +1441,7 @@

    if (rbthreaddead(th))
    return Qtrue;
    - if (th->status == THREADSTOPPED)
    + if (th->status == THREAD
    STOPPED || th->status == THREADSTOPPEDFOREVER)
    return Qtrue;
    return Qfalse;
    }
    @@ -1868,14 +1881,16 @@
    if (except) *except = origexcept;
    wait = &wait
    100ms;
    } while (_th->interruptflag == 0 && (timeout == 0 || subst(timeout, &wait100ms)));
    - }, 0, 0);
    + }, 0, 0, 1);
    + RUBY
    VMCHECKINTS();
    } while (result == 0 && (timeout == 0 || subst(timeout, &wait100ms)));
    }
    #else
    BLOCKING
    REGION({
    result = select(n, read, write, except, timeout);
    if (result < 0) lerrno = errno;
    - }, ubfselect, GETTHREAD());
    + }, ubfselect, GETTHREAD(), 1);
    + RUBYVMCHECK_INTS();
    #endif

    errno = lerrno;
    @@ -2070,6 +2085,7 @@
    stforeach(vm->livingthreads, terminateatforki, (stdatat)th);
    stclear(vm->livingthreads);
    stinsert(vm->livingthreads, thval, (stdatat) th->thread_id);
    + vm->sleeper = 0;
    }

    static int
    @@ -2096,6 +2112,7 @@
    stforeach(vm->livingthreads, terminateatforkbeforeexeci, (stdatat)th);
    stclear(vm->livingthreads);
    stinsert(vm->livingthreads, thval, (stdatat) th->thread_id);
    + vm->sleeper = 0;
    }

    struct thgroup {
    @@ -2312,7 +2329,7 @@
    rbthreadlockt lock;
    rb
    threadcondt cond;
    rbthreadt volatile *th;
    - volatile int condwaiting;
    + volatile int cond
    waiting, condnotified;
    } mutex
    t;

    #define GetMutexPtr(obj, tobj) \
    @@ -2384,6 +2401,15 @@
    return mutex->th ? Qtrue : Qfalse;
    }

    +static void
    +mutexlocked(rbthreadt *th, VALUE self)
    +{
    + if (!th->keeping
    mutexes) {
    + th->keepingmutexes = stinitnumtable();
    + }
    + st
    insert(th->keepingmutexes, self, (stdatat) th->threadid);
    +}
    +
    /*

  • call-seq:

  • mutex.trylock => true or false
    @@ -2406,6 +2432,8 @@
    if (mutex->th == 0) {
    mutex->th = GET
    THREAD();
    locked = Qtrue;
    +

    • mutexlocked(GETTHREAD(), self); } nativemutexunlock(&mutex->lock);

    @@ -2413,17 +2441,23 @@
    }

    static int
    -lockfunc(rbthreadt *th, mutext *mutex)
    +lockfunc(rbthreadt *th, mutext *mutex, int last_thread)
    {
    int interrupted = Qfalse;

    nativemutexlock(&mutex->lock);
    while (mutex->th || (mutex->th = th, 0)) {
    + if (lastthread) {
    + interrupted = 2;
    + break;
    + }
    +
    mutex->cond
    waiting++;
    nativecondwait(&mutex->cond, &mutex->lock);
    + mutex->condnotified--;
    - if (th->interrupt
    flag) {
    - interrupted = Qtrue;
    + if (RUBYVMINTERRUPTED(th)) {
    + interrupted = 1;
    break;
    }
    }
    @@ -2438,6 +2472,7 @@
    nativemutexlock(&mutex->lock);
    if (mutex->condwaiting > 0) {
    native
    condbroadcast(&mutex->cond);
    + mutex->cond
    notified += mutex->condwaiting;
    mutex->cond
    waiting = 0;
    }
    nativemutexunlock(&mutex->lock);
    @@ -2460,11 +2495,30 @@

    while (mutex->th != th) {
    int interrupted;
    + int prevstatus = th->status;
    + int last
    thread = 0;
    + th->lockingmutex = self;
    + th->status = THREAD
    STOPPEDFOREVER;
    + th->vm->sleeper++;
    + if (th->vm->living
    threads->numentries == th->vm->sleeper) {
    + last
    thread = 1;
    + }
    +
    BLOCKINGREGION({
    - interrupted = lock
    func(th, mutex);
    - }, lockinterrupt, mutex);
    + interrupted = lock
    func(th, mutex, lastthread);
    + }, lock
    interrupt, mutex, 0);
    + th->lockingmutex = Qfalse;
    + if (interrupted == 2) {
    + /* assert: mutex->th != th */
    + rb
    checkdeadlock(th->vm);
    + }
    + th->status = prev
    status;
    + th->vm->sleeper--;
    +
    + if (mutex->th == th) mutexlocked(th, self);
    +
    if (interrupted) {
    RUBY
    VMCHECKINTS();
    }
    @@ -2473,15 +2527,8 @@
    return self;
    }

    -/*
    - * call-seq:
    - * mutex.unlock => self
    - *
    - * Releases the lock.
    - * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread.
    - /
    -VALUE
    -rbmutexunlock(VALUE self)
    +static char *
    +mutexunlock(VALUE self)
    {
    mutex
    t *mutex;
    char *err = NULL;
    @@ -2501,16 +2548,45 @@
    /
    waiting thread */
    nativecondsignal(&mutex->cond);
    mutex->condwaiting--;
    + mutex->cond
    notified++;
    }
    }

    nativemutexunlock(&mutex->lock);
    + return err;
    +}
    +
    +/*
    + * call-seq:
    + * mutex.unlock => self
    + *
    + * Releases the lock.
    + * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread.
    + */
    +VALUE
    +rbmutexunlock(VALUE self)
    +{
    + char *err;
    +
    + err = mutexunlock(self);
    +
    + if (!err) st
    deletewrap(GETTHREAD()->keepingmutexes, self);
    if (err) rb
    raise(rb_eThreadError, err);

    return self;
    }

    +static int
    +unlocki(stdatat key, stdatat val, rbthreadt *th)
    +{
    + VALUE mtxval = key;
    +
    + mutex
    unlock(mtxval);
    +
    + return STCONTINUE;
    +}
    +
    static VALUE
    rb
    mutexsleepforever(VALUE time)
    {
    @@ -2579,6 +2655,51 @@
    return rbensure(func, arg, rbmutex_unlock, mutex);
    }

    +static int
    +checkdeadlocki(stdatat key, stdatat val, int found)
    +{
    + VALUE thval = key;
    + rbthreadt *th;
    + GetThreadPtr(thval, th);
    +
    + if (th->status != THREADSTOPPEDFOREVER) {
    + rbbug("checkdeadlocki: thread that is not THREADSTOPPEDFOREVER found (%p:%d)", th, th->status);
    + }
    +
    + if (RUBY
    VMINTERRUPTED(th)) {
    + *found = 1;
    + }
    + else if (th->locking
    mutex) {
    + mutext *mutex;
    + GetMutexPtr(th->locking
    mutex, mutex);
    +
    + nativemutexlock(&mutex->lock);
    + if (mutex->th == th || (!mutex->th && mutex->condnotified)) {
    + *found = 1;
    + }
    + native
    mutex_unlock(&mutex->lock);
    + }
    +
    + return (
    found) ? STSTOP : STCONTINUE;
    +}
    +
    +static void
    +rbcheckdeadlock(rbvmt vm)
    +{
    + int found = 0;
    +
    + if (vm->livingthreads->numentries != vm->sleeper) return;
    +
    + stforeach(vm->livingthreads, checkdeadlocki, (stdatat)&found);
    +
    + if (!found) {
    + VALUE argv[2];
    + argv[0] = rbeFatal;
    + argv[1] = rb
    strnew2("deadlock detected");
    + rb
    threadraise(2, argv, vm->mainthread);
    + }
    +}
    +
    /

  • Document-class: Barrier
    */

    Index: vm.c

    --- vm.c (revision 16676)
    +++ vm.c (working copy)
    @@ -1445,6 +1445,13 @@
    RUBYFREEUNLESS_NULL(th->stack);
    }

    • if (th->locking_mutex != Qfalse) {
    • rbbug("threadfree: lockingmutex must be NULL (%p:%d)", th, th->lockingmutex);
    • }
    • if (th->keeping_mutexes) {
    • stfreetable(th->keeping_mutexes);
    • } + if (th->localstorage) { stfreetable(th->localstorage); } @@ -1512,6 +1519,12 @@ RUBYMARKUNLESSNULL(th->rootfiber); RUBYMARKUNLESSNULL(th->statinsn_usage);
    • RUBYMARKUNLESSNULL(th->lockingmutex); +
    • if (th->keeping_mutexes) {
    • stforeach(th->keepingmutexes, vmmarkeachthreadfunc, 0);
    • } + rbmarktbl(th->local_storage);

    if (GETTHREAD() != th && th->machinestackstart && th->machinestack_end) {

    Yusuke ENDOH mame@tsg.ne.jp
    =end

patch Magnifier - パッチ (13.2 KB) Yusuke Endoh, 05/28/2008 11:31 PM

History

#1 Updated by Yusuke Endoh almost 6 years ago

  • Status changed from Open to Closed
  • Assignee changed from Koichi Sasada to Yusuke Endoh

=begin
integrated in r17110
=end

Also available in: Atom PDF