Project

General

Profile

Feature #6293 » queue.patch

tenderlovemaking (Aaron Patterson), 04/14/2012 08:28 AM

View differences:

lib/thread/queue.rb
1
require 'timeout'
2

  
3
class Thread
  # Thread::Queue is a thread-safe FIFO queue.  It provides a way to
  # synchronize communication between threads.  This queue does not block when
  # items are removed (see Thread::Queue#remove).
  #
  # This queue does not allow nil elements.
  class Queue
    # Raised by #remove and #element when the queue is empty.
    class NoSuchElementError < StandardError
    end

    include Enumerable

    #
    # Creates a new queue.
    #
    def initialize
      # No #taint calls here: Object#taint was removed in Ruby 3.2 and would
      # raise NoMethodError.
      @que = []
      @mutex = Mutex.new
    end

    #
    # Yields every element from head to tail and returns the queue.
    #
    # The iteration runs over a snapshot taken under the lock, so the block
    # is executed without holding the lock and may use the queue itself.
    #
    # Returns an Enumerator when no block is given.
    #
    def each(&block)
      return enum_for(:each) unless block

      @mutex.synchronize { @que.dup }.each(&block)
      self
    end

    #
    # Adds +obj+ to the tail of the queue and returns the queue.
    #
    # Raises an ArgumentError if +obj+ is nil.
    #
    def add(obj)
      raise(ArgumentError, 'nil is not allowed in this queue') if obj.nil?
      @mutex.synchronize { @que.push obj }
      self
    end

    alias :push  :add
    alias :<<    :add

    #
    # Adds +obj+ to the tail of the queue and returns the queue.
    #
    # +timeout+ is accepted but ignored: #add never waits.
    #
    def offer(obj, timeout = nil)
      add obj
    end

    # Retrieves data from the queue head, and removes it.
    #
    # Raises a NoSuchElementError if the queue is empty.
    def remove
      @mutex.synchronize do
        # Check @que directly: #empty? takes the (non-reentrant) lock itself.
        raise NoSuchElementError if @que.empty?
        @que.shift
      end
    end

    alias :pop   :remove
    alias :shift :remove
    alias :deq   :remove

    # Retrieves data from the queue head, and removes it.
    #
    # Returns nil if this queue is empty.
    def poll
      # Array#shift already returns nil for an empty array, and nil is never
      # stored, so no separate emptiness check is needed.
      @mutex.synchronize { @que.shift }
    end

    # Retrieves data from the queue head, but does not remove it.
    #
    # Returns nil if the queue is empty.
    def peek
      @mutex.synchronize { @que.first }
    end
    alias :first :peek

    # Retrieves data from the queue head, but does not remove it.
    #
    # Raises NoSuchElementError if the queue is empty.
    def element
      @mutex.synchronize do
        raise NoSuchElementError if @que.empty?
        @que.first
      end
    end

    #
    # Returns +true+ if the queue is empty.
    #
    def empty?
      @mutex.synchronize { @que.empty? }
    end

    #
    # Removes all objects from the queue and returns the queue.
    #
    # Runs under the lock so it cannot interleave with the check-then-shift
    # sequence of #remove.
    #
    def clear
      @mutex.synchronize { @que.clear }
      self
    end

    #
    # Returns the length of the queue.
    #
    def length
      @mutex.synchronize { @que.length }
    end
    alias :size :length
  end

  # Thread::BlockingQueue is a thread-safe FIFO queue.  It provides a way to
  # synchronize communication between threads.  Unlike Thread::Queue, #take
  # (and its aliases #pop, #shift and #deq) blocks while the queue is empty.
  # The inherited #remove is not overridden and still raises on an empty queue.
  #
  # This queue does not allow nil elements.
  class BlockingQueue < Queue
    #
    # Creates a new queue.
    #
    def initialize
      # Threads currently sleeping in #take or #poll, oldest first.
      @waiting = []
      super
    end

    # Retrieves data from the queue head, and removes it.
    #
    # If the queue is empty, take will block until there is something
    # in the queue.
    def take
      @mutex.synchronize do
        # Loop because a sleeping thread may be woken without an element
        # being available for it.
        sleep_until_woken while @que.empty?
        @que.shift
      end
    end

    alias :pop   :take
    alias :shift :take
    alias :deq   :take

    # Adds +obj+ to the tail of the queue, wakes one waiting thread (if any)
    # and returns the queue.
    #
    # Raises an ArgumentError if +obj+ is nil.
    #
    def add(obj)
      raise(ArgumentError, 'nil is not allowed in this queue') if obj.nil?

      @mutex.synchronize do
        @que.push obj
        wake_next_waiter
      end
      self
    end

    alias :push :add
    alias :<<   :add

    # Retrieves data from the queue head, and removes it.
    #
    # Without +timeout+ this never blocks and returns nil if the queue is
    # empty.  With +timeout+ it blocks for at most +timeout+ seconds while
    # the queue is empty and returns nil if the timeout expires; a timeout of
    # zero or less therefore returns immediately.
    def poll(timeout = nil)
      return super() unless timeout

      # Wait against a monotonic deadline instead of Timeout.timeout: the
      # latter can interrupt the thread after an element was already removed,
      # treats 0 as "no timeout", and leaves the thread registered as waiting.
      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
      @mutex.synchronize do
        while @que.empty?
          remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
          return nil unless remaining > 0

          sleep_until_woken(remaining)
        end
        @que.shift
      end
    end

    private

    # Registers the current thread as a waiter and sleeps, releasing the lock,
    # until woken or until +timeout+ seconds have passed (nil: no limit).
    # Must be called with @mutex held.
    def sleep_until_woken(timeout = nil)
      current = Thread.current
      # @waiting.include? check is necessary for avoiding a race against
      # Thread.wakeup [Bug 5195]
      @waiting.push current unless @waiting.include?(current)
      @mutex.sleep(timeout)
    ensure
      # Always deregister, so a thread that timed out or was interrupted is
      # never chosen by #add in place of a thread that is really waiting.
      @waiting.delete(current)
    end

    # Wakes the longest-waiting live thread, skipping threads that have died.
    # Must be called with @mutex held.
    def wake_next_waiter
      while (waiter = @waiting.shift)
        begin
          waiter.wakeup
          return
        rescue ThreadError
          # Thread#wakeup raises ThreadError for a dead thread; try the next.
        end
      end
    end
  end
