patch3.diff

Masaki Matsushita, 09/05/2013 08:52 PM

Download (23 KB)

View differences:

common.mk
123 123
		--make-flags="$(MAKEFLAGS)"
124 124
EXTMK_ARGS    =	$(SCRIPT_ARGS) --extension $(EXTS) --extstatic $(EXTSTATIC) \
125 125
		--make-flags="V=$(V) MINIRUBY='$(MINIRUBY)'" --
126
INSTRUBY      =	$(SUDO) $(MINIRUBY) $(srcdir)/tool/rbinstall.rb
126
INSTRUBY      =	$(SUDO) $(RUNRUBY) -r./$(arch)-fake $(srcdir)/tool/rbinstall.rb
127 127
INSTRUBY_ARGS =	$(SCRIPT_ARGS) \
128 128
		--data-mode=$(INSTALL_DATA_MODE) \
129 129
		--prog-mode=$(INSTALL_PROG_MODE) \
......
449 449

  
450 450
CLEAR_INSTALLED_LIST = clear-installed-list
451 451

  
452
install-prereq: $(CLEAR_INSTALLED_LIST) PHONY
452
install-prereq: $(CLEAR_INSTALLED_LIST) yes-fake PHONY
453 453

  
454 454
clear-installed-list: PHONY
455 455
	@> $(INSTALLED_LIST) set MAKE="$(MAKE)"
ext/thread/extconf.rb
1
require 'mkmf'
2

  
3
create_makefile('thread')
ext/thread/thread.c
1
#include <ruby.h>
2

  
3
enum {
4
    CONDVAR_WAITERS = 0
5
};
6

  
7
enum {
8
    QUEUE_QUE       = 0,
9
    QUEUE_WAITERS   = 1,
10
    SZQUEUE_WAITERS = 2,
11
    SZQUEUE_MAX     = 3
12
};
13

  
14
#define GET_CONDVAR_WAITERS(cv) RSTRUCT_GET((cv), CONDVAR_WAITERS)
15

  
16
#define GET_QUEUE_QUE(q)        RSTRUCT_GET((q), QUEUE_QUE)
17
#define GET_QUEUE_WAITERS(q)    RSTRUCT_GET((q), QUEUE_WAITERS)
18
#define GET_SZQUEUE_WAITERS(q)  RSTRUCT_GET((q), SZQUEUE_WAITERS)
19
#define GET_SZQUEUE_MAX(q)      RSTRUCT_GET((q), SZQUEUE_MAX)
20
#define GET_SZQUEUE_ULONGMAX(q) NUM2ULONG(GET_SZQUEUE_MAX(q))
21

  
22
static VALUE
23
ary_buf_new(void)
24
{
25
    return rb_ary_tmp_new(1);
26
}
27

  
28
static void
29
wakeup_first_thread(VALUE list)
30
{
31
    VALUE thread;
32

  
33
    while (!NIL_P(thread = rb_ary_shift(list))) {
34
	if (RTEST(rb_thread_wakeup_alive(thread))) break;
35
    }
36
}
37

  
38
static void
39
wakeup_all_threads(VALUE list)
40
{
41
    VALUE thread;
42
    long i;
43

  
44
    for (i=0; i<RARRAY_LEN(list); i++) {
45
	thread = RARRAY_AREF(list, i);
46
	rb_thread_wakeup_alive(thread);
47
    }
48
    rb_ary_clear(list);
49
}
50

  
51
/*
52
 *  Document-class: ConditionVariable
53
 *
54
 *  ConditionVariable objects augment class Mutex. Using condition variables,
55
 *  it is possible to suspend while in the middle of a critical section until a
56
 *  resource becomes available.
57
 *
58
 *  Example:
59
 *
60
 *    require 'thread'
61
 *
62
 *    mutex = Mutex.new
63
 *    resource = ConditionVariable.new
64
 *
65
 *    a = Thread.new {
66
 *	 mutex.synchronize {
67
 *	   # Thread 'a' now needs the resource
68
 *	   resource.wait(mutex)
69
 *	   # 'a' can now have the resource
70
 *	 }
71
 *    }
72
 *
73
 *    b = Thread.new {
74
 *	 mutex.synchronize {
75
 *	   # Thread 'b' has finished using the resource
76
 *	   resource.signal
77
 *	 }
78
 *    }
79
 */
