Project

General

Profile

Feature #6293 » queue.patch

tenderlovemaking (Aaron Patterson), 04/14/2012 08:28 AM

View differences:

lib/thread/queue.rb
require 'timeout'
class Thread
# Thread::Queue is thread safe a FIFO queue. It provdies a way to synchronize
# communication between threads. This queue does not block when items are
# removed (see Thread::Queue#remove)
#
# This queue does not allow nil elements.
class Queue
class NoSuchElementError < StandardError
end
include Enumerable
#
# Creates a new queue.
#
def initialize
@que = []
@que.taint # enable tainted communication
self.taint
@mutex = Mutex.new
end
def each(&block)
@que.each(&block)
end
#
# Adds +obj+ to the head of the queue.
#
# Raises an ArgumentError if +obj+ is nil.
#
def add(obj)
raise(ArgumentError, 'nil is not allowed in this queue') if obj.nil?
@mutex.synchronize { @que.push obj }
self
end
alias :push :add
alias :<< :add
def offer(obj, timeout = nil)
add obj
end
# Retrieves data from the queue head, and removes it.
#
# Raises a NoSuchElementError if the queue is empty.
def remove
@mutex.synchronize {
raise NoSuchElementError if empty?
@que.shift
}
end
alias :pop :remove
alias :shift :remove
alias :deq :remove
# Retrieves data from the queue head, and removes it.
#
# Returns nil if this queue is empty.
def poll
@mutex.synchronize {
if empty?
nil
else
@que.shift
end
}
end
# Retrieves data from the queue head, but does not removes it.
#
# Returns nil if the queue is empty
def peek
@mutex.synchronize { @que.first }
end
alias :first :peek
# Retrieves data from the queue head, but does not removes it.
#
# Raises NoSuchElementError if the queue is empty.
def element
@mutex.synchronize {
if empty?
raise NoSuchElementError
else
@que.first
end
}
end
#
# Returns +true+ if the queue is empty.
#
def empty?
@que.empty?
end
#
# Removes all objects from the queue.
#
def clear
@que.clear
end
#
# Returns the length of the queue.
#
def length
@que.length
end
alias :size :length
end
# Thread::Queue is thread safe a FIFO queue. It provdies a way to synchronize
# communication between threads.
#
# This queue does not allow nil elements.
class BlockingQueue < Queue
def initialize
@waiting = []
@waiting.taint
super
end
# Retrieves data from the queue head, and removes it.
#
# If the queue is empty, remove will block until there is something
# in the queue.
def take
@mutex.synchronize {
while true
if @que.empty?
# @waiting.include? check is necessary for avoiding a race against
# Thread.wakeup [Bug 5195]
@waiting.push Thread.current unless @waiting.include?(Thread.current)
@mutex.sleep
else
return @que.shift
end
end
}
end
alias :pop :take
alias :shift :take
alias :deq :take
# Adds +obj+ to the head of the queue.
#
# Raises an ArgumentError if +obj+ is nil.
#
def add(obj)
raise ArgumentError if obj.nil?
@mutex.synchronize {
@que.push obj
begin
t = @waiting.shift
t.wakeup if t
rescue ThreadError
retry
end
}
self
end
alias :push :add
alias :<< :add
# Retrieves data from the queue head, and removes it.
#
# Blocks for +timeout+ seconds if the queue is empty, and returns nil if
# the timeout expires.
def poll(timeout = nil)
return super() unless timeout
begin
Timeout.timeout(timeout) do
take
end
rescue TimeoutError
nil
end
end
end
end
test/thread/helper.rb
require 'minitest/autorun'
require 'thread/queue'
class Thread
class TestCase < MiniTest::Unit::TestCase
class Latch
def initialize
@mutex = Mutex.new
@cond = ConditionVariable.new
end
def release
@mutex.synchronize { @cond.broadcast }
end
def await
@mutex.synchronize { @cond.wait @mutex }
end
end
attr_reader :queue
POISON = Object.new
def grind(num_threads, num_objects, num_iterations, klass, *args)
from_workers = klass.new(*args)
to_workers = klass.new(*args)
to_consumers = num_threads.times.map {
Thread.new {
while object = to_workers.pop
break if object == POISON
from_workers.push object
end
}
}
from_consumer = Thread.new {
num_iterations.times {
num_objects.times { from_workers.pop }
}
}
num_iterations.times {
num_objects.times { to_workers.push 99 }
}
num_threads.times { to_workers.push POISON }
to_consumers.each { |t| t.join }
from_consumer.join
assert_equal 0, from_workers.size
assert_equal 0, to_workers.size
end
def non_block_grind(num_threads, num_objects, num_iterations, klass, *args)
from_workers = klass.new(*args)
to_workers = klass.new(*args)
to_latch = Latch.new
from_latch = Latch.new
to_consumers = num_threads.times.map {
Thread.new {
to_latch.await
while object = to_workers.pop
break if object == POISON
from_workers.push object
end
}
}
from_consumer = Thread.new {
from_latch.await
num_iterations.times {
num_objects.times { from_workers.pop }
}
}
num_iterations.times {
num_objects.times { to_workers.push 99 }
}
num_threads.times { to_workers.push POISON }
Thread.pass until to_consumers.all? { |c| c.status == "sleep" }
Thread.pass until from_consumer.status == "sleep"
to_latch.release
to_consumers.each { |t| t.join }
from_latch.release
from_consumer.join
assert_equal 0, from_workers.size
assert_equal 0, to_workers.size
end
end
end
test/thread/test_blocking_queue.rb
require 'helper'
class Thread
class TestBlockingQueue < TestCase
attr_reader :queue
def setup
@queue = Thread::BlockingQueue.new
super
end
def test_add_returns_self
assert_equal queue, queue.add(1)
end
def test_queue
grind(5, 1000, 15, Thread::BlockingQueue)
end
def test_offer
assert queue.offer(1)
assert_equal 1, queue.length
end
def test_clear
10.times { |i| queue << i }
assert_equal 10, queue.length
queue.clear
assert_equal 0, queue.length
assert queue.empty?
end
def test_enumerable
10.times { |i| queue << i }
assert_equal 10.times.map { |i| i }, queue.to_a
list = []
queue.map { |x| list << x }
assert_equal 10.times.map { |i| i }, list
assert_equal 10.times.map { |i| i }, queue.to_a
refute_equal queue.to_a.object_id, queue.to_a.object_id
end
def test_add
queue.add "foo"
assert_equal "foo", queue.take
assert queue.empty?
end
def test_add_nil
assert_raises(ArgumentError) do
queue.add nil
end
end
def test_remove_empty
assert queue.empty?
t = Thread.new { queue.take }
queue << 1
assert_equal 1, t.join.value
end
def test_poll
queue.add "foo"
assert_equal "foo", queue.poll
end
def test_poll_empty
assert_nil queue.poll
end
def test_poll_timeout
assert_nil queue.poll(1)
t = Thread.new { queue.poll(10) }
queue << "foo"
assert_equal "foo", t.join.value
end
def test_peek
queue.add "foo"
assert_equal "foo", queue.peek
assert_equal "foo", queue.take
end
def test_first
queue.add "foo"
assert_equal "foo", queue.first
assert_equal "foo", queue.remove
end
def test_peek_empty
assert queue.empty?
assert_nil queue.peek
end
def test_element
queue.add "foo"
assert_equal "foo", queue.element
assert_equal "foo", queue.take
end
def test_element_empty
assert queue.empty?
assert_raises(Queue::NoSuchElementError) do
queue.element
end
end
end
end
test/thread/test_non_block_queue.rb
require 'helper'
class Thread
class TestQueue < TestCase
alias :grind :non_block_grind
def setup
super
@queue = Thread::Queue.new
end
def test_queue
grind(5, 1000, 15, Thread::Queue)
end
def test_add_returns_self
assert_equal queue, queue.add(1)
end
def test_offer
assert queue.offer(1)
end
def test_clear
10.times { |i| queue << i }
assert_equal 10, queue.length
queue.clear
assert_equal 0, queue.length
assert queue.empty?
end
def test_enumerable
10.times { |i| queue << i }
assert_equal 10.times.map { |i| i }, queue.to_a
list = []
queue.map { |x| list << x }
assert_equal 10.times.map { |i| i }, list
assert_equal 10.times.map { |i| i }, queue.to_a
refute_equal queue.to_a.object_id, queue.to_a.object_id
end
def test_add
queue.add "foo"
assert_equal "foo", queue.remove
assert queue.empty?
end
def test_add_nil
assert_raises(ArgumentError) do
queue.add nil
end
end
def test_remove_empty
assert queue.empty?
assert_raises(Queue::NoSuchElementError) do
queue.remove
end
end
def test_poll
queue.add "foo"
assert_equal "foo", queue.poll
end
def test_poll_empty
assert_nil queue.poll
end
def test_peek
queue.add "foo"
assert_equal "foo", queue.peek
assert_equal "foo", queue.remove
end
def test_first
queue.add "foo"
assert_equal "foo", queue.first
assert_equal "foo", queue.remove
end
def test_peek_empty
assert queue.empty?
assert_nil queue.peek
end
def test_element
queue.add "foo"
assert_equal "foo", queue.element
assert_equal "foo", queue.remove
end
def test_element_empty
assert queue.empty?
assert_raises(Queue::NoSuchElementError) do
queue.element
end
end
def test_offer_optionally_takes_timeout
assert queue.empty?
queue.offer 0, 10
assert_equal 1, queue.length
end
end
end
(1-1/6)