end
test/thread/helper.rb
1
require 'minitest/autorun'
2
require 'thread/queue'
3

  
4
class Thread
5
  class TestCase < MiniTest::Unit::TestCase
6
    # One-shot gate used by the grind helpers: threads calling #await block
    # until some thread calls #release.
    class Latch
      def initialize
        @mutex    = Mutex.new
        @cond     = ConditionVariable.new
        # Remembers that #release happened, so a release that runs before a
        # thread reaches #await is not lost.
        @released = false
      end

      # Opens the latch and wakes every thread blocked in #await.
      def release
        @mutex.synchronize do
          @released = true
          @cond.broadcast
        end
      end

      # Blocks until the latch has been released; returns immediately if it
      # already was.
      def await
        @mutex.synchronize do
          # Re-check the flag after every wakeup so that a wakeup not caused
          # by #release does not let the caller through early.
          @cond.wait(@mutex) until @released
        end
      end
    end
20

  
21
    attr_reader :queue
22

  
23
    POISON = Object.new
24

  
25
    # Stress test for a queue class.
    #
    # Builds two queues with klass.new(*args).  +num_threads+ relay threads
    # move objects from the first queue to the second until they pop POISON;
    # one collector thread pops num_iterations * num_objects objects from the
    # second queue.  The calling thread pushes that many objects followed by
    # one POISON per relay thread, joins every thread, and asserts that both
    # queues end up with size 0.
    def grind(num_threads, num_objects, num_iterations, klass, *args)
      from_workers = klass.new(*args)
      to_workers   = klass.new(*args)

      relays = num_threads.times.map do
        Thread.new do
          loop do
            item = to_workers.pop
            # Stop on a falsy pop result or on the poison pill.
            break if !item || item == POISON
            from_workers.push item
          end
        end
      end

      collector = Thread.new do
        num_iterations.times do
          num_objects.times { from_workers.pop }
        end
      end

      num_iterations.times do
        num_objects.times { to_workers.push 99 }
      end
      # One pill per relay thread so that each of them terminates.
      num_threads.times { to_workers.push POISON }

      relays.each(&:join)

      collector.join

      assert_equal 0, from_workers.size
      assert_equal 0, to_workers.size
    end
56

  
57
    # Variant of the grind stress test in which every worker thread first
    # waits on a Latch.
    #
    # The calling thread pushes all objects and the POISON pills, waits until
    # every worker thread reports the "sleep" status, then releases the relay
    # threads, joins them, and only afterwards releases and joins the
    # collector thread.  Finally asserts that both queues have size 0.
    def non_block_grind(num_threads, num_objects, num_iterations, klass, *args)
      from_workers = klass.new(*args)
      to_workers   = klass.new(*args)

      to_latch   = Latch.new
      from_latch = Latch.new

      relays = num_threads.times.map do
        Thread.new do
          to_latch.await

          loop do
            item = to_workers.pop
            # Stop on a falsy pop result or on the poison pill.
            break if !item || item == POISON
            from_workers.push item
          end
        end
      end

      collector = Thread.new do
        from_latch.await

        num_iterations.times do
          num_objects.times { from_workers.pop }
        end
      end

      num_iterations.times do
        num_objects.times { to_workers.push 99 }
      end
      num_threads.times { to_workers.push POISON }

      # Spin until every worker thread is parked.
      Thread.pass until relays.all? { |relay| relay.status == "sleep" }
      Thread.pass until collector.status == "sleep"

      to_latch.release

      relays.each(&:join)

      # The collector starts only after all relays have finished.
      from_latch.release
      collector.join

      assert_equal 0, from_workers.size
      assert_equal 0, to_workers.size
    end
101
  end