80

  
81
/*
82
 * Document-method: new
83
 * call-seq: new
84
 *
85
 * Creates a new condvar.
86
 */
87

  
88
static VALUE
89
rb_condvar_initialize(VALUE self)
90
{
91
    RSTRUCT_SET(self, CONDVAR_WAITERS, ary_buf_new());
92
    return self;
93
}
94

  
95
struct sleep_call {
96
    VALUE mutex;
97
    VALUE timeout;
98
};
99

  
100
static ID id_sleep;
101

  
102
static VALUE
103
do_sleep(VALUE args)
104
{
105
    struct sleep_call *p = (struct sleep_call *)args;
106
    return rb_funcall2(p->mutex, id_sleep, 1, &p->timeout);
107
}
108

  
109
static VALUE
110
delete_current_thread(VALUE ary)
111
{
112
    return rb_ary_delete(ary, rb_thread_current());
113
}
114

  
115
/*
116
 * Document-method: wait
117
 * call-seq: wait(mutex, timeout=nil)
118
 *
119
 * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
120
 *
121
 * If +timeout+ is given, this method returns after +timeout+ seconds passed,
122
 * even if no other thread doesn't signal.
123
 */
124

  
125
static VALUE
126
rb_condvar_wait(int argc, VALUE *argv, VALUE self)
127
{
128
    VALUE waiters = GET_CONDVAR_WAITERS(self);
129
    VALUE mutex, timeout;
130
    struct sleep_call args;
131

  
132
    rb_scan_args(argc, argv, "11", &mutex, &timeout);
133

  
134
    args.mutex   = mutex;
135
    args.timeout = timeout;
136
    rb_ary_push(waiters, rb_thread_current());
137
    rb_ensure(do_sleep, (VALUE)&args, delete_current_thread, waiters);
138

  
139
    return self;
140
}
141

  
142
/*
143
 * Document-method: signal
144
 * call-seq: signal
145
 *
146
 * Wakes up the first thread in line waiting for this lock.
147
 */
148

  
149
static VALUE
150
rb_condvar_signal(VALUE self)
151
{
152
    wakeup_first_thread(GET_CONDVAR_WAITERS(self));
153
    return self;
154
}
155

  
156
/*
157
 * Document-method: broadcast
158
 * call-seq: broadcast
159
 *
160
 * Wakes up all threads waiting for this lock.
161
 */
162

  
163
static VALUE
164
rb_condvar_broadcast(VALUE self)
165
{
166
    wakeup_all_threads(GET_CONDVAR_WAITERS(self));
167
    return self;
168
}
169

  
170
/*
171
 *  Document-class: Queue
172
 *
173
 *  This class provides a way to synchronize communication between threads.
174
 *
175
 *  Example:
176
 *
177
 *    require 'thread'
178
 *    queue = Queue.new
179
 *
180
 *  producer = Thread.new do
181
 *    5.times do |i|
182
 *	 sleep rand(i) # simulate expense
183
 *	 queue << i
184
 *	 puts "#{i} produced"
185
 *    end
186
 *  end
187
 *
188
 *  consumer = Thread.new do
189
 *    5.times do |i|
190
 *	 value = queue.pop
191
 *	 sleep rand(i/2) # simulate expense
192
 *	 puts "consumed #{value}"
193
 *    end
194
 *  end
195
 *
196
 */
197

  
198
/*
199
 * Document-method: new
200
 * call-seq: new
201
 *
202
 * Creates a new queue.
203
 */
204

  
205
static VALUE
206
rb_queue_initialize(VALUE self)
207
{
208
    RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
209
    RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
210
    return self;
211
}
212

  
213
static VALUE
214
queue_do_push(VALUE self, VALUE obj)
215
{
216
    rb_ary_push(GET_QUEUE_QUE(self), obj);
217
    wakeup_first_thread(GET_QUEUE_WAITERS(self));
218
    return self;
219
}
220

  
221
/*
222
 * Document-method: push
223
 * call-seq: push(obj)
224
 *
225
 * Pushes +obj+ to the queue.
226
 */
