funny_falcon_threads.patch

Zachary Scott, 11/19/2012 08:35 AM

Download (15.2 KB)

View differences:

test/ruby/test_thread.rb
209 209
    end.join
210 210
  end
211 211

  
212
  class Serializer
213
    def initialize
214
      @mutex = Mutex.new
215
      @condvar = ConditionVariable.new
216
    end
217
    def wait
218
      @mutex.synchronize{
219
        @condvar.wait(@mutex)
220
      }
221
    end
222
    def signal
223
      @mutex.synchronize{ @condvar.signal }
224
    end
225
  end
226

  
227
  def test_condvar_wait_timeout
228
    serialize = Serializer.new
229

  
230
    mutex = Mutex.new
231
    condvar = ConditionVariable.new
232
    def condvar.waiters
233
      @waiters
234
    end
235
    thread = Thread.new do
236
      serialize.signal
237
      mutex.synchronize do
238
        condvar.wait(mutex, 0.001)
239
      end
240
    end
241
    serialize.wait
242
    mutex.synchronize do
243
      sleep(0.01)
244
      assert_not_includes(condvar.waiters, thread)
245
    end
246
  end
247

  
248
  def test_condvar_wait_timeout_2
249
    serialize = Serializer.new
250

  
251
    mutex = Mutex.new
252
    condvar = ConditionVariable.new
253

  
254
    wait_timeout = Thread.new do
255
      serialize.signal
256
      mutex.synchronize do
257
        condvar.wait(mutex, 0.001)
258
      end
259
    end
260
    serialize.wait
261

  
262
    wait_forever = Thread.new do
263
      serialize.signal
264
      mutex.synchronize do
265
        condvar.wait(mutex)
266
      end
267
    end
268
    serialize.wait
269

  
270
    mutex.synchronize do
271
      sleep(0.01)
272
      condvar.signal
273
    end
274
    # If wait_timeout thread didn't remove himself
275
    # from condvar sleepers, than signale tries to
276
    # wakeup wait_timeout instead of wait_forever
277
    # and fatal deadlock occures in a line below
278
    wait_timeout.join
279
    wait_forever.join
280

  
281
  end
282

  
212 283
  def test_local_barrier
213 284
    dir = File.dirname(__FILE__)
214 285
    lbtest = File.join(dir, "lbtest.rb")
215
- 
lib/thread.rb
63 63
  # even if no other thread doesn't signal.
64 64
  #
65 65
  def wait(mutex, timeout=nil)
66
    begin
67
      # TODO: mutex should not be used
68
      @waiters_mutex.synchronize do
69
        @waiters.push(Thread.current)
70
      end
71
      mutex.sleep timeout
72
    ensure
66
    # TODO: mutex should not be used
67
    @waiters_mutex.synchronize do
68
      @waiters.push(Thread.current)
69
    end
70
    mutex.sleep timeout do
73 71
      @waiters_mutex.synchronize do
74 72
        @waiters.delete(Thread.current)
75 73
      end
