Feature #6293 » queue.patch
lib/thread/queue.rb | ||
---|---|---|
require 'timeout'
|
||
class Thread
|
||
# Thread::Queue is thread safe a FIFO queue. It provdies a way to synchronize
|
||
# communication between threads. This queue does not block when items are
|
||
# removed (see Thread::Queue#remove)
|
||
#
|
||
# This queue does not allow nil elements.
|
||
class Queue
|
||
class NoSuchElementError < StandardError
|
||
end
|
||
include Enumerable
|
||
#
|
||
# Creates a new queue.
|
||
#
|
||
def initialize
|
||
@que = []
|
||
@que.taint # enable tainted communication
|
||
self.taint
|
||
@mutex = Mutex.new
|
||
end
|
||
def each(&block)
|
||
@que.each(&block)
|
||
end
|
||
#
|
||
# Adds +obj+ to the head of the queue.
|
||
#
|
||
# Raises an ArgumentError if +obj+ is nil.
|
||
#
|
||
def add(obj)
|
||
raise(ArgumentError, 'nil is not allowed in this queue') if obj.nil?
|
||
@mutex.synchronize { @que.push obj }
|
||
self
|
||
end
|
||
alias :push :add
|
||
alias :<< :add
|
||
def offer(obj, timeout = nil)
|
||
add obj
|
||
end
|
||
# Retrieves data from the queue head, and removes it.
|
||
#
|
||
# Raises a NoSuchElementError if the queue is empty.
|
||
def remove
|
||
@mutex.synchronize {
|
||
raise NoSuchElementError if empty?
|
||
@que.shift
|
||
}
|
||
end
|
||
alias :pop :remove
|
||
alias :shift :remove
|
||
alias :deq :remove
|
||
# Retrieves data from the queue head, and removes it.
|
||
#
|
||
# Returns nil if this queue is empty.
|
||
def poll
|
||
@mutex.synchronize {
|
||
if empty?
|
||
nil
|
||
else
|
||
@que.shift
|
||
end
|
||
}
|
||
end
|
||
# Retrieves data from the queue head, but does not removes it.
|
||
#
|
||
# Returns nil if the queue is empty
|
||
def peek
|
||
@mutex.synchronize { @que.first }
|
||
end
|
||
alias :first :peek
|
||
# Retrieves data from the queue head, but does not removes it.
|
||
#
|
||
# Raises NoSuchElementError if the queue is empty.
|
||
def element
|
||
@mutex.synchronize {
|
||
if empty?
|
||
raise NoSuchElementError
|
||
else
|
||
@que.first
|
||
end
|
||
}
|
||
end
|
||
#
|
||
# Returns +true+ if the queue is empty.
|
||
#
|
||
def empty?
|
||
@que.empty?
|
||
end
|
||
#
|
||
# Removes all objects from the queue.
|
||
#
|
||
def clear
|
||
@que.clear
|
||
end
|
||
#
|
||
# Returns the length of the queue.
|
||
#
|
||
def length
|
||
@que.length
|
||
end
|
||
alias :size :length
|
||
end
|
||
# Thread::Queue is thread safe a FIFO queue. It provdies a way to synchronize
|
||
# communication between threads.
|
||
#
|
||
# This queue does not allow nil elements.
|
||
class BlockingQueue < Queue
|
||
def initialize
|
||
@waiting = []
|
||
@waiting.taint
|
||
super
|
||
end
|
||
# Retrieves data from the queue head, and removes it.
|
||
#
|
||
# If the queue is empty, remove will block until there is something
|
||
# in the queue.
|
||
def take
|
||
@mutex.synchronize {
|
||
while true
|
||
if @que.empty?
|
||
# @waiting.include? check is necessary for avoiding a race against
|
||
# Thread.wakeup [Bug 5195]
|
||
@waiting.push Thread.current unless @waiting.include?(Thread.current)
|
||
@mutex.sleep
|
||
else
|
||
return @que.shift
|
||
end
|
||
end
|
||
}
|
||
end
|
||
alias :pop :take
|
||
alias :shift :take
|
||
alias :deq :take
|
||
# Adds +obj+ to the head of the queue.
|
||
#
|
||
# Raises an ArgumentError if +obj+ is nil.
|
||
#
|
||
def add(obj)
|
||
raise ArgumentError if obj.nil?
|
||
@mutex.synchronize {
|
||
@que.push obj
|
||
begin
|
||
t = @waiting.shift
|
||
t.wakeup if t
|
||
rescue ThreadError
|
||
retry
|
||
end
|
||
}
|
||
self
|
||
end
|
||
alias :push :add
|
||
alias :<< :add
|
||
# Retrieves data from the queue head, and removes it.
|
||
#
|
||
# Blocks for +timeout+ seconds if the queue is empty, and returns nil if
|
||
# the timeout expires.
|
||
def poll(timeout = nil)
|
||
return super() unless timeout
|
||
begin
|
||
Timeout.timeout(timeout) do
|
||
take
|
||
end
|
||
rescue TimeoutError
|
||
nil
|
||
end
|
||
end
|
||
end
|
||
end
|
test/thread/helper.rb | ||
---|---|---|
require 'minitest/autorun'
|
||
require 'thread/queue'
|
||
class Thread
|
||
class TestCase < MiniTest::Unit::TestCase
|
||
class Latch
|
||
def initialize
|
||
@mutex = Mutex.new
|
||
@cond = ConditionVariable.new
|
||
end
|
||
def release
|
||
@mutex.synchronize { @cond.broadcast }
|
||
end
|
||
def await
|
||
@mutex.synchronize { @cond.wait @mutex }
|
||
end
|
||
end
|
||
attr_reader :queue
|
||
POISON = Object.new
|
||
def grind(num_threads, num_objects, num_iterations, klass, *args)
|
||
from_workers = klass.new(*args)
|
||
to_workers = klass.new(*args)
|
||
to_consumers = num_threads.times.map {
|
||
Thread.new {
|
||
while object = to_workers.pop
|
||
break if object == POISON
|
||
from_workers.push object
|
||
end
|
||
}
|
||
}
|
||
from_consumer = Thread.new {
|
||
num_iterations.times {
|
||
num_objects.times { from_workers.pop }
|
||
}
|
||
}
|
||
num_iterations.times {
|
||
num_objects.times { to_workers.push 99 }
|
||
}
|
||
num_threads.times { to_workers.push POISON }
|
||
to_consumers.each { |t| t.join }
|
||
from_consumer.join
|
||
assert_equal 0, from_workers.size
|
||
assert_equal 0, to_workers.size
|
||
end
|
||
def non_block_grind(num_threads, num_objects, num_iterations, klass, *args)
|
||
from_workers = klass.new(*args)
|
||
to_workers = klass.new(*args)
|
||
to_latch = Latch.new
|
||
from_latch = Latch.new
|
||
to_consumers = num_threads.times.map {
|
||
Thread.new {
|
||
to_latch.await
|
||
while object = to_workers.pop
|
||
break if object == POISON
|
||
from_workers.push object
|
||
end
|
||
}
|
||
}
|
||
from_consumer = Thread.new {
|
||
from_latch.await
|
||
num_iterations.times {
|
||
num_objects.times { from_workers.pop }
|
||
}
|
||
}
|
||
num_iterations.times {
|
||
num_objects.times { to_workers.push 99 }
|
||
}
|
||
num_threads.times { to_workers.push POISON }
|
||
Thread.pass until to_consumers.all? { |c| c.status == "sleep" }
|
||
Thread.pass until from_consumer.status == "sleep"
|
||
to_latch.release
|
||
to_consumers.each { |t| t.join }
|
||
from_latch.release
|
||
from_consumer.join
|
||
assert_equal 0, from_workers.size
|
||
assert_equal 0, to_workers.size
|
||
end
|
||
end
|
||
end
|
test/thread/test_blocking_queue.rb | ||
---|---|---|
require 'helper'
|
||
class Thread
|
||
class TestBlockingQueue < TestCase
|
||
attr_reader :queue
|
||
def setup
|
||
@queue = Thread::BlockingQueue.new
|
||
super
|
||
end
|
||
def test_add_returns_self
|
||
assert_equal queue, queue.add(1)
|
||
end
|
||
def test_queue
|
||
grind(5, 1000, 15, Thread::BlockingQueue)
|
||
end
|
||
def test_offer
|
||
assert queue.offer(1)
|
||
assert_equal 1, queue.length
|
||
end
|
||
def test_clear
|
||
10.times { |i| queue << i }
|
||
assert_equal 10, queue.length
|
||
queue.clear
|
||
assert_equal 0, queue.length
|
||
assert queue.empty?
|
||
end
|
||
def test_enumerable
|
||
10.times { |i| queue << i }
|
||
assert_equal 10.times.map { |i| i }, queue.to_a
|
||
list = []
|
||
queue.map { |x| list << x }
|
||
assert_equal 10.times.map { |i| i }, list
|
||
assert_equal 10.times.map { |i| i }, queue.to_a
|
||
refute_equal queue.to_a.object_id, queue.to_a.object_id
|
||
end
|
||
def test_add
|
||
queue.add "foo"
|
||
assert_equal "foo", queue.take
|
||
assert queue.empty?
|
||
end
|
||
def test_add_nil
|
||
assert_raises(ArgumentError) do
|
||
queue.add nil
|
||
end
|
||
end
|
||
def test_remove_empty
|
||
assert queue.empty?
|
||
t = Thread.new { queue.take }
|
||
queue << 1
|
||
assert_equal 1, t.join.value
|
||
end
|
||
def test_poll
|
||
queue.add "foo"
|
||
assert_equal "foo", queue.poll
|
||
end
|
||
def test_poll_empty
|
||
assert_nil queue.poll
|
||
end
|
||
def test_poll_timeout
|
||
assert_nil queue.poll(1)
|
||
t = Thread.new { queue.poll(10) }
|
||
queue << "foo"
|
||
assert_equal "foo", t.join.value
|
||
end
|
||
def test_peek
|
||
queue.add "foo"
|
||
assert_equal "foo", queue.peek
|
||
assert_equal "foo", queue.take
|
||
end
|
||
def test_first
|
||
queue.add "foo"
|
||
assert_equal "foo", queue.first
|
||
assert_equal "foo", queue.remove
|
||
end
|
||
def test_peek_empty
|
||
assert queue.empty?
|
||
assert_nil queue.peek
|
||
end
|
||
def test_element
|
||
queue.add "foo"
|
||
assert_equal "foo", queue.element
|
||
assert_equal "foo", queue.take
|
||
end
|
||
def test_element_empty
|
||
assert queue.empty?
|
||
assert_raises(Queue::NoSuchElementError) do
|
||
queue.element
|
||
end
|
||
end
|
||
end
|
||
end
|
test/thread/test_non_block_queue.rb | ||
---|---|---|
require 'helper'
|
||
class Thread
|
||
class TestQueue < TestCase
|
||
alias :grind :non_block_grind
|
||
def setup
|
||
super
|
||
@queue = Thread::Queue.new
|
||
end
|
||
def test_queue
|
||
grind(5, 1000, 15, Thread::Queue)
|
||
end
|
||
def test_add_returns_self
|
||
assert_equal queue, queue.add(1)
|
||
end
|
||
def test_offer
|
||
assert queue.offer(1)
|
||
end
|
||
def test_clear
|
||
10.times { |i| queue << i }
|
||
assert_equal 10, queue.length
|
||
queue.clear
|
||
assert_equal 0, queue.length
|
||
assert queue.empty?
|
||
end
|
||
def test_enumerable
|
||
10.times { |i| queue << i }
|
||
assert_equal 10.times.map { |i| i }, queue.to_a
|
||
list = []
|
||
queue.map { |x| list << x }
|
||
assert_equal 10.times.map { |i| i }, list
|
||
assert_equal 10.times.map { |i| i }, queue.to_a
|
||
refute_equal queue.to_a.object_id, queue.to_a.object_id
|
||
end
|
||
def test_add
|
||
queue.add "foo"
|
||
assert_equal "foo", queue.remove
|
||
assert queue.empty?
|
||
end
|
||
def test_add_nil
|
||
assert_raises(ArgumentError) do
|
||
queue.add nil
|
||
end
|
||
end
|
||
def test_remove_empty
|
||
assert queue.empty?
|
||
assert_raises(Queue::NoSuchElementError) do
|
||
queue.remove
|
||
end
|
||
end
|
||
def test_poll
|
||
queue.add "foo"
|
||
assert_equal "foo", queue.poll
|
||
end
|
||
def test_poll_empty
|
||
assert_nil queue.poll
|
||
end
|
||
def test_peek
|
||
queue.add "foo"
|
||
assert_equal "foo", queue.peek
|
||
assert_equal "foo", queue.remove
|
||
end
|
||
def test_first
|
||
queue.add "foo"
|
||
assert_equal "foo", queue.first
|
||
assert_equal "foo", queue.remove
|
||
end
|
||
def test_peek_empty
|
||
assert queue.empty?
|
||
assert_nil queue.peek
|
||
end
|
||
def test_element
|
||
queue.add "foo"
|
||
assert_equal "foo", queue.element
|
||
assert_equal "foo", queue.remove
|
||
end
|
||
def test_element_empty
|
||
assert queue.empty?
|
||
assert_raises(Queue::NoSuchElementError) do
|
||
queue.element
|
||
end
|
||
end
|
||
def test_offer_optionally_takes_timeout
|
||
assert queue.empty?
|
||
queue.offer 0, 10
|
||
assert_equal 1, queue.length
|
||
end
|
||
end
|
||
end
|