227

  
228
static VALUE
229
rb_queue_push(VALUE self, VALUE obj)
230
{
231
    return queue_do_push(self, obj);
232
}
233

  
234
static unsigned long
235
queue_length(VALUE self)
236
{
237
    return RARRAY_LEN(GET_QUEUE_QUE(self));
238
}
239

  
240
static unsigned long
241
queue_num_waiting(VALUE self)
242
{
243
    return RARRAY_LEN(GET_QUEUE_WAITERS(self));
244
}
245

  
246
struct waiting_delete {
247
    VALUE waiting;
248
    VALUE th;
249
};
250

  
251
static VALUE
252
queue_delete_from_waiting(struct waiting_delete *p)
253
{
254
    rb_ary_delete(p->waiting, p->th);
255
    return Qnil;
256
}
257

  
258
static VALUE
259
queue_do_pop(VALUE self, VALUE should_block)
260
{
261
    struct waiting_delete args;
262
    args.waiting = GET_QUEUE_WAITERS(self);
263
    args.th	 = rb_thread_current();
264

  
265
    while (queue_length(self) == 0) {
266
	if (!(int)should_block) {
267
	    rb_raise(rb_eThreadError, "queue empty");
268
	}
269
	rb_ary_push(args.waiting, args.th);
270
	rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
271
    }
272

  
273
    return rb_ary_shift(GET_QUEUE_QUE(self));
274
}
275

  
276
static VALUE
277
queue_pop_should_block(int argc, VALUE *argv)
278
{
279
    VALUE should_block = Qtrue;
280
    switch (argc) {
281
      case 0:
282
	break;
283
      case 1:
284
	should_block = RTEST(argv[0]) ? Qtrue : Qfalse;
285
	break;
286
      default:
287
	rb_raise(rb_eArgError, "wrong number of arguments (%d for 1)", argc);
288
    }
289
    return should_block;
290
}
291

  
292
/*
293
 * Document-method: pop
294
 * call_seq: pop(non_block=false)
295
 *
296
 * Retrieves data from the queue.	If the queue is empty, the calling thread is
297
 * suspended until data is pushed onto the queue.  If +non_block+ is true, the
298
 * thread isn't suspended, and an exception is raised.
299
 */
300

  
301
static VALUE
302
rb_queue_pop(int argc, VALUE *argv, VALUE self)
303
{
304
    VALUE should_block = queue_pop_should_block(argc, argv);
305
    return queue_do_pop(self, should_block);
306
}
307

  
308
/*
309
 * Document-method: empty?
310
 * call-seq: empty?
311
 *
312
 * Returns +true+ if the queue is empty.
313
 */
314

  
315
static VALUE
316
rb_queue_empty_p(VALUE self)
317
{
318
    return queue_length(self) == 0 ? Qtrue : Qfalse;
319
}
320

  
321
/*
322
 * Document-method: clear
323
 * call-seq: clear
324
 *
325
 * Removes all objects from the queue.
326
 */
327

  
328
static VALUE
329
rb_queue_clear(VALUE self)
330
{
331
    rb_ary_clear(GET_QUEUE_QUE(self));
332
    return self;
333
}
334

  
335
/*
336
 * Document-method: length
337
 * call-seq: length
338
 *
339
 * Returns the length of the queue.
340
 */
341

  
342
static VALUE
343
rb_queue_length(VALUE self)
344
{
345
    unsigned long len = queue_length(self);
346
    return ULONG2NUM(len);
347
}
348

  
349
/*
350
 * Document-method: num_waiting
351
 * call-seq: num_waiting
352
 *
353
 * Returns the number of threads waiting on the queue.
354
 */
355

  
356
static VALUE
357
rb_queue_num_waiting(VALUE self)
358
{
359
    unsigned long len = queue_num_waiting(self);
360
    return ULONG2NUM(len);
361
}
362

  
363
/*
364
 *  Document-class: SizedQueue
365
 *
366
 * This class represents queues of specified size capacity.  The push operation
367
 * may be blocked if the capacity is full.
368
 *
369
 * See Queue for an example of how a SizedQueue works.
370
 */
371

  
372
/*
373
 * Document-method: new
374
 * call-seq: new(max)
375
 *
376
 * Creates a fixed-length queue with a maximum size of +max+.
377
 */