thread.c
4056 4056
}
4057 4057

  
4058 4058
static VALUE
4059
rb_mutex_sleep_forever(VALUE time)
4059
wrap_thread_sleep_deadly()
4060 4060
{
4061 4061
    rb_thread_sleep_deadly();
4062 4062
    return Qnil;
4063 4063
}
4064 4064

  
4065 4065
static VALUE
4066
rb_mutex_wait_for(VALUE time)
4066
rb_mutex_sleep_forever(VALUE time)
4067
{
4068
    if (rb_block_given_p()) {
4069
	return rb_ensure(wrap_thread_sleep_deadly, Qnil, rb_yield, Qnil);
4070
    }
4071
    return wrap_thread_sleep_deadly();
4072
}
4073

  
4074
static VALUE
4075
wrap_rb_thread_wait_for(VALUE time)
4067 4076
{
4068 4077
    const struct timeval *t = (struct timeval *)time;
4069 4078
    rb_thread_wait_for(*t);
4070 4079
    return Qnil;
4071 4080
}
4072 4081

  
4082
static VALUE
4083
rb_mutex_wait_for(VALUE time)
4084
{
4085
    if (rb_block_given_p()) {
4086
	return rb_ensure(wrap_rb_thread_wait_for, time, rb_yield, Qnil);
4087
    }
4088
    return wrap_rb_thread_wait_for(time);
4089
}
4090

  
4073 4091
VALUE
4074 4092
rb_mutex_sleep(VALUE self, VALUE timeout)
4075 4093
{
4076
- 
lib/thread.rb
20 20
  Thread.abort_on_exception = true
21 21
end
22 22

  
23
unless defined?(Thread::RELY_ON_GVL)
24
  Thread::RELY_ON_GVL = false
25
end
26

  
23 27
#
24 28
# ConditionVariable objects augment class Mutex. Using condition variables,
25 29
# it is possible to suspend while in the middle of a critical section until a
......
63 67
  # even if no other thread doesn't signal.
64 68
  #
65 69
  def wait(mutex, timeout=nil)
66
    # TODO: mutex should not be used
70
    # Rely on GVL for sychronizing @waiters.push
71
    @waiters.push(Thread.current)
72
    mutex.sleep timeout do
73
      # We could not rely on GVL cause compare is called
74
      @waiters_mutex.synchronize do
75
        @waiters.delete(Thread.current)
76
      end
77
    end
78
    self
79
  end if Thread::RELY_ON_GVL
80

  
81
  def wait(mutex, timeout=nil) # :nodoc: 
67 82
    @waiters_mutex.synchronize do
68 83
      @waiters.push(Thread.current)
69 84
    end
......
73 88
      end
74 89
    end
75 90
    self
76
  end
91
  end unless Thread::RELY_ON_GVL
77 92

  
78 93
  #
79 94
  # Wakes up the first thread in line waiting for this lock.
80 95
  #
81 96
  def signal
82 97
    begin
83
      t = @waiters_mutex.synchronize {@waiters.shift}
98
      t = @waiters.shift
84 99
      t.run if t
85 100
    rescue ThreadError
86 101
      retry
87 102
    end
88 103
    self
89
  end
104
  end if Thread::RELY_ON_GVL
105

  
106
  def signal # :nodoc:
107
    begin
108
      t = @waiters_mutex.synchronize { @waiters.shift }
109
      t.run if t
110
    rescue ThreadError
111
      retry
112
    end
113
    self
114
  end unless Thread::RELY_ON_GVL
90 115

  
91 116
  #
92 117
  # Wakes up all threads waiting for this lock.
thread.c
4528 4528
    rb_define_singleton_method(rb_cThread, "list", rb_thread_list, 0);
4529 4529
    rb_define_singleton_method(rb_cThread, "abort_on_exception", rb_thread_s_abort_exc, 0);
4530 4530
    rb_define_singleton_method(rb_cThread, "abort_on_exception=", rb_thread_s_abort_exc_set, 1);
4531
    rb_define_const(rb_cThread, "RELY_ON_GVL", Qtrue);
4531 4532
#if THREAD_DEBUG < 0
4532 4533
    rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0);
4533 4534
    rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1);
4534
- 
lib/thread.rb
56 56
  # Creates a new ConditionVariable
57 57
  #
58 58
  def initialize
59
    @waiters = []
59
    @waiters = {}
60
    @waiters.compare_by_identity
60 61
    @waiters_mutex = Mutex.new
61 62
  end
62 63

  
......
68 69
  #
69 70
  def wait(mutex, timeout=nil)
70 71
    # Rely on GVL for sychronizing @waiters.push
71
    @waiters.push(Thread.current)
72
    @waiters[Thread.current] = true
72 73
    mutex.sleep timeout do
73
      # We could not rely on GVL cause compare is called
