Feature #7390 » funny_falcon_threads.patch
| test/ruby/test_thread.rb | ||
|---|---|---|
|
end.join
|
||
|
end
|
||
|
class Serializer
|
||
|
def initialize
|
||
|
@mutex = Mutex.new
|
||
|
@condvar = ConditionVariable.new
|
||
|
end
|
||
|
def wait
|
||
|
@mutex.synchronize{
|
||
|
@condvar.wait(@mutex)
|
||
|
}
|
||
|
end
|
||
|
def signal
|
||
|
@mutex.synchronize{ @condvar.signal }
|
||
|
end
|
||
|
end
|
||
|
def test_condvar_wait_timeout
|
||
|
serialize = Serializer.new
|
||
|
mutex = Mutex.new
|
||
|
condvar = ConditionVariable.new
|
||
|
def condvar.waiters
|
||
|
@waiters
|
||
|
end
|
||
|
thread = Thread.new do
|
||
|
serialize.signal
|
||
|
mutex.synchronize do
|
||
|
condvar.wait(mutex, 0.001)
|
||
|
end
|
||
|
end
|
||
|
serialize.wait
|
||
|
mutex.synchronize do
|
||
|
sleep(0.01)
|
||
|
assert_not_includes(condvar.waiters, thread)
|
||
|
end
|
||
|
end
|
||
|
def test_condvar_wait_timeout_2
|
||
|
serialize = Serializer.new
|
||
|
mutex = Mutex.new
|
||
|
condvar = ConditionVariable.new
|
||
|
wait_timeout = Thread.new do
|
||
|
serialize.signal
|
||
|
mutex.synchronize do
|
||
|
condvar.wait(mutex, 0.001)
|
||
|
end
|
||
|
end
|
||
|
serialize.wait
|
||
|
wait_forever = Thread.new do
|
||
|
serialize.signal
|
||
|
mutex.synchronize do
|
||
|
condvar.wait(mutex)
|
||
|
end
|
||
|
end
|
||
|
serialize.wait
|
||
|
mutex.synchronize do
|
||
|
sleep(0.01)
|
||
|
condvar.signal
|
||
|
end
|
||
|
# If the wait_timeout thread didn't remove itself
|
||
|
# from the condvar sleepers, then signal tries to
|
||
|
# wake up wait_timeout instead of wait_forever
|
||
|
# and a fatal deadlock occurs in a line below
|
||
|
wait_timeout.join
|
||
|
wait_forever.join
|
||
|
end
|
||
|
def test_local_barrier
|
||
|
dir = File.dirname(__FILE__)
|
||
|
lbtest = File.join(dir, "lbtest.rb")
|
||
|
-
|
||
| lib/thread.rb | ||
|---|---|---|
|
# even if no other thread doesn't signal.
|
||
|
#
|
||
|
def wait(mutex, timeout=nil)
|
||
|
begin
|
||
|
# TODO: mutex should not be used
|
||
|
@waiters_mutex.synchronize do
|
||
|
@waiters.push(Thread.current)
|
||
|
end
|
||
|
mutex.sleep timeout
|
||
|
ensure
|
||
|
# TODO: mutex should not be used
|
||
|
@waiters_mutex.synchronize do
|
||
|
@waiters.push(Thread.current)
|
||
|
end
|
||
|
mutex.sleep timeout do
|
||
|
@waiters_mutex.synchronize do
|
||
|
@waiters.delete(Thread.current)
|
||
|
end
|
||
| thread.c | ||
|---|---|---|
|
}
|
||
|
static VALUE
|
||
|
rb_mutex_sleep_forever(VALUE time)
|
||
|
wrap_thread_sleep_deadly()
|
||
|
{
|
||
|
rb_thread_sleep_deadly();
|
||
|
return Qnil;
|
||
|
}
|
||
|
static VALUE
|
||
|
rb_mutex_wait_for(VALUE time)
|
||
|
rb_mutex_sleep_forever(VALUE time)
|
||
|
{
|
||
|
if (rb_block_given_p()) {
|
||
|
return rb_ensure(wrap_thread_sleep_deadly, Qnil, rb_yield, Qnil);
|
||
|
}
|
||
|
return wrap_thread_sleep_deadly();
|
||
|
}
|
||
|
static VALUE
|
||
|
wrap_rb_thread_wait_for(VALUE time)
|
||
|
{
|
||
|
const struct timeval *t = (struct timeval *)time;
|
||
|
rb_thread_wait_for(*t);
|
||
|
return Qnil;
|
||
|
}
|
||
|
static VALUE
|
||
|
rb_mutex_wait_for(VALUE time)
|
||
|
{
|
||
|
if (rb_block_given_p()) {
|
||
|
return rb_ensure(wrap_rb_thread_wait_for, time, rb_yield, Qnil);
|
||
|
}
|
||
|
return wrap_rb_thread_wait_for(time);
|
||
|
}
|
||
|
VALUE
|
||
|
rb_mutex_sleep(VALUE self, VALUE timeout)
|
||
|
{
|
||
|
-
|
||
| lib/thread.rb | ||
|---|---|---|
|
Thread.abort_on_exception = true
|
||
|
end
|
||
|
unless defined?(Thread::RELY_ON_GVL)
|
||
|
Thread::RELY_ON_GVL = false
|
||
|
end
|
||
|
#
|
||
|
# ConditionVariable objects augment class Mutex. Using condition variables,
|
||
|
# it is possible to suspend while in the middle of a critical section until a
|
||
| ... | ... | |
|
# even if no other thread doesn't signal.
|
||
|
#
|
||
|
def wait(mutex, timeout=nil)
|
||
|
# TODO: mutex should not be used
|
||
|
# Rely on GVL for synchronizing @waiters.push
|
||
|
@waiters.push(Thread.current)
|
||
|
mutex.sleep timeout do
|
||
|
# We cannot rely on the GVL here because a comparison is called
|
||
|
@waiters_mutex.synchronize do
|
||
|
@waiters.delete(Thread.current)
|
||
|
end
|
||
|
end
|
||
|
self
|
||
|
end if Thread::RELY_ON_GVL
|
||
|
def wait(mutex, timeout=nil) # :nodoc:
|
||
|
@waiters_mutex.synchronize do
|
||
|
@waiters.push(Thread.current)
|
||
|
end
|
||
| ... | ... | |
|
end
|
||
|
end
|
||
|
self
|
||
|
end
|
||
|
end unless Thread::RELY_ON_GVL
|
||
|
#
|
||
|
# Wakes up the first thread in line waiting for this lock.
|
||
|
#
|
||
|
def signal
|
||
|
begin
|
||
|
t = @waiters_mutex.synchronize {@waiters.shift}
|
||
|
t = @waiters.shift
|
||
|
t.run if t
|
||
|
rescue ThreadError
|
||
|
retry
|
||
|
end
|
||
|
self
|
||
|
end
|
||
|
end if Thread::RELY_ON_GVL
|
||
|
def signal # :nodoc:
|
||
|
begin
|
||
|
t = @waiters_mutex.synchronize { @waiters.shift }
|
||
|
t.run if t
|
||
|
rescue ThreadError
|
||
|
retry
|
||
|
end
|
||
|
self
|
||
|
end unless Thread::RELY_ON_GVL
|
||
|
#
|
||
|
# Wakes up all threads waiting for this lock.
|
||
| thread.c | ||
|---|---|---|
|
rb_define_singleton_method(rb_cThread, "list", rb_thread_list, 0);
|
||
|
rb_define_singleton_method(rb_cThread, "abort_on_exception", rb_thread_s_abort_exc, 0);
|
||
|
rb_define_singleton_method(rb_cThread, "abort_on_exception=", rb_thread_s_abort_exc_set, 1);
|
||
|
rb_define_const(rb_cThread, "RELY_ON_GVL", Qtrue);
|
||
|
#if THREAD_DEBUG < 0
|
||
|
rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0);
|
||
|
rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1);
|
||
|
-
|
||
| lib/thread.rb | ||
|---|---|---|
|
# Creates a new ConditionVariable
|
||
|
#
|
||
|
def initialize
|
||
|
@waiters = []
|
||
|
@waiters = {}
|
||
|
@waiters.compare_by_identity
|
||
|
@waiters_mutex = Mutex.new
|
||
|
end
|
||
| ... | ... | |
|
#
|
||
|
def wait(mutex, timeout=nil)
|
||
|
# Rely on GVL for synchronizing @waiters.push
|
||
|
@waiters.push(Thread.current)
|
||
|
@waiters[Thread.current] = true
|
||
|
mutex.sleep timeout do
|
||
|
# We cannot rely on the GVL here because a comparison is called
|
||
|
@waiters_mutex.synchronize do
|
||
|
@waiters.delete(Thread.current)
|
||
|
end
|
||
|
# We can rely on the GVL because the hash was set to compare_by_identity mode
|
||
|
@waiters.delete(Thread.current)
|
||
|
end
|
||
|
self
|
||
|
end if Thread::RELY_ON_GVL
|
||
|
def wait(mutex, timeout=nil) # :nodoc:
|
||
|
@waiters_mutex.synchronize do
|
||
|
@waiters.push(Thread.current)
|
||
|
@waiters[Thread.current] = true
|
||
|
end
|
||
|
mutex.sleep timeout do
|
||
|
@waiters_mutex.synchronize do
|
||
| ... | ... | |
|
#
|
||
|
def signal
|
||
|
begin
|
||
|
t = @waiters.shift
|
||
|
t, _ = @waiters.shift
|
||
|
t.run if t
|
||
|
rescue ThreadError
|
||
|
retry
|
||
| ... | ... | |
|
def signal # :nodoc:
|
||
|
begin
|
||
|
t = @waiters_mutex.synchronize { @waiters.shift }
|
||
|
t, _ = @waiters_mutex.synchronize { @waiters.shift }
|
||
|
t.run if t
|
||
|
rescue ThreadError
|
||
|
retry
|
||
| ... | ... | |
|
# TODO: incomplete
|
||
|
waiters0 = nil
|
||
|
@waiters_mutex.synchronize do
|
||
|
waiters0 = @waiters.dup
|
||
|
waiters0 = @waiters.keys
|
||
|
@waiters.clear
|
||
|
end
|
||
|
for t in waiters0
|
||
| ... | ... | |
|
#
|
||
|
def initialize
|
||
|
@que = []
|
||
|
@waiting = []
|
||
|
@waiting = {}
|
||
|
@waiting.compare_by_identity
|
||
|
@que.taint # enable tainted communication
|
||
|
@waiting.taint
|
||
|
self.taint
|
||
|
@mutex = Mutex.new
|
||
|
end
|
||
|
def push_no_sync(obj) # :nodoc:
|
||
|
@que.push obj
|
||
|
begin
|
||
|
t, _ = @waiting.shift
|
||
|
t.wakeup if t
|
||
|
rescue ThreadError
|
||
|
retry
|
||
|
end
|
||
|
end
|
||
|
private :push_no_sync
|
||
|
#
|
||
|
# Pushes +obj+ to the queue.
|
||
|
#
|
||
|
def push(obj)
|
||
|
@mutex.synchronize{
|
||
|
@que.push obj
|
||
|
begin
|
||
|
t = @waiting.shift
|
||
|
t.wakeup if t
|
||
|
rescue ThreadError
|
||
|
retry
|
||
|
end
|
||
|
}
|
||
|
@mutex.synchronize{ push_no_sync(obj) }
|
||
|
end
|
||
|
#
|
||
| ... | ... | |
|
while true
|
||
|
if @que.empty?
|
||
|
raise ThreadError, "queue empty" if non_block
|
||
|
# @waiting.include? check is necessary for avoiding a race against
|
||
|
# Thread.wakeup [Bug 5195]
|
||
|
@waiting.push Thread.current unless @waiting.include?(Thread.current)
|
||
|
@waiting[Thread.current] = true
|
||
|
@mutex.sleep
|
||
|
else
|
||
|
return @que.shift
|
||
| ... | ... | |
|
def initialize(max)
|
||
|
raise ArgumentError, "queue size must be positive" unless max > 0
|
||
|
@max = max
|
||
|
@queue_wait = []
|
||
|
@queue_wait = {}
|
||
|
@queue_wait.compare_by_identity
|
||
|
@queue_wait.taint # enable tainted comunication
|
||
|
super()
|
||
|
end
|
||
| ... | ... | |
|
if diff
|
||
|
diff.times do
|
||
|
begin
|
||
|
t = @queue_wait.shift
|
||
|
t, _ = @queue_wait.shift
|
||
|
t.run if t
|
||
|
rescue ThreadError
|
||
|
retry
|
||
| ... | ... | |
|
begin
|
||
|
while true
|
||
|
break if @que.length < @max
|
||
|
@queue_wait.push Thread.current unless @queue_wait.include?(Thread.current)
|
||
|
@queue_wait[Thread.current] = true
|
||
|
@mutex.sleep
|
||
|
end
|
||
|
ensure
|
||
|
@queue_wait.delete(Thread.current)
|
||
|
end
|
||
|
@que.push obj
|
||
|
begin
|
||
|
t = @waiting.shift
|
||
|
t.wakeup if t
|
||
|
rescue ThreadError
|
||
|
retry
|
||
|
end
|
||
|
push_no_sync obj
|
||
|
}
|
||
|
end
|
||
| ... | ... | |
|
@mutex.synchronize {
|
||
|
if @que.length < @max
|
||
|
begin
|
||
|
t = @queue_wait.shift
|
||
|
t, _ = @queue_wait.shift
|
||
|
t.wakeup if t
|
||
|
rescue ThreadError
|
||
|
retry
|
||
|
-
|
||
| lib/thread.rb | ||
|---|---|---|
|
#
|
||
|
alias enq push
|
||
|
def pop_no_sync(non_block) # :nodoc:
|
||
|
if non_block
|
||
|
raise ThreadError, "queue empty" if @que.empty?
|
||
|
else
|
||
|
while @que.empty?
|
||
|
@waiting[Thread.current] = true
|
||
|
@mutex.sleep
|
||
|
end
|
||
|
end
|
||
|
@que.shift
|
||
|
ensure
|
||
|
@waiting.delete(Thread.current)
|
||
|
end
|
||
|
private :pop_no_sync
|
||
|
#
|
||
|
# Retrieves data from the queue. If the queue is empty, the calling thread is
|
||
|
# suspended until data is pushed onto the queue. If +non_block+ is true, the
|
||
|
# thread isn't suspended, and an exception is raised.
|
||
|
#
|
||
|
def pop(non_block=false)
|
||
|
@mutex.synchronize{
|
||
|
begin
|
||
|
while true
|
||
|
if @que.empty?
|
||
|
raise ThreadError, "queue empty" if non_block
|
||
|
@waiting[Thread.current] = true
|
||
|
@mutex.sleep
|
||
|
else
|
||
|
return @que.shift
|
||
|
end
|
||
|
end
|
||
|
ensure
|
||
|
@waiting.delete(Thread.current)
|
||
|
end
|
||
|
}
|
||
|
@mutex.synchronize{ pop_no_sync(non_block) }
|
||
|
end
|
||
|
#
|
||
| ... | ... | |
|
@max
|
||
|
end
|
||
|
def wakeup_queue_waiter # :nodoc:
|
||
|
t, _ = @queue_wait.shift
|
||
|
t.wakeup if t
|
||
|
rescue ThreadError
|
||
|
retry
|
||
|
end
|
||
|
private :wakeup_queue_waiter
|
||
|
#
|
||
|
# Sets the maximum size of the queue.
|
||
|
#
|
||
|
def max=(max)
|
||
|
raise ArgumentError, "queue size must be positive" unless max > 0
|
||
|
diff = nil
|
||
|
@mutex.synchronize {
|
||
|
if max <= @max
|
||
|
@max = max
|
||
|
else
|
||
|
diff = max - @max
|
||
|
@max = max
|
||
|
diff = max - @max
|
||
|
@max = max
|
||
|
if diff > 0
|
||
|
diff.times { wakeup_queue_waiter }
|
||
|
end
|
||
|
}
|
||
|
if diff
|
||
|
diff.times do
|
||
|
begin
|
||
|
t, _ = @queue_wait.shift
|
||
|
t.run if t
|
||
|
rescue ThreadError
|
||
|
retry
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
max
|
||
|
end
|
||
| ... | ... | |
|
def push(obj)
|
||
|
@mutex.synchronize{
|
||
|
begin
|
||
|
while true
|
||
|
break if @que.length < @max
|
||
|
while @que.length >= @max
|
||
|
@queue_wait[Thread.current] = true
|
||
|
@mutex.sleep
|
||
|
end
|
||
| ... | ... | |
|
#
|
||
|
# Retrieves data from the queue and runs a waiting thread, if any.
|
||
|
#
|
||
|
def pop(*args)
|
||
|
retval = super
|
||
|
def pop(non_block=false)
|
||
|
@mutex.synchronize {
|
||
|
if @que.length < @max
|
||
|
begin
|
||
|
t, _ = @queue_wait.shift
|
||
|
t.wakeup if t
|
||
|
rescue ThreadError
|
||
|
retry
|
||
|
end
|
||
|
end
|
||
|
retval = pop_no_sync(non_block)
|
||
|
wakeup_queue_waiter if @que.length < @max
|
||
|
retval
|
||
|
}
|
||
|
retval
|
||
|
end
|
||
|
#
|
||