Feature #7390 ยป funny_falcon_threads.patch
test/ruby/test_thread.rb | ||
---|---|---|
end.join
|
||
end
|
||
class Serializer
|
||
def initialize
|
||
@mutex = Mutex.new
|
||
@condvar = ConditionVariable.new
|
||
end
|
||
def wait
|
||
@mutex.synchronize{
|
||
@condvar.wait(@mutex)
|
||
}
|
||
end
|
||
def signal
|
||
@mutex.synchronize{ @condvar.signal }
|
||
end
|
||
end
|
||
def test_condvar_wait_timeout
|
||
serialize = Serializer.new
|
||
mutex = Mutex.new
|
||
condvar = ConditionVariable.new
|
||
def condvar.waiters
|
||
@waiters
|
||
end
|
||
thread = Thread.new do
|
||
serialize.signal
|
||
mutex.synchronize do
|
||
condvar.wait(mutex, 0.001)
|
||
end
|
||
end
|
||
serialize.wait
|
||
mutex.synchronize do
|
||
sleep(0.01)
|
||
assert_not_includes(condvar.waiters, thread)
|
||
end
|
||
end
|
||
def test_condvar_wait_timeout_2
|
||
serialize = Serializer.new
|
||
mutex = Mutex.new
|
||
condvar = ConditionVariable.new
|
||
wait_timeout = Thread.new do
|
||
serialize.signal
|
||
mutex.synchronize do
|
||
condvar.wait(mutex, 0.001)
|
||
end
|
||
end
|
||
serialize.wait
|
||
wait_forever = Thread.new do
|
||
serialize.signal
|
||
mutex.synchronize do
|
||
condvar.wait(mutex)
|
||
end
|
||
end
|
||
serialize.wait
|
||
mutex.synchronize do
|
||
sleep(0.01)
|
||
condvar.signal
|
||
end
|
||
# If wait_timeout thread didn't remove himself
|
||
# from condvar sleepers, than signale tries to
|
||
# wakeup wait_timeout instead of wait_forever
|
||
# and fatal deadlock occures in a line below
|
||
wait_timeout.join
|
||
wait_forever.join
|
||
end
|
||
def test_local_barrier
|
||
dir = File.dirname(__FILE__)
|
||
lbtest = File.join(dir, "lbtest.rb")
|
||
-
|
lib/thread.rb | ||
---|---|---|
# even if no other thread doesn't signal.
|
||
#
|
||
def wait(mutex, timeout=nil)
|
||
begin
|
||
# TODO: mutex should not be used
|
||
@waiters_mutex.synchronize do
|
||
@waiters.push(Thread.current)
|
||
end
|
||
mutex.sleep timeout
|
||
ensure
|
||
# TODO: mutex should not be used
|
||
@waiters_mutex.synchronize do
|
||
@waiters.push(Thread.current)
|
||
end
|
||
mutex.sleep timeout do
|
||
@waiters_mutex.synchronize do
|
||
@waiters.delete(Thread.current)
|
||
end
|
thread.c | ||
---|---|---|
}
|
||
static VALUE
|
||
rb_mutex_sleep_forever(VALUE time)
|
||
wrap_thread_sleep_deadly()
|
||
{
|
||
rb_thread_sleep_deadly();
|
||
return Qnil;
|
||
}
|
||
static VALUE
|
||
rb_mutex_wait_for(VALUE time)
|
||
rb_mutex_sleep_forever(VALUE time)
|
||
{
|
||
if (rb_block_given_p()) {
|
||
return rb_ensure(wrap_thread_sleep_deadly, Qnil, rb_yield, Qnil);
|
||
}
|
||
return wrap_thread_sleep_deadly();
|
||
}
|
||
static VALUE
|
||
wrap_rb_thread_wait_for(VALUE time)
|
||
{
|
||
const struct timeval *t = (struct timeval *)time;
|
||
rb_thread_wait_for(*t);
|
||
return Qnil;
|
||
}
|
||
static VALUE
|
||
rb_mutex_wait_for(VALUE time)
|
||
{
|
||
if (rb_block_given_p()) {
|
||
return rb_ensure(wrap_rb_thread_wait_for, time, rb_yield, Qnil);
|
||
}
|
||
return wrap_rb_thread_wait_for(time);
|
||
}
|
||
VALUE
|
||
rb_mutex_sleep(VALUE self, VALUE timeout)
|
||
{
|
||
-
|
lib/thread.rb | ||
---|---|---|
Thread.abort_on_exception = true
|
||
end
|
||
unless defined?(Thread::RELY_ON_GVL)
|
||
Thread::RELY_ON_GVL = false
|
||
end
|
||
#
|
||
# ConditionVariable objects augment class Mutex. Using condition variables,
|
||
# it is possible to suspend while in the middle of a critical section until a
|
||
... | ... | |
# even if no other thread doesn't signal.
|
||
#
|
||
def wait(mutex, timeout=nil)
|
||
# TODO: mutex should not be used
|
||
# Rely on GVL for sychronizing @waiters.push
|
||
@waiters.push(Thread.current)
|
||
mutex.sleep timeout do
|
||
# We could not rely on GVL cause compare is called
|
||
@waiters_mutex.synchronize do
|
||
@waiters.delete(Thread.current)
|
||
end
|
||
end
|
||
self
|
||
end if Thread::RELY_ON_GVL
|
||
def wait(mutex, timeout=nil) # :nodoc:
|
||
@waiters_mutex.synchronize do
|
||
@waiters.push(Thread.current)
|
||
end
|
||
... | ... | |
end
|
||
end
|
||
self
|
||
end
|
||
end unless Thread::RELY_ON_GVL
|
||
#
|
||
# Wakes up the first thread in line waiting for this lock.
|
||
#
|
||
def signal
|
||
begin
|
||
t = @waiters_mutex.synchronize {@waiters.shift}
|
||
t = @waiters.shift
|
||
t.run if t
|
||
rescue ThreadError
|
||
retry
|
||
end
|
||
self
|
||
end
|
||
end if Thread::RELY_ON_GVL
|
||
def signal # :nodoc:
|
||
begin
|
||
t = @waiters_mutex.synchronize { @waiters.shift }
|
||
t.run if t
|
||
rescue ThreadError
|
||
retry
|
||
end
|
||
self
|
||
end unless Thread::RELY_ON_GVL
|
||
#
|
||
# Wakes up all threads waiting for this lock.
|
thread.c | ||
---|---|---|
rb_define_singleton_method(rb_cThread, "list", rb_thread_list, 0);
|
||
rb_define_singleton_method(rb_cThread, "abort_on_exception", rb_thread_s_abort_exc, 0);
|
||
rb_define_singleton_method(rb_cThread, "abort_on_exception=", rb_thread_s_abort_exc_set, 1);
|
||
rb_define_const(rb_cThread, "RELY_ON_GVL", Qtrue);
|
||
#if THREAD_DEBUG < 0
|
||
rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0);
|
||
rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1);
|
||
-
|
lib/thread.rb | ||
---|---|---|
# Creates a new ConditionVariable
|
||
#
|
||
def initialize
|
||
@waiters = []
|
||
@waiters = {}
|
||
@waiters.compare_by_identity
|
||
@waiters_mutex = Mutex.new
|
||
end
|
||
... | ... | |
#
|
||
def wait(mutex, timeout=nil)
|
||
# Rely on GVL for sychronizing @waiters.push
|
||
@waiters.push(Thread.current)
|
||
@waiters[Thread.current] = true
|
||
mutex.sleep timeout do
|
||
# We could not rely on GVL cause compare is called
|
||
@waiters_mutex.synchronize do
|
||
@waiters.delete(Thread.current)
|
||
end
|
||
# We could rely on GVL cause hash were set to compare_by_identity mode
|
||
@waiters.delete(Thread.current)
|
||
end
|
||
self
|
||
end if Thread::RELY_ON_GVL
|
||
def wait(mutex, timeout=nil) # :nodoc:
|
||
@waiters_mutex.synchronize do
|
||
@waiters.push(Thread.current)
|
||
@waiters[Thread.current] = true
|
||
end
|
||
mutex.sleep timeout do
|
||
@waiters_mutex.synchronize do
|
||
... | ... | |
#
|
||
def signal
|
||
begin
|
||
t = @waiters.shift
|
||
t, _ = @waiters.shift
|
||
t.run if t
|
||
rescue ThreadError
|
||
retry
|
||
... | ... | |
def signal # :nodoc:
|
||
begin
|
||
t = @waiters_mutex.synchronize { @waiters.shift }
|
||
t, _ = @waiters_mutex.synchronize { @waiters.shift }
|
||
t.run if t
|
||
rescue ThreadError
|
||
retry
|
||
... | ... | |
# TODO: incomplete
|
||
waiters0 = nil
|
||
@waiters_mutex.synchronize do
|
||
waiters0 = @waiters.dup
|
||
waiters0 = @waiters.keys
|
||
@waiters.clear
|
||
end
|
||
for t in waiters0
|
||
... | ... | |
#
|
||
def initialize
|
||
@que = []
|
||
@waiting = []
|
||
@waiting = {}
|
||
@waiting.compare_by_identity
|
||
@que.taint # enable tainted communication
|
||
@waiting.taint
|
||
self.taint
|
||
@mutex = Mutex.new
|
||
end
|
||
def push_no_sync(obj) # :nodoc:
|
||
@que.push obj
|
||
begin
|
||
t, _ = @waiting.shift
|
||
t.wakeup if t
|
||
rescue ThreadError
|
||
retry
|
||
end
|
||
end
|
||
private :push_no_sync
|
||
#
|
||
# Pushes +obj+ to the queue.
|
||
#
|
||
def push(obj)
|
||
@mutex.synchronize{
|
||
@que.push obj
|
||
begin
|
||
t = @waiting.shift
|
||
t.wakeup if t
|
||
rescue ThreadError
|
||
retry
|
||
end
|
||
}
|
||
@mutex.synchronize{ push_no_sync(obj) }
|
||
end
|
||
#
|
||
... | ... | |
while true
|
||
if @que.empty?
|
||
raise ThreadError, "queue empty" if non_block
|
||
# @waiting.include? check is necessary for avoiding a race against
|
||
# Thread.wakeup [Bug 5195]
|
||
@waiting.push Thread.current unless @waiting.include?(Thread.current)
|
||
@waiting[Thread.current] = true
|
||
@mutex.sleep
|
||
else
|
||
return @que.shift
|
||
... | ... | |
def initialize(max)
|
||
raise ArgumentError, "queue size must be positive" unless max > 0
|
||
@max = max
|
||
@queue_wait = []
|
||
@queue_wait = {}
|
||
@queue_wait.compare_by_identity
|
||
@queue_wait.taint # enable tainted comunication
|
||
super()
|
||
end
|
||
... | ... | |
if diff
|
||
diff.times do
|
||
begin
|
||
t = @queue_wait.shift
|
||
t, _ = @queue_wait.shift
|
||
t.run if t
|
||
rescue ThreadError
|
||
retry
|
||
... | ... | |
begin
|
||
while true
|
||
break if @que.length < @max
|
||
@queue_wait.push Thread.current unless @queue_wait.include?(Thread.current)
|
||
@queue_wait[Thread.current] = true
|
||
@mutex.sleep
|
||
end
|
||
ensure
|
||
@queue_wait.delete(Thread.current)
|
||
end
|
||
@que.push obj
|
||
begin
|
||
t = @waiting.shift
|
||
t.wakeup if t
|
||
rescue ThreadError
|
||
retry
|
||
end
|
||
push_no_sync obj
|
||
}
|
||
end
|
||
... | ... | |
@mutex.synchronize {
|
||
if @que.length < @max
|
||
begin
|
||
t = @queue_wait.shift
|
||
t, _ = @queue_wait.shift
|
||
t.wakeup if t
|
||
rescue ThreadError
|
||
retry
|
||
-
|
lib/thread.rb | ||
---|---|---|
#
|
||
alias enq push
|
||
def pop_no_sync(non_block) # :nodoc:
|
||
if non_block
|
||
raise ThreadError, "queue empty" if @que.empty?
|
||
else
|
||
while @que.empty?
|
||
@waiting[Thread.current] = true
|
||
@mutex.sleep
|
||
end
|
||
end
|
||
@que.shift
|
||
ensure
|
||
@waiting.delete(Thread.current)
|
||
end
|
||
private :pop_no_sync
|
||
#
|
||
# Retrieves data from the queue. If the queue is empty, the calling thread is
|
||
# suspended until data is pushed onto the queue. If +non_block+ is true, the
|
||
# thread isn't suspended, and an exception is raised.
|
||
#
|
||
def pop(non_block=false)
|
||
@mutex.synchronize{
|
||
begin
|
||
while true
|
||
if @que.empty?
|
||
raise ThreadError, "queue empty" if non_block
|
||
@waiting[Thread.current] = true
|
||
@mutex.sleep
|
||
else
|
||
return @que.shift
|
||
end
|
||
end
|
||
ensure
|
||
@waiting.delete(Thread.current)
|
||
end
|
||
}
|
||
@mutex.synchronize{ pop_no_sync(non_block) }
|
||
end
|
||
#
|
||
... | ... | |
@max
|
||
end
|
||
def wakeup_queue_waiter # :nodoc:
|
||
t, _ = @queue_wait.shift
|
||
t.wakeup if t
|
||
rescue ThreadError
|
||
retry
|
||
end
|
||
private :wakeup_queue_waiter
|
||
#
|
||
# Sets the maximum size of the queue.
|
||
#
|
||
def max=(max)
|
||
raise ArgumentError, "queue size must be positive" unless max > 0
|
||
diff = nil
|
||
@mutex.synchronize {
|
||
if max <= @max
|
||
@max = max
|
||
else
|
||
diff = max - @max
|
||
@max = max
|
||
diff = max - @max
|
||
@max = max
|
||
if diff > 0
|
||
diff.times { wakeup_queue_waiter }
|
||
end
|
||
}
|
||
if diff
|
||
diff.times do
|
||
begin
|
||
t, _ = @queue_wait.shift
|
||
t.run if t
|
||
rescue ThreadError
|
||
retry
|
||
end
|
||
end
|
||
end
|
||
max
|
||
end
|
||
... | ... | |
def push(obj)
|
||
@mutex.synchronize{
|
||
begin
|
||
while true
|
||
break if @que.length < @max
|
||
while @que.length >= @max
|
||
@queue_wait[Thread.current] = true
|
||
@mutex.sleep
|
||
end
|
||
... | ... | |
#
|
||
# Retrieves data from the queue and runs a waiting thread, if any.
|
||
#
|
||
def pop(*args)
|
||
retval = super
|
||
def pop(non_block=false)
|
||
@mutex.synchronize {
|
||
if @que.length < @max
|
||
begin
|
||
t, _ = @queue_wait.shift
|
||
t.wakeup if t
|
||
rescue ThreadError
|
||
retry
|
||
end
|
||
end
|
||
retval = pop_no_sync(non_block)
|
||
wakeup_queue_waiter if @que.length < @max
|
||
retval
|
||
}
|
||
retval
|
||
end
|
||
#
|