74
      @waiters_mutex.synchronize do
75
        @waiters.delete(Thread.current)
76
      end
74
      # We could rely on GVL cause hash were set to compare_by_identity mode
75
      @waiters.delete(Thread.current)
77 76
    end
78 77
    self
79 78
  end if Thread::RELY_ON_GVL
80 79

  
81 80
  def wait(mutex, timeout=nil) # :nodoc: 
82 81
    @waiters_mutex.synchronize do
83
      @waiters.push(Thread.current)
82
      @waiters[Thread.current] = true
84 83
    end
85 84
    mutex.sleep timeout do
86 85
      @waiters_mutex.synchronize do
......
95 94
  #
96 95
  def signal
97 96
    begin
98
      t = @waiters.shift
97
      t, _ = @waiters.shift
99 98
      t.run if t
100 99
    rescue ThreadError
101 100
      retry
......
105 104

  
106 105
  def signal # :nodoc:
107 106
    begin
108
      t = @waiters_mutex.synchronize { @waiters.shift }
107
      t, _ = @waiters_mutex.synchronize { @waiters.shift }
109 108
      t.run if t
110 109
    rescue ThreadError
111 110
      retry
......
120 119
    # TODO: incomplete
121 120
    waiters0 = nil
122 121
    @waiters_mutex.synchronize do
123
      waiters0 = @waiters.dup
122
      waiters0 = @waiters.keys
124 123
      @waiters.clear
125 124
    end
126 125
    for t in waiters0
......
166 165
  #
167 166
  def initialize
168 167
    @que = []
169
    @waiting = []
168
    @waiting = {}
169
    @waiting.compare_by_identity
170 170
    @que.taint          # enable tainted communication
171 171
    @waiting.taint
172 172
    self.taint
173 173
    @mutex = Mutex.new
174 174
  end
175 175

  
176
  def push_no_sync(obj) # :nodoc:
177
    @que.push obj
178
    begin
179
      t, _ = @waiting.shift
180
      t.wakeup if t
181
    rescue ThreadError
182
      retry
183
    end
184
  end
185
  private :push_no_sync
176 186
  #
177 187
  # Pushes +obj+ to the queue.
178 188
  #
179 189
  def push(obj)
180
    @mutex.synchronize{
181
      @que.push obj
182
      begin
183
        t = @waiting.shift
184
        t.wakeup if t
185
      rescue ThreadError
186
        retry
187
      end
188
    }
190
    @mutex.synchronize{ push_no_sync(obj) }
189 191
  end
190 192

  
191 193
  #
......
209 211
        while true
210 212
          if @que.empty?
211 213
            raise ThreadError, "queue empty" if non_block
212
            # @waiting.include? check is necessary for avoiding a race against
213
            # Thread.wakeup [Bug 5195]
214
            @waiting.push Thread.current unless @waiting.include?(Thread.current)
214
            @waiting[Thread.current] = true
215 215
            @mutex.sleep
216 216
          else
217 217
            return @que.shift
......
280 280
  def initialize(max)
281 281
    raise ArgumentError, "queue size must be positive" unless max > 0
282 282
    @max = max
283
    @queue_wait = []
283
    @queue_wait = {}
284
    @queue_wait.compare_by_identity
284 285
    @queue_wait.taint           # enable tainted comunication
285 286
    super()
286 287
  end
......
309 310
    if diff
310 311
      diff.times do
311 312
        begin
312
          t = @queue_wait.shift
313
          t, _ = @queue_wait.shift
313 314
          t.run if t
314 315
        rescue ThreadError
315 316
          retry
......
328 329
      begin
329 330
        while true
330 331
          break if @que.length < @max
331
          @queue_wait.push Thread.current unless @queue_wait.include?(Thread.current)
332
          @queue_wait[Thread.current] = true
332 333
          @mutex.sleep
333 334
        end
334 335
      ensure