378

  
379
static VALUE
380
rb_szqueue_initialize(VALUE self, VALUE vmax)
381
{
382
    long max;
383

  
384
    max = NUM2LONG(vmax);
385
    if (max <= 0) {
386
	rb_raise(rb_eArgError, "queue size must be positive");
387
    }
388

  
389
    RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
390
    RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
391
    RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new());
392
    RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
393

  
394
    return self;
395
}
396

  
397
/*
398
 * Document-method: max
399
 * call-seq: max
400
 *
401
 * Returns the maximum size of the queue.
402
 */
403

  
404
static VALUE
405
rb_szqueue_max_get(VALUE self)
406
{
407
    return GET_SZQUEUE_MAX(self);
408
}
409

  
410
/*
411
 * Document-method: max=
412
 * call-seq: max=(n)
413
 *
414
 * Sets the maximum size of the queue.
415
 */
416

  
417
static VALUE
418
rb_szqueue_max_set(VALUE self, VALUE vmax)
419
{
420
    long max = NUM2LONG(vmax), diff = 0;
421
    VALUE t;
422

  
423
    if (max <= 0) {
424
	rb_raise(rb_eArgError, "queue size must be positive");
425
    }
426
    if ((unsigned long)max > GET_SZQUEUE_ULONGMAX(self)) {
427
	diff = max - GET_SZQUEUE_ULONGMAX(self);
428
    }
429
    RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
430
    while (diff > 0 && !NIL_P(t = rb_ary_shift(GET_QUEUE_QUE(self)))) {
431
	rb_thread_wakeup_alive(t);
432
    }
433
    return vmax;
434
}
435

  
436
/*
437
 * Document-method: push
438
 * call-seq: push(obj)
439
 *
440
 * Pushes +obj+ to the queue.  If there is no space left in the queue, waits
441
 * until space becomes available.
442
 */
443

  
444
static VALUE
445
rb_szqueue_push(VALUE self, VALUE obj)
446
{
447
    struct waiting_delete args;
448
    args.waiting = GET_QUEUE_WAITERS(self);
449
    args.th      = rb_thread_current();
450

  
451
    while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) {
452
	rb_ary_push(args.waiting, args.th);
453
	rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
454
    }
455
    return queue_do_push(self, obj);
456
}
457

  
458
static VALUE
459
szqueue_do_pop(VALUE self, VALUE should_block)
460
{
461
    VALUE retval = queue_do_pop(self, should_block);
462

  
463
    if (queue_length(self) < GET_SZQUEUE_ULONGMAX(self)) {
464
	wakeup_first_thread(GET_SZQUEUE_WAITERS(self));
465
    }
466

  
467
    return retval;
468
}
469

  
470
/*
471
 * Document-method: pop
472
 * call_seq: pop(non_block=false)
473
 *
474
 * Returns the number of threads waiting on the queue.
475
 */
476

  
477
static VALUE
478
rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
479
{
480
    VALUE should_block = queue_pop_should_block(argc, argv);
481
    return szqueue_do_pop(self, should_block);
482
}
483

  
484
/*
485
 * Document-method: pop
486
 * call_seq: pop(non_block=false)
487
 *
488
 * Returns the number of threads waiting on the queue.
489
 */
490

  
491
static VALUE
492
rb_szqueue_num_waiting(VALUE self)
493
{
494
    long len = queue_num_waiting(self);
495
    len += RARRAY_LEN(GET_SZQUEUE_WAITERS(self));
496
    return ULONG2NUM(len);
497
}
498

  
499
#ifndef UNDER_THREAD
500
#define UNDER_THREAD 1
501
#endif
502

  
503
void
504
Init_thread(void)
505
{
506
#if UNDER_THREAD
507
#define ALIAS_GLOBCAL_CONST(name) do {	              \
508
	ID id = rb_intern_const(#name);	              \
509
	if (!rb_const_defined_at(rb_cObject, id)) {   \
510
	    rb_const_set(rb_cObject, id, rb_c##name); \
511
	}                                             \
512
    } while (0)
513
#else
514
#define ALIAS_GLOBCAL_CONST(name) do { /* nothing */ } while (0)
515
#endif
516

  
517
    VALUE rb_cConditionVariable = rb_struct_define_without_accessor_under(
518
	UNDER_THREAD ? rb_cThread : 0,
519
	"ConditionVariable", rb_cObject, rb_struct_alloc_noinit,
520
	"waiters", NULL);
521
    VALUE rb_cQueue = rb_struct_define_without_accessor_under(
522
	UNDER_THREAD ? rb_cThread : 0,
523
	"Queue", rb_cObject, rb_struct_alloc_noinit,
524
	"que", "waiters", NULL);
525
    VALUE rb_cSizedQueue = rb_struct_define_without_accessor_under(
526
	UNDER_THREAD ? rb_cThread : 0,
527
	"SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
528
	"que", "waiters", "queue_waiters", "size", NULL);
529

  
530
    id_sleep = rb_intern("sleep");
531

  
532
    rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
533
    rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, -1);
534
    rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);