102
end
test/thread/test_blocking_queue.rb
1
require 'helper'
2

  
3
class Thread
  # Tests for Thread::BlockingQueue.
  class TestBlockingQueue < TestCase
    attr_reader :queue

    # Gives every test a fresh, empty blocking queue.
    def setup
      @queue = Thread::BlockingQueue.new
      super
    end

    def test_add_returns_self
      assert_equal queue, queue.add(1)
    end

    # Multi-threaded stress run through the shared grind helper.
    def test_queue
      grind(5, 1000, 15, Thread::BlockingQueue)
    end

    def test_offer
      assert queue.offer(1)
      assert_equal 1, queue.length
    end

    def test_clear
      (0...10).each { |n| queue << n }
      assert_equal 10, queue.length
      queue.clear
      assert_equal 0, queue.length
      assert queue.empty?
    end

    def test_enumerable
      expected = (0...10).to_a
      expected.each { |n| queue << n }
      assert_equal expected, queue.to_a

      seen = []
      queue.map { |item| seen << item }

      assert_equal expected, seen
      # Enumerating must not consume the queue ...
      assert_equal expected, queue.to_a
      # ... and every to_a call must build a new array.
      refute_equal queue.to_a.object_id, queue.to_a.object_id
    end

    def test_add
      queue.add "foo"
      assert_equal "foo", queue.take
      assert queue.empty?
    end

    def test_add_nil
      assert_raises(ArgumentError) { queue.add nil }
    end

    # take on an empty queue returns once another thread adds an element.
    def test_remove_empty
      assert queue.empty?
      taker = Thread.new { queue.take }
      queue << 1
      assert_equal 1, taker.join.value
    end

    def test_poll
      queue.add "foo"
      assert_equal "foo", queue.poll
    end

    def test_poll_empty
      assert_nil queue.poll
    end

    def test_poll_timeout
      # Nothing is queued, so the one-second poll comes back with nil.
      assert_nil queue.poll(1)

      poller = Thread.new { queue.poll(10) }
      queue << "foo"
      assert_equal "foo", poller.join.value
    end

    def test_peek
      queue.add "foo"
      assert_equal "foo", queue.peek
      assert_equal "foo", queue.take
    end

    def test_first
      queue.add "foo"
      assert_equal "foo", queue.first
      assert_equal "foo", queue.remove
    end

    def test_peek_empty
      assert queue.empty?
      assert_nil queue.peek
    end

    def test_element
      queue.add "foo"
      assert_equal "foo", queue.element
      assert_equal "foo", queue.take
    end

    def test_element_empty
      assert queue.empty?
      assert_raises(Queue::NoSuchElementError) { queue.element }
    end
  end
end
test/thread/test_non_block_queue.rb
1
require 'helper'
2

  
3
class Thread
  # Tests for the non-blocking Thread::Queue.
  class TestQueue < TestCase
    # The stress test goes through the latch-based helper.
    alias_method :grind, :non_block_grind

    # Gives every test a fresh, empty queue.
    def setup
      super
      @queue = Thread::Queue.new
    end

    # Multi-threaded stress run through the (aliased) grind helper.
    def test_queue
      grind(5, 1000, 15, Thread::Queue)
    end

    def test_add_returns_self
      assert_equal queue, queue.add(1)
    end

    def test_offer
      assert queue.offer(1)
    end

    def test_clear
      (0...10).each { |n| queue << n }
      assert_equal 10, queue.length
      queue.clear
      assert_equal 0, queue.length
      assert queue.empty?
    end

    def test_enumerable
      expected = (0...10).to_a
      expected.each { |n| queue << n }
      assert_equal expected, queue.to_a

      seen = []
      queue.map { |item| seen << item }

      assert_equal expected, seen
      # Enumerating must not consume the queue ...
      assert_equal expected, queue.to_a
      # ... and every to_a call must build a new array.
      refute_equal queue.to_a.object_id, queue.to_a.object_id
    end

    def test_add
      queue.add "foo"
      assert_equal "foo", queue.remove
      assert queue.empty?
    end

    def test_add_nil
      assert_raises(ArgumentError) { queue.add nil }
    end

    def test_remove_empty
      assert queue.empty?
      assert_raises(Queue::NoSuchElementError) { queue.remove }
    end

    def test_poll
      queue.add "foo"
      assert_equal "foo", queue.poll
    end

    def test_poll_empty
      assert_nil queue.poll
    end

    def test_peek
      queue.add "foo"
      assert_equal "foo", queue.peek
      assert_equal "foo", queue.remove
    end

    def test_first
      queue.add "foo"
      assert_equal "foo", queue.first
      assert_equal "foo", queue.remove
    end

    def test_peek_empty
      assert queue.empty?
      assert_nil queue.peek
    end

    def test_element
      queue.add "foo"
      assert_equal "foo", queue.element
      assert_equal "foo", queue.remove
    end

    def test_element_empty
      assert queue.empty?
      assert_raises(Queue::NoSuchElementError) { queue.element }
    end

    def test_offer_optionally_takes_timeout
      assert queue.empty?
      queue.offer 0, 10
      assert_equal 1, queue.length
    end
  end
end