Project

General

Profile

Feature #6293 » queue.patch

Anonymous, 04/21/2012 07:59 AM

View differences:

lib/thread.rb
129 129
#
130 130
#   consumer = Thread.new do
131 131
#     5.times do |i|
132
#       value = queue.pop
132
#       value = queue.shift
133 133
#       sleep rand(i/2) # simulate expense
134 134
#       puts "consumed #{value}"
135 135
#     end
/dev/null → lib/thread/queue.rb
1
require 'timeout'
2

  
3
class Thread
4
  # Thread::Queue is thread safe a FIFO queue.  It provdies a way to synchronize
5
  # communication between threads.  This queue does not block when items are
6
  # removed (see Thread::Queue#remove)
7
  #
8
  # This queue does not allow nil elements.
9
  class Queue
10
    class NoSuchElementError < StandardError
11
    end
12

  
13
    #
14
    # Creates a new queue.
15
    #
16
    def initialize
17
      @que = []
18
      @que.taint          # enable tainted communication
19
      self.taint
20
      @mutex = Mutex.new
21
    end
22

  
23
    def to_a
24
      @que.dup
25
    end
26

  
27
    #
28
    # Adds +obj+ to the head of the queue.
29
    #
30
    # Raises an ArgumentError if +obj+ is nil.
31
    #
32
    def add(obj)
33
      raise ArgumentError if obj.nil?
34
      @mutex.synchronize { @que.push obj }
35
      self
36
    end
37

  
38
    alias :push  :add
39
    alias :<<    :add
40

  
41
    def offer(obj, timeout = nil)
42
      add obj
43
    end
44

  
45
    # Retrieves data from the queue head, and removes it.
46
    #
47
    # Raises a NoSuchElementError if the queue is empty.
48
    def remove
49
      @mutex.synchronize {
50
        raise NoSuchElementError if empty?
51
        @que.shift
52
      }
53
    end
54

  
55
    alias :pop   :remove
56
    alias :shift :remove
57
    alias :deq   :remove
58

  
59
    # Retrieves data from the queue head, and removes it.
60
    #
61
    # Returns nil if this queue is empty.
62
    def poll
63
      @mutex.synchronize {
64
        if empty?
65
          nil
66
        else
67
          @que.shift
68
        end
69
      }
70
    end
71

  
72
    # Retrieves data from the queue head, but does not removes it.
73
    #
74
    # Returns nil if the queue is empty
75
    def peek
76
      @mutex.synchronize { @que.first }
77
    end
78

  
79
    # Retrieves data from the queue head, but does not removes it.
80
    #
81
    # Raises NoSuchElementError if the queue is empty.
82
    def element
83
      @mutex.synchronize {
84
        if empty?
85
          raise NoSuchElementError
86
        else
87
          @que.first
88
        end
89
      }
90
    end
91

  
92
    #
93
    # Returns +true+ if the queue is empty.
94
    #
95
    def empty?
96
      @que.empty?
97
    end
98

  
99
    #
100
    # Removes all objects from the queue.
101
    #
102
    def clear
103
      @que.clear
104
    end
105

  
106
    #
107
    # Returns the length of the queue.
108
    #
109
    def length
110
      @que.length
111
    end
112
    alias :size :length
113
  end
114

  
115
  # Thread::Queue is thread safe a FIFO queue.  It provdies a way to synchronize
116
  # communication between threads.
117
  #
118
  # This queue does not allow nil elements.
119
  class BlockingQueue < Queue
120
    def initialize
121
      @waiting = []
122
      @waiting.taint
123
      super
124
    end
125

  
126
    # Retrieves data from the queue head, and removes it.
127
    #
128
    # If the queue is empty, remove will block until there is something
129
    # in the queue.
130
    def take
131
      @mutex.synchronize {
132
        while true
133
          if @que.empty?
134
            # @waiting.include? check is necessary for avoiding a race against
135
            # Thread.wakeup [Bug 5195]
136
            @waiting.push Thread.current unless @waiting.include?(Thread.current)
137
            @mutex.sleep
138
          else
139
            return @que.shift
140
          end
141
        end
142
      }
143
    end
144

  
145
    alias :pop   :take
146
    alias :shift :take
147
    alias :deq   :take