535
    rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);
536

  
537
    rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0);
538
    rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
539
    rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
540
    rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
541
    rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
542
    rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
543
    rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
544

  
545
    rb_alias(rb_cQueue, rb_intern("enq"), rb_intern("push"));
546
    rb_alias(rb_cQueue, rb_intern("<<"), rb_intern("push"));
547
    rb_alias(rb_cQueue, rb_intern("deq"), rb_intern("pop"));
548
    rb_alias(rb_cQueue, rb_intern("shift"), rb_intern("pop"));
549
    rb_alias(rb_cQueue, rb_intern("size"), rb_intern("length"));
550

  
551
    rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
552
    rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
553
    rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
554
    rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, 1);
555
    rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
556
    rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
557
    rb_alias(rb_cSizedQueue, rb_intern("enq"), rb_intern("push"));
558
    rb_alias(rb_cSizedQueue, rb_intern("<<"), rb_intern("push"));
559
    rb_alias(rb_cSizedQueue, rb_intern("deq"), rb_intern("pop"));
560
    rb_alias(rb_cSizedQueue, rb_intern("shift"), rb_intern("pop"));
561

  
562
    rb_provide("thread.rb");
563
    ALIAS_GLOBCAL_CONST(ConditionVariable);
564
    ALIAS_GLOBCAL_CONST(Queue);
565
    ALIAS_GLOBCAL_CONST(SizedQueue);
566
}
include/ruby/intern.h
428 428
DEPRECATED(void rb_thread_polling(void));
429 429
void rb_thread_sleep(int);
430 430
void rb_thread_sleep_forever(void);
431
void rb_thread_sleep_deadly(void);
431 432
VALUE rb_thread_stop(void);
432 433
VALUE rb_thread_wakeup(VALUE);
433 434
VALUE rb_thread_wakeup_alive(VALUE);
lib/thread.rb
1
#
2
#               thread.rb - thread support classes
3
#                       by Yukihiro Matsumoto <matz@netlab.co.jp>
4
#
5
# Copyright (C) 2001  Yukihiro Matsumoto
6
# Copyright (C) 2000  Network Applied Communication Laboratory, Inc.
7
# Copyright (C) 2000  Information-technology Promotion Agency, Japan
8
#
9

  
10
unless defined? Thread
11
  raise "Thread not available for this ruby interpreter"
12
end
13

  
14
unless defined? ThreadError
15
  class ThreadError < StandardError
16
  end
17
end
18

  
19
if $DEBUG
20
  Thread.abort_on_exception = true
21
end
22

  
23
#
24
# ConditionVariable objects augment class Mutex. Using condition variables,
25
# it is possible to suspend while in the middle of a critical section until a
26
# resource becomes available.
27
#
28
# Example:
29
#
30
#   require 'thread'
31
#
32
#   mutex = Mutex.new
33
#   resource = ConditionVariable.new
34
#
35
#   a = Thread.new {
36
#     mutex.synchronize {
37
#       # Thread 'a' now needs the resource
38
#       resource.wait(mutex)
39
#       # 'a' can now have the resource
40
#     }
41
#   }
42
#
43
#   b = Thread.new {
44
#     mutex.synchronize {
45
#       # Thread 'b' has finished using the resource
46
#       resource.signal
47
#     }
48
#   }
49
#
50
class ConditionVariable
51
  #
52
  # Creates a new ConditionVariable
53
  #
54
  def initialize
55
    @waiters = {}
56
    @waiters_mutex = Mutex.new
57
  end
58

  
59
  #
60
  # Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
61
  #