335 336
        @queue_wait.delete(Thread.current)
336 337
      end
337

  
338
      @que.push obj
339
      begin
340
        t = @waiting.shift
341
        t.wakeup if t
342
      rescue ThreadError
343
        retry
344
      end
338
      push_no_sync obj
345 339
    }
346 340
  end
347 341

  
......
363 357
    @mutex.synchronize {
364 358
      if @que.length < @max
365 359
        begin
366
          t = @queue_wait.shift
360
          t, _ = @queue_wait.shift
367 361
          t.wakeup if t
368 362
        rescue ThreadError
369 363
          retry
370
- 
lib/thread.rb
200 200
  #
201 201
  alias enq push
202 202

  
203
  def pop_no_sync(non_block) # :nodoc:
204
    if non_block
205
      raise ThreadError, "queue empty" if @que.empty?
206
    else
207
      while @que.empty?
208
        @waiting[Thread.current] = true
209
        @mutex.sleep
210
      end
211
    end
212
    @que.shift
213
  ensure
214
    @waiting.delete(Thread.current)
215
  end
216
  private :pop_no_sync
203 217
  #
204 218
  # Retrieves data from the queue.  If the queue is empty, the calling thread is
205 219
  # suspended until data is pushed onto the queue.  If +non_block+ is true, the
206 220
  # thread isn't suspended, and an exception is raised.
207 221
  #
208 222
  def pop(non_block=false)
209
    @mutex.synchronize{
210
      begin
211
        while true
212
          if @que.empty?
213
            raise ThreadError, "queue empty" if non_block
214
            @waiting[Thread.current] = true
215
            @mutex.sleep
216
          else
217
            return @que.shift
218
          end
219
        end
220
      ensure
221
        @waiting.delete(Thread.current)
222
      end
223
    }
223
    @mutex.synchronize{ pop_no_sync(non_block) }
224 224
  end
225 225

  
226 226
  #
......
293 293
    @max
294 294
  end
295 295

  
296
  def wakeup_queue_waiter # :nodoc:
297
    t, _ = @queue_wait.shift
298
    t.wakeup if t
299
  rescue ThreadError
300
    retry
301
  end
302
  private :wakeup_queue_waiter
303

  
296 304
  #
297 305
  # Sets the maximum size of the queue.
298 306
  #
299 307
  def max=(max)
300 308
    raise ArgumentError, "queue size must be positive" unless max > 0
301
    diff = nil
302 309
    @mutex.synchronize {
303
      if max <= @max
304
        @max = max
305
      else
306
        diff = max - @max
307
        @max = max
310
      diff = max - @max
311
      @max = max
312
      if diff > 0
313
        diff.times { wakeup_queue_waiter }
308 314
      end
309 315
    }
310
    if diff
311
      diff.times do
312
        begin
313
          t, _ = @queue_wait.shift
314
          t.run if t
315
        rescue ThreadError
316
          retry
317
        end
318
      end
319
    end
320 316
    max
321 317
  end
322 318

  
......
327 323
  def push(obj)
328 324
    @mutex.synchronize{
329 325
      begin
330
        while true
331
          break if @que.length < @max
326
        while @que.length >= @max
332 327
          @queue_wait[Thread.current] = true
333 328
          @mutex.sleep
334 329
        end
......
352 347
  #
353 348
  # Retrieves data from the queue and runs a waiting thread, if any.
354 349
  #
355
  def pop(*args)
356
    retval = super
350
  def pop(non_block=false)
357 351
    @mutex.synchronize {
358
      if @que.length < @max
359
        begin
360
          t, _ = @queue_wait.shift
361
          t.wakeup if t
362
        rescue ThreadError
363
          retry
364
        end
365
      end
352
      retval = pop_no_sync(non_block)
353
      wakeup_queue_waiter if @que.length < @max
354
      retval
366 355
    }
367
    retval
368 356
  end
369 357

  
370 358
  #
371
-