148

  
149
    # Adds +obj+ to the head of the queue.
150
    #
151
    # Raises an ArgumentError if +obj+ is nil.
152
    #
153
    def add(obj)
154
      raise ArgumentError if obj.nil?
155

  
156
      @mutex.synchronize {
157
        @que.push obj
158
        begin
159
          t = @waiting.shift
160
          t.wakeup if t
161
        rescue ThreadError
162
          retry
163
        end
164
      }
165
      self
166
    end
167

  
168
    alias :push :add
169
    alias :<<   :add
170

  
171
    # Retrieves data from the queue head, and removes it.
172
    #
173
    # Blocks for +timeout+ seconds if the queue is empty, and returns nil if
174
    # the timeout expires.
175
    def poll(timeout = nil)
176
      return super() unless timeout
177

  
178
      begin
179
        Timeout.timeout(timeout) do
180
          take
181
        end
182
      rescue TimeoutError
183
        nil
184
      end
185
    end
186
  end
187
end
/dev/null → test/thread/helper.rb
1
require 'minitest/autorun'
2
require 'thread/queue'
3

  
4
class Thread
5
  class TestCase < MiniTest::Unit::TestCase
6
    class Latch
7
      def initialize
8
        @mutex = Mutex.new
9
        @cond  = ConditionVariable.new
10
      end
11

  
12
      def release
13
        @mutex.synchronize { @cond.broadcast }
14
      end
15

  
16
      def await
17
        @mutex.synchronize { @cond.wait @mutex }
18
      end
19
    end
20

  
21
    attr_reader :queue
22

  
23
    POISON = Object.new
24

  
25
    def grind(num_threads, num_objects, num_iterations, klass, *args)
26
      from_workers = klass.new(*args)
27
      to_workers = klass.new(*args)
28

  
29
      to_consumers = num_threads.times.map {
30
        Thread.new {
31
          while object = to_workers.pop
32
            break if object == POISON
33
            from_workers.push object
34
          end
35
        }
36
      }
37

  
38
      from_consumer = Thread.new {
39
        num_iterations.times {
40
          num_objects.times { from_workers.pop }
41
        }
42
      }
43

  
44
      num_iterations.times {
45
        num_objects.times { to_workers.push 99 }
46
      }
47
      num_threads.times { to_workers.push POISON }
48

  
49
      to_consumers.each { |t| t.join }
50

  
51
      from_consumer.join
52

  
53
      assert_equal 0, from_workers.size
54
      assert_equal 0, to_workers.size
55
    end
56

  
57
    def non_block_grind(num_threads, num_objects, num_iterations, klass, *args)
58
      from_workers = klass.new(*args)
59
      to_workers = klass.new(*args)
60

  
61
      to_latch   = Latch.new
62
      from_latch = Latch.new
63

  
64
      to_consumers = num_threads.times.map {
65
        Thread.new {
66
          to_latch.await
67

  
68
          while object = to_workers.pop
69
            break if object == POISON
70
            from_workers.push object
71
          end
72
        }
73
      }
74

  
75
      from_consumer = Thread.new {
76
        from_latch.await
77

  
78
        num_iterations.times {
79
          num_objects.times { from_workers.pop }
80
        }
81
      }
82

  
83
      num_iterations.times {
84
        num_objects.times { to_workers.push 99 }
85
      }
86
      num_threads.times { to_workers.push POISON }
87

  
88
      Thread.pass until to_consumers.all? { |c| c.status == "sleep" }
89
      Thread.pass until from_consumer.status == "sleep"
90

  
91
      to_latch.release
92

  
93
      to_consumers.each { |t| t.join }
94

  
95
      from_latch.release
96
      from_consumer.join
97

  
98
      assert_equal 0, from_workers.size
99
      assert_equal 0, to_workers.size
100
    end
101
  end
102
end
/dev/null → test/thread/test_blocking_queue.rb
1
require 'helper'
2

  
3
class Thread
4
  class TestBlockingQueue < TestCase
5
    attr_reader :queue
6

  
7
    def setup
8
      @queue = Thread::BlockingQueue.new
9
      super
10
    end
11

  
12
    def test_add_returns_self
13
      assert_equal queue, queue.add(1)
14
    end
15

  
16
    def test_queue