62
  # If +timeout+ is given, this method returns after +timeout+ seconds passed,
63
  # even if no other thread has signaled.
64
  #
65
  def wait(mutex, timeout=nil)
66
    Thread.handle_interrupt(StandardError => :never) do
67
      begin
68
        Thread.handle_interrupt(StandardError => :on_blocking) do
69
          @waiters_mutex.synchronize do
70
            @waiters[Thread.current] = true
71
          end
72
          mutex.sleep timeout
73
        end
74
      ensure
75
        @waiters_mutex.synchronize do
76
          @waiters.delete(Thread.current)
77
        end
78
      end
79
    end
80
    self
81
  end
82

  
83
  #
84
  # Wakes up the first thread in line waiting for this lock.
85
  #
86
  def signal
87
    Thread.handle_interrupt(StandardError => :on_blocking) do
88
      begin
89
        t, _ = @waiters_mutex.synchronize { @waiters.shift }
90
        t.run if t
91
      rescue ThreadError
92
        retry # t was already dead?
93
      end
94
    end
95
    self
96
  end
97

  
98
  #
99
  # Wakes up all threads waiting for this lock.
100
  #
101
  def broadcast
102
    Thread.handle_interrupt(StandardError => :on_blocking) do
103
      threads = nil
104
      @waiters_mutex.synchronize do
105
        threads = @waiters.keys
106
        @waiters.clear
107
      end
108
      for t in threads
109
        begin
110
          t.run
111
        rescue ThreadError
112
        end
113
      end
114
    end
115
    self
116
  end
117
end
118

  
119
#
120
# This class provides a way to synchronize communication between threads.
121
#
122
# Example:
123
#
124
#   require 'thread'
125
#
126
#   queue = Queue.new
127
#
128
#   producer = Thread.new do
129
#     5.times do |i|
130
#       sleep rand(i) # simulate expense
131
#       queue << i
132
#       puts "#{i} produced"
133
#     end
134
#   end
135
#
136
#   consumer = Thread.new do
137
#     5.times do |i|
138
#       value = queue.pop
139
#       sleep rand(i/2) # simulate expense
140
#       puts "consumed #{value}"
141
#     end
142
#   end
143
#
144
#   consumer.join
145
#
146
class Queue
147
  #
148
  # Creates a new queue.
149
  #
150
  def initialize
151
    @que = []
152
    @que.taint          # enable tainted communication
153
    @num_waiting = 0
154
    self.taint
155
    @mutex = Mutex.new
156
    @cond = ConditionVariable.new
157
  end
158

  
159
  #
160
  # Pushes +obj+ to the queue.
161
  #
162
  def push(obj)
163
    Thread.handle_interrupt(StandardError => :on_blocking) do
164
      @mutex.synchronize do
165
        @que.push obj
166
        @cond.signal
167
      end
168
      self
169
    end
170
  end
171

  
172
  #
173
  # Alias of push
174
  #
175
  alias << push
176

  
177
  #
178
  # Alias of push
179
  #
180
  alias enq push
181

  
182
  #
183
  # Retrieves data from the queue.  If the queue is empty, the calling thread is
184
  # suspended until data is pushed onto the queue.  If +non_block+ is true, the
185
  # thread isn't suspended, and an exception is raised.
186
  #
187
  def pop(non_block=false)
188
    Thread.handle_interrupt(StandardError => :on_blocking) do
189
      @mutex.synchronize do
190
        while true
191
          if @que.empty?
192
            if non_block
193
              raise ThreadError, "queue empty"
194
            else
195
              begin
196
                @num_waiting += 1
197
                @cond.wait @mutex
198
              ensure
199
                @num_waiting -= 1
200
              end
201
            end
202
          else
203
            return @que.shift
204
          end
205
        end
206
      end
207
    end
208
  end
209

  
210
  #
211
  # Alias of pop
212
  #
213
  alias shift pop
214

  
215
  #
216
  # Alias of pop
217
  #
218
  alias deq pop
219

  
220
  #
221
  # Returns +true+ if the queue is empty.
222
  #
223
  def empty?
224
    @que.empty?
225
  end
226

  
227
  #
228
  # Removes all objects from the queue.
229
  #
230
  def clear
231
    @que.clear
232
    self
233
  end
234

  
235
  #
