Project

General

Profile

Feature #7390 ยป funny_falcon_threads.patch

zzak (zzak _), 11/19/2012 08:35 AM

View differences:

test/ruby/test_thread.rb
end.join
end
class Serializer
def initialize
@mutex = Mutex.new
@condvar = ConditionVariable.new
end
def wait
@mutex.synchronize{
@condvar.wait(@mutex)
}
end
def signal
@mutex.synchronize{ @condvar.signal }
end
end
def test_condvar_wait_timeout
serialize = Serializer.new
mutex = Mutex.new
condvar = ConditionVariable.new
def condvar.waiters
@waiters
end
thread = Thread.new do
serialize.signal
mutex.synchronize do
condvar.wait(mutex, 0.001)
end
end
serialize.wait
mutex.synchronize do
sleep(0.01)
assert_not_includes(condvar.waiters, thread)
end
end
def test_condvar_wait_timeout_2
serialize = Serializer.new
mutex = Mutex.new
condvar = ConditionVariable.new
wait_timeout = Thread.new do
serialize.signal
mutex.synchronize do
condvar.wait(mutex, 0.001)
end
end
serialize.wait
wait_forever = Thread.new do
serialize.signal
mutex.synchronize do
condvar.wait(mutex)
end
end
serialize.wait
mutex.synchronize do
sleep(0.01)
condvar.signal
end
# If wait_timeout thread didn't remove himself
# from condvar sleepers, than signale tries to
# wakeup wait_timeout instead of wait_forever
# and fatal deadlock occures in a line below
wait_timeout.join
wait_forever.join
end
def test_local_barrier
dir = File.dirname(__FILE__)
lbtest = File.join(dir, "lbtest.rb")
-
lib/thread.rb
# even if no other thread doesn't signal.
#
def wait(mutex, timeout=nil)
begin
# TODO: mutex should not be used
@waiters_mutex.synchronize do
@waiters.push(Thread.current)
end
mutex.sleep timeout
ensure
# TODO: mutex should not be used
@waiters_mutex.synchronize do
@waiters.push(Thread.current)
end
mutex.sleep timeout do
@waiters_mutex.synchronize do
@waiters.delete(Thread.current)
end
thread.c
}
static VALUE
rb_mutex_sleep_forever(VALUE time)
wrap_thread_sleep_deadly()
{
rb_thread_sleep_deadly();
return Qnil;
}
static VALUE
rb_mutex_wait_for(VALUE time)
rb_mutex_sleep_forever(VALUE time)
{
if (rb_block_given_p()) {
return rb_ensure(wrap_thread_sleep_deadly, Qnil, rb_yield, Qnil);
}
return wrap_thread_sleep_deadly();
}
static VALUE
wrap_rb_thread_wait_for(VALUE time)
{
const struct timeval *t = (struct timeval *)time;
rb_thread_wait_for(*t);
return Qnil;
}
static VALUE
rb_mutex_wait_for(VALUE time)
{
if (rb_block_given_p()) {
return rb_ensure(wrap_rb_thread_wait_for, time, rb_yield, Qnil);
}
return wrap_rb_thread_wait_for(time);
}
VALUE
rb_mutex_sleep(VALUE self, VALUE timeout)
{
-
lib/thread.rb
Thread.abort_on_exception = true
end
unless defined?(Thread::RELY_ON_GVL)
Thread::RELY_ON_GVL = false
end
#
# ConditionVariable objects augment class Mutex. Using condition variables,
# it is possible to suspend while in the middle of a critical section until a
......
# even if no other thread doesn't signal.
#
def wait(mutex, timeout=nil)
# TODO: mutex should not be used
# Rely on GVL for sychronizing @waiters.push
@waiters.push(Thread.current)
mutex.sleep timeout do
# We could not rely on GVL cause compare is called
@waiters_mutex.synchronize do
@waiters.delete(Thread.current)
end
end
self
end if Thread::RELY_ON_GVL
def wait(mutex, timeout=nil) # :nodoc:
@waiters_mutex.synchronize do
@waiters.push(Thread.current)
end
......
end
end
self
end
end unless Thread::RELY_ON_GVL
#
# Wakes up the first thread in line waiting for this lock.
#
def signal
begin
t = @waiters_mutex.synchronize {@waiters.shift}
t = @waiters.shift
t.run if t
rescue ThreadError
retry
end
self
end
end if Thread::RELY_ON_GVL
def signal # :nodoc:
begin
t = @waiters_mutex.synchronize { @waiters.shift }
t.run if t
rescue ThreadError
retry
end
self
end unless Thread::RELY_ON_GVL
#
# Wakes up all threads waiting for this lock.
thread.c
rb_define_singleton_method(rb_cThread, "list", rb_thread_list, 0);
rb_define_singleton_method(rb_cThread, "abort_on_exception", rb_thread_s_abort_exc, 0);
rb_define_singleton_method(rb_cThread, "abort_on_exception=", rb_thread_s_abort_exc_set, 1);
rb_define_const(rb_cThread, "RELY_ON_GVL", Qtrue);
#if THREAD_DEBUG < 0
rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0);
rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1);
-
lib/thread.rb
# Creates a new ConditionVariable
#
def initialize
@waiters = []
@waiters = {}
@waiters.compare_by_identity
@waiters_mutex = Mutex.new
end
......
#
def wait(mutex, timeout=nil)
# Rely on GVL for sychronizing @waiters.push
@waiters.push(Thread.current)
@waiters[Thread.current] = true
mutex.sleep timeout do
# We could not rely on GVL cause compare is called
@waiters_mutex.synchronize do
@waiters.delete(Thread.current)
end
# We could rely on GVL cause hash were set to compare_by_identity mode
@waiters.delete(Thread.current)
end
self
end if Thread::RELY_ON_GVL
def wait(mutex, timeout=nil) # :nodoc:
@waiters_mutex.synchronize do
@waiters.push(Thread.current)
@waiters[Thread.current] = true
end
mutex.sleep timeout do
@waiters_mutex.synchronize do
......
#
def signal
begin
t = @waiters.shift
t, _ = @waiters.shift
t.run if t
rescue ThreadError
retry
......
def signal # :nodoc:
begin
t = @waiters_mutex.synchronize { @waiters.shift }
t, _ = @waiters_mutex.synchronize { @waiters.shift }
t.run if t
rescue ThreadError
retry
......
# TODO: incomplete
waiters0 = nil
@waiters_mutex.synchronize do
waiters0 = @waiters.dup
waiters0 = @waiters.keys
@waiters.clear
end
for t in waiters0
......
#
def initialize
@que = []
@waiting = []
@waiting = {}
@waiting.compare_by_identity
@que.taint # enable tainted communication
@waiting.taint
self.taint
@mutex = Mutex.new
end
def push_no_sync(obj) # :nodoc:
@que.push obj
begin
t, _ = @waiting.shift
t.wakeup if t
rescue ThreadError
retry
end
end
private :push_no_sync
#
# Pushes +obj+ to the queue.
#
def push(obj)
@mutex.synchronize{
@que.push obj
begin
t = @waiting.shift
t.wakeup if t
rescue ThreadError
retry
end
}
@mutex.synchronize{ push_no_sync(obj) }
end
#
......
while true
if @que.empty?
raise ThreadError, "queue empty" if non_block
# @waiting.include? check is necessary for avoiding a race against
# Thread.wakeup [Bug 5195]
@waiting.push Thread.current unless @waiting.include?(Thread.current)
@waiting[Thread.current] = true
@mutex.sleep
else
return @que.shift
......
def initialize(max)
raise ArgumentError, "queue size must be positive" unless max > 0
@max = max
@queue_wait = []
@queue_wait = {}
@queue_wait.compare_by_identity
@queue_wait.taint # enable tainted comunication
super()
end
......
if diff
diff.times do
begin
t = @queue_wait.shift
t, _ = @queue_wait.shift
t.run if t
rescue ThreadError
retry
......
begin
while true
break if @que.length < @max
@queue_wait.push Thread.current unless @queue_wait.include?(Thread.current)
@queue_wait[Thread.current] = true
@mutex.sleep
end
ensure
@queue_wait.delete(Thread.current)
end
@que.push obj
begin
t = @waiting.shift
t.wakeup if t
rescue ThreadError
retry
end
push_no_sync obj
}
end
......
@mutex.synchronize {
if @que.length < @max
begin
t = @queue_wait.shift
t, _ = @queue_wait.shift
t.wakeup if t
rescue ThreadError
retry
-
lib/thread.rb
#
alias enq push
def pop_no_sync(non_block) # :nodoc:
if non_block
raise ThreadError, "queue empty" if @que.empty?
else
while @que.empty?
@waiting[Thread.current] = true
@mutex.sleep
end
end
@que.shift
ensure
@waiting.delete(Thread.current)
end
private :pop_no_sync
#
# Retrieves data from the queue. If the queue is empty, the calling thread is
# suspended until data is pushed onto the queue. If +non_block+ is true, the
# thread isn't suspended, and an exception is raised.
#
def pop(non_block=false)
@mutex.synchronize{
begin
while true
if @que.empty?
raise ThreadError, "queue empty" if non_block
@waiting[Thread.current] = true
@mutex.sleep
else
return @que.shift
end
end
ensure
@waiting.delete(Thread.current)
end
}
@mutex.synchronize{ pop_no_sync(non_block) }
end
#
......
@max
end
def wakeup_queue_waiter # :nodoc:
t, _ = @queue_wait.shift
t.wakeup if t
rescue ThreadError
retry
end
private :wakeup_queue_waiter
#
# Sets the maximum size of the queue.
#
def max=(max)
raise ArgumentError, "queue size must be positive" unless max > 0
diff = nil
@mutex.synchronize {
if max <= @max
@max = max
else
diff = max - @max
@max = max
diff = max - @max
@max = max
if diff > 0
diff.times { wakeup_queue_waiter }
end
}
if diff
diff.times do
begin
t, _ = @queue_wait.shift
t.run if t
rescue ThreadError
retry
end
end
end
max
end
......
def push(obj)
@mutex.synchronize{
begin
while true
break if @que.length < @max
while @que.length >= @max
@queue_wait[Thread.current] = true
@mutex.sleep
end
......
#
# Retrieves data from the queue and runs a waiting thread, if any.
#
def pop(*args)
retval = super
def pop(non_block=false)
@mutex.synchronize {
if @que.length < @max
begin
t, _ = @queue_wait.shift
t.wakeup if t
rescue ThreadError
retry
end
end
retval = pop_no_sync(non_block)
wakeup_queue_waiter if @que.length < @max
retval
}
retval
end
#
    (1-1/1)