17
      grind(5, 1000, 15, Thread::BlockingQueue)
18
    end
19

  
20
    def test_offer
21
      assert queue.offer(1)
22
      assert_equal 1, queue.length
23
    end
24

  
25
    def test_clear
26
      10.times { |i| queue << i }
27
      assert_equal 10, queue.length
28
      queue.clear
29
      assert_equal 0, queue.length
30
      assert queue.empty?
31
    end
32

  
33
    def test_add
34
      queue.add "foo"
35
      assert_equal "foo", queue.take
36
      assert queue.empty?
37
    end
38

  
39
    def test_add_nil
40
      assert_raises(ArgumentError) do
41
        queue.add nil
42
      end
43
    end
44

  
45
    def test_remove_empty
46
      assert queue.empty?
47
      t = Thread.new { queue.take }
48
      queue << 1
49
      assert_equal 1, t.join.value
50
    end
51

  
52
    def test_poll
53
      queue.add "foo"
54
      assert_equal "foo", queue.poll
55
    end
56

  
57
    def test_poll_empty
58
      assert_nil queue.poll
59
    end
60

  
61
    def test_poll_timeout
62
      assert_nil queue.poll(1)
63

  
64
      t = Thread.new { queue.poll(10) }
65
      queue << "foo"
66
      assert_equal "foo", t.join.value
67
    end
68

  
69
    def test_peek
70
      queue.add "foo"
71
      assert_equal "foo", queue.peek
72
      assert_equal "foo", queue.take
73
    end
74

  
75
    def test_peek_empty
76
      assert queue.empty?
77
      assert_nil queue.peek
78
    end
79

  
80
    def test_element
81
      queue.add "foo"
82
      assert_equal "foo", queue.element
83
      assert_equal "foo", queue.take
84
    end
85

  
86
    def test_element_empty
87
      assert queue.empty?
88
      assert_raises(Queue::NoSuchElementError) do
89
        queue.element
90
      end
91
    end
92
  end
93
end
/dev/null → test/thread/test_non_block_queue.rb
1
require 'helper'
2

  
3
class Thread
4
  class TestQueue < TestCase
5
    alias :grind :non_block_grind
6

  
7
    def setup
8
      super
9
      @queue = Thread::Queue.new
10
    end
11

  
12
    def test_queue
13
      grind(5, 1000, 15, Thread::Queue)
14
    end
15

  
16
    def test_add_returns_self
17
      assert_equal queue, queue.add(1)
18
    end
19

  
20
    def test_offer
21
      assert queue.offer(1)
22
    end
23

  
24
    def test_clear
25
      10.times { |i| queue << i }
26
      assert_equal 10, queue.length
27
      queue.clear
28
      assert_equal 0, queue.length
29
      assert queue.empty?
30
    end
31

  
32
    def test_add
33
      queue.add "foo"
34
      assert_equal "foo", queue.remove
35
      assert queue.empty?
36
    end
37

  
38
    def test_add_nil
39
      assert_raises(ArgumentError) do
40
        queue.add nil
41
      end
42
    end
43

  
44
    def test_remove_empty
45
      assert queue.empty?
46
      assert_raises(Queue::NoSuchElementError) do
47
        queue.remove
48
      end
49
    end
50

  
51
    def test_poll
52
      queue.add "foo"
53
      assert_equal "foo", queue.poll
54
    end
55

  
56
    def test_poll_empty
57
      assert_nil queue.poll
58
    end
59

  
60
    def test_peek
61
      queue.add "foo"
62
      assert_equal "foo", queue.peek
63
      assert_equal "foo", queue.remove
64
    end
65

  
66
    def test_peek_empty
67
      assert queue.empty?
68
      assert_nil queue.peek
69
    end
70

  
71
    def test_element
72
      queue.add "foo"
73
      assert_equal "foo", queue.element
74
      assert_equal "foo", queue.remove
75
    end
76

  
77
    def test_element_empty
78
      assert queue.empty?
79
      assert_raises(Queue::NoSuchElementError) do
80
        queue.element
81
      end
82
    end
83

  
84
    def test_offer_optionally_takes_timeout
85
      assert queue.empty?
86
      queue.offer 0, 10
87
      assert_equal 1, queue.length
88
    end
89
  end
90
end
0 91