236
  # Returns the length of the queue.
237
  #
238
  def length
239
    @que.length
240
  end
241

  
242
  #
243
  # Alias of length.
244
  #
245
  alias size length
246

  
247
  #
248
  # Returns the number of threads waiting on the queue.
249
  #
250
  def num_waiting
251
    @num_waiting
252
  end
253
end
254

  
255
#
256
# This class represents queues of specified size capacity.  The push operation
257
# may be blocked if the capacity is full.
258
#
259
# See Queue for an example of how a SizedQueue works.
260
#
261
class SizedQueue < Queue
262
  #
263
  # Creates a fixed-length queue with a maximum size of +max+.
264
  #
265
  def initialize(max)
266
    raise ArgumentError, "queue size must be positive" unless max > 0
267
    @max = max
268
    @enque_cond = ConditionVariable.new
269
    @num_enqueue_waiting = 0
270
    super()
271
  end
272

  
273
  #
274
  # Returns the maximum size of the queue.
275
  #
276
  def max
277
    @max
278
  end
279

  
280
  #
281
  # Sets the maximum size of the queue.
282
  #
283
  def max=(max)
284
    raise ArgumentError, "queue size must be positive" unless max > 0
285

  
286
    @mutex.synchronize do
287
      if max <= @max
288
        @max = max
289
      else
290
        diff = max - @max
291
        @max = max
292
        diff.times do
293
          @enque_cond.signal
294
        end
295
      end
296
    end
297
    max
298
  end
299

  
300
  #
301
  # Pushes +obj+ to the queue.  If there is no space left in the queue, waits
302
  # until space becomes available.
303
  #
304
  def push(obj)
305
    Thread.handle_interrupt(RuntimeError => :on_blocking) do
306
      @mutex.synchronize do
307
        while true
308
          break if @que.length < @max
309
          @num_enqueue_waiting += 1
310
          begin
311
            @enque_cond.wait @mutex
312
          ensure
313
            @num_enqueue_waiting -= 1
314
          end
315
        end
316

  
317
        @que.push obj
318
        @cond.signal
319
      end
320
      self
321
    end
322
  end
323

  
324
  #
325
  # Alias of push
326
  #
327
  alias << push
328

  
329
  #
330
  # Alias of push
331
  #
332
  alias enq push
333

  
334
  #
335
  # Retrieves data from the queue and runs a waiting thread, if any.
336
  #
337
  def pop(*args)
338
    retval = super
339
    @mutex.synchronize do
340
      if @que.length < @max
341
        @enque_cond.signal
342
      end
343
    end
344
    retval
345
  end
346

  
347
  #
348
  # Alias of pop
349
  #
350
  alias shift pop
351

  
352
  #
353
  # Alias of pop
354
  #
355
  alias deq pop
356

  
357
  #
358
  # Returns the number of threads waiting on the queue.
359
  #
360
  def num_waiting
361
    @num_waiting + @num_enqueue_waiting
362
  end
363
end
364

  
365
# Documentation comments:
366
#  - How do you make RDoc inherit documentation from superclass?
test/thread/test_queue.rb
1 1
require 'test/unit'
2 2
require 'thread'
3 3
require 'tmpdir'
4
require 'timeout'
4 5
require_relative '../ruby/envutil'
5 6

  
6 7
class TestQueue < Test::Unit::TestCase
......
133 134
    assert_same q, retval
134 135
  end
135 136

  
137
  def test_queue_thread_raise
138
    q = Queue.new
139
    th1 = Thread.new do
140
      begin
141
        q.pop
142
      rescue RuntimeError
143
        sleep
144
      end
145
    end
146
    th2 = Thread.new do
147
      sleep 0.1
148
      q.pop
149
    end
150
    sleep 0.1
151
    th1.raise
152
    sleep 0.1
153
    q << :s
154
    assert_nothing_raised(TimeoutError) do
155
      timeout(1) { th2.join }
156
    end
157
  end
136 158
end
thread.c
1058 1058
    sleep_forever(GET_THREAD(), 0, 1);
1059 1059
}
1060 1060

  
1061
static void
1061
void
1062 1062
rb_thread_sleep_deadly(void)
1063 1063
{
1064 1064
    thread_debug("rb_thread_sleep_deadly\n");