patch.diff

bug fixed final_queue_without_mutex.diff - Masaki Matsushita, 05/17/2013 10:59 PM

Download (24.3 KB)

View differences:

ext/thread/extconf.rb
1
require 'mkmf'
2

  
3
create_makefile('thread')
ext/thread/thread.c
1
#include <ruby.h>
2

  
3
RUBY_EXTERN size_t rb_objspace_data_type_memsize(VALUE);
4
RUBY_EXTERN size_t rb_ary_memsize(VALUE);
5

  
6
static VALUE
7
rb_ary_buf_new(void)
8
{
9
    VALUE ary = rb_ary_tmp_new(1);
10
    OBJ_UNTRUST(ary);
11
    return ary;
12
}
13

  
14
static void
15
wakeup_first_thread(VALUE list)
16
{
17
    VALUE thread;
18

  
19
    while (!NIL_P(thread = rb_ary_shift(list))) {
20
	if (RTEST(rb_thread_wakeup_alive(thread))) break;
21
    }
22
}
23

  
24
static void
25
wakeup_all_threads(VALUE list)
26
{
27
    VALUE thread, list0 = list;
28
    long i;
29

  
30
    list = rb_ary_subseq(list, 0, LONG_MAX);
31
    rb_ary_clear(list0);
32
    for (i = 0; i < RARRAY_LEN(list); ++i) {
33
	thread = RARRAY_PTR(list)[i];
34
	rb_thread_wakeup_alive(thread);
35
    }
36
    RB_GC_GUARD(list);
37
}
38

  
39
/*
40
 *  Document-class: ConditionVariable
41
 *
42
 *  ConditionVariable objects augment class Mutex. Using condition variables,
43
 *  it is possible to suspend while in the middle of a critical section until a
44
 *  resource becomes available.
45
 *
46
 *  Example:
47
 *
48
 *    require 'thread'
49
 *
50
 *    mutex = Mutex.new
51
 *    resource = ConditionVariable.new
52
 *
53
 *    a = Thread.new {
54
 *	 mutex.synchronize {
55
 *	   # Thread 'a' now needs the resource
56
 *	   resource.wait(mutex)
57
 *	   # 'a' can now have the resource
58
 *	 }
59
 *    }
60
 *
61
 *    b = Thread.new {
62
 *	 mutex.synchronize {
63
 *	   # Thread 'b' has finished using the resource
64
 *	   resource.signal
65
 *	 }
66
 *    }
67
 */
68

  
69
typedef struct {
70
    VALUE waiters;
71
} CondVar;
72

  
73
static void
74
condvar_mark(void *ptr)
75
{
76
    CondVar *condvar = ptr;
77
    rb_gc_mark(condvar->waiters);
78
}
79

  
80
#define condvar_free RUBY_TYPED_DEFAULT_FREE
81

  
82
static size_t
83
condvar_memsize(const void *ptr)
84
{
85
    size_t size = 0;
86
    if (ptr) {
87
	const CondVar *condvar = ptr;
88
	size = sizeof(CondVar);
89
	size += rb_ary_memsize(condvar->waiters);
90
    }
91
    return size;
92
}
93

  
94
static const rb_data_type_t condvar_data_type = {
95
    "condvar",
96
    {condvar_mark, condvar_free, condvar_memsize,},
97
};
98

  
99
#define GetCondVarPtr(obj, tobj) \
100
    TypedData_Get_Struct(obj, CondVar, &condvar_data_type, tobj)
101

  
102
static CondVar *
103
get_condvar_ptr(VALUE self)
104
{
105
    CondVar *condvar;
106
    GetCondVarPtr(self, condvar);
107
    if (!condvar->waiters) {
108
	rb_raise(rb_eArgError, "uninitialized CondionVariable");
109
    }
110
    return condvar;
111
}
112

  
113
static VALUE
114
condvar_alloc(VALUE klass)
115
{
116
    CondVar *condvar;
117
    return TypedData_Make_Struct(klass, CondVar, &condvar_data_type, condvar);
118
}
119

  
120
static void
121
condvar_initialize(CondVar *condvar)
122
{
123
    condvar->waiters = rb_ary_buf_new();
124
}
125

  
126
/*
127
 * Document-method: new
128
 * call-seq: new
129
 *
130
 * Creates a new condvar.
131
 */
132

  
133
static VALUE
134
rb_condvar_initialize(VALUE self)
135
{
136
    CondVar *condvar;
137
    GetCondVarPtr(self, condvar);
138

  
139
    condvar_initialize(condvar);
140

  
141
    return self;
142
}
143

  
144
struct sleep_call {
145
    int argc;
146
    VALUE *argv;
147
};
148

  
149
static VALUE
150
do_sleep(VALUE args)
151
{
152
    struct sleep_call *p = (struct sleep_call *)args;
153
    return rb_funcall(p->argv[0], rb_intern("sleep"), p->argc-1, p->argv[1]);
154
}
155

  
156
static VALUE
157
delete_current_thread(VALUE ary)
158
{
159
    return rb_ary_delete(ary, rb_thread_current());
160
}
161

  
162
/*
163
 * Document-method: wait
164
 * call-seq: wait(mutex, timeout=nil)
165
 *
166
 * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
167
 *
168
 * If +timeout+ is given, this method returns after +timeout+ seconds passed,
169
 * even if no other thread doesn't signal.
170
 */
171

  
172
static VALUE
173
rb_condvar_wait(int argc, VALUE *argv, VALUE self)
174
{
175
    VALUE waiters = get_condvar_ptr(self)->waiters;
176
    struct sleep_call args;
177

  
178
    args.argc = argc;
179
    args.argv = argv;
180
    rb_ary_push(waiters, rb_thread_current());
181
    rb_ensure(do_sleep, (VALUE)&args, delete_current_thread, waiters);
182
    return self;
183
}
184

  
185
/*
186
 * Document-method: signal
187
 * call-seq: signal
188
 *
189
 * Wakes up the first thread in line waiting for this lock.
190
 */
191

  
192
static VALUE
193
rb_condvar_signal(VALUE self)
194
{
195
    wakeup_first_thread(get_condvar_ptr(self)->waiters);
196
    return self;
197
}
198

  
199
/*
200
 * Document-method: broadcast
201
 * call-seq: broadcast
202
 *
203
 * Wakes up all threads waiting for this lock.
204
 */
205

  
206
static VALUE
207
rb_condvar_broadcast(VALUE self)
208
{
209
    wakeup_all_threads(get_condvar_ptr(self)->waiters);
210
    return self;
211
}
212

  
213
/*
214
 *  Document-class: Queue
215
 *
216
 *  This class provides a way to synchronize communication between threads.
217
 *
218
 *  Example:
219
 *
220
 *    require 'thread'
221
 *    queue = Queue.new
222
 *
223
 *  producer = Thread.new do
224
 *    5.times do |i|
225
 *	 sleep rand(i) # simulate expense
226
 *	 queue << i
227
 *	 puts "#{i} produced"
228
 *    end
229
 *  end
230
 *
231
 *  consumer = Thread.new do
232
 *    5.times do |i|
233
 *	 value = queue.pop
234
 *	 sleep rand(i/2) # simulate expense
235
 *	 puts "consumed #{value}"
236
 *    end
237
 *  end
238
 *
239
 */
240

  
241
typedef struct {
242
    VALUE que;
243
    VALUE waiting;
244
} Queue;
245

  
246
static void
247
queue_mark(void *ptr)
248
{
249
    Queue *queue = ptr;
250
    rb_gc_mark(queue->que);
251
    rb_gc_mark(queue->waiting);
252
}
253

  
254
#define queue_free RUBY_TYPED_DEFAULT_FREE
255

  
256
static size_t
257
queue_memsize(const void *ptr)
258
{
259
    size_t size = 0;
260
    if (ptr) {
261
	const Queue *queue = ptr;
262
	size = sizeof(Queue);
263
	size += rb_ary_memsize(queue->que);
264
	size += rb_ary_memsize(queue->waiting);
265
    }
266
    return size;
267
}
268

  
269
static const rb_data_type_t queue_data_type = {
270
    "queue",
271
    {queue_mark, queue_free, queue_memsize,},
272
};
273

  
274
#define GetQueuePtr(obj, tobj) \
275
    TypedData_Get_Struct(obj, Queue, &queue_data_type, tobj)
276

  
277
static Queue *
278
get_queue_ptr(VALUE self)
279
{
280
    Queue *queue;
281
    GetQueuePtr(self, queue);
282
    if (!queue->que || !queue->waiting) {
283
	rb_raise(rb_eArgError, "uninitialized Queue");
284
    }
285
    return queue;
286
}
287

  
288
static VALUE
289
queue_alloc(VALUE klass)
290
{
291
    Queue *queue;
292
    return TypedData_Make_Struct(klass, Queue, &queue_data_type, queue);
293
}
294

  
295
static void
296
queue_initialize(Queue *queue)
297
{
298
    queue->que = rb_ary_buf_new();
299
    queue->waiting = rb_ary_buf_new();
300
}
301

  
302
/*
303
 * Document-method: new
304
 * call-seq: new
305
 *
306
 * Creates a new queue.
307
 */
308

  
309
static VALUE
310
rb_queue_initialize(VALUE self)
311
{
312
    Queue *queue;
313
    GetQueuePtr(self, queue);
314

  
315
    queue_initialize(queue);
316

  
317
    return self;
318
}
319

  
320
static VALUE
321
queue_do_push(Queue *queue, VALUE obj)
322
{
323
    rb_ary_push(queue->que, obj);
324
    wakeup_first_thread(queue->waiting);
325
    return Qnil;
326
}
327

  
328
/*
329
 * Document-method: push
330
 * call-seq: push(obj)
331
 *
332
 * Pushes +obj+ to the queue.
333
 */
334

  
335
static VALUE
336
rb_queue_push(VALUE self, VALUE obj)
337
{
338
    queue_do_push(get_queue_ptr(self), obj);
339
    return self;
340
}
341

  
342
struct waiting_delete {
343
    VALUE waiting;
344
    VALUE th;
345
};
346

  
347
static VALUE
348
queue_delete_from_waiting(struct waiting_delete *p)
349
{
350
    rb_ary_delete(p->waiting, p->th);
351
    return Qnil;
352
}
353

  
354
static VALUE
355
queue_do_pop(Queue *queue, VALUE should_block)
356
{
357
    struct waiting_delete args;
358

  
359
    while (RARRAY_LEN(queue->que) == 0) {
360
	if (!(int)should_block) {
361
	    rb_raise(rb_eThreadError, "queue empty");
362
	}
363
	args.waiting = queue->waiting;
364
	args.th	     = rb_thread_current();
365
	rb_ary_push(args.waiting, args.th);
366
	rb_ensure((VALUE (*)())rb_thread_sleep_forever, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
367
    }
368

  
369
    return rb_ary_shift(queue->que);
370
}
371

  
372
static int
373
queue_pop_should_block(int argc, VALUE *argv)
374
{
375
    int should_block = 1;
376
    switch (argc) {
377
      case 0:
378
	break;
379
      case 1:
380
	should_block = !RTEST(argv[0]);
381
	break;
382
      default:
383
	rb_raise(rb_eArgError, "wrong number of arguments (%d for 1)", argc);
384
    }
385
    return should_block;
386
}
387

  
388
/*
389
 * Document-method: pop
390
 * call_seq: pop(non_block=false)
391
 *
392
 * Retrieves data from the queue.	If the queue is empty, the calling thread is
393
 * suspended until data is pushed onto the queue.  If +non_block+ is true, the
394
 * thread isn't suspended, and an exception is raised.
395
 */
396

  
397
static VALUE
398
rb_queue_pop(int argc, VALUE *argv, VALUE self)
399
{
400
    Queue *queue = get_queue_ptr(self);
401
    int should_block = queue_pop_should_block(argc, argv);
402
    return queue_do_pop(queue, (VALUE)should_block);
403
}
404

  
405
static inline unsigned long
406
queue_length(Queue *queue)
407
{
408
    return (unsigned long)RARRAY_LEN(queue->que);
409
}
410

  
411
static inline unsigned long
412
queue_num_waiting(Queue *queue)
413
{
414
    return (unsigned long)RARRAY_LEN(queue->waiting);
415
}
416

  
417
/*
418
 * Document-method: empty?
419
 * call-seq: empty?
420
 *
421
 * Returns +true+ if the queue is empty.
422
 */
423

  
424
static VALUE
425
rb_queue_empty_p(VALUE self)
426
{
427
    return queue_length(get_queue_ptr(self)) == 0 ? Qtrue : Qfalse;
428
}
429

  
430
/*
431
 * Document-method: clear
432
 * call-seq: clear
433
 *
434
 * Removes all objects from the queue.
435
 */
436

  
437
static VALUE
438
rb_queue_clear(VALUE self)
439
{
440
    Queue *queue = get_queue_ptr(self);
441

  
442
    rb_ary_clear(queue->que);
443

  
444
    return self;
445
}
446

  
447
/*
448
 * Document-method: length
449
 * call-seq: length
450
 *
451
 * Returns the length of the queue.
452
 */
453

  
454
static VALUE
455
rb_queue_length(VALUE self)
456
{
457
    unsigned long len = queue_length(get_queue_ptr(self));
458
    return ULONG2NUM(len);
459
}
460

  
461
/*
462
 * Document-method: num_waiting
463
 * call-seq: num_waiting
464
 *
465
 * Returns the number of threads waiting on the queue.
466
 */
467

  
468
static VALUE
469
rb_queue_num_waiting(VALUE self)
470
{
471
    long len = queue_num_waiting(get_queue_ptr(self));
472
    return ULONG2NUM(len);
473
}
474

  
475
/*
476
 *  Document-class: SizedQueue
477
 *
478
 * This class represents queues of specified size capacity.  The push operation
479
 * may be blocked if the capacity is full.
480
 *
481
 * See Queue for an example of how a SizedQueue works.
482
 */
483

  
484
typedef struct  {
485
    Queue queue_;
486
    VALUE queue_wait;
487
    unsigned long max;
488
} SizedQueue;
489

  
490
static void
491
szqueue_mark(void *ptr)
492
{
493
    SizedQueue *szqueue = ptr;
494
    queue_mark(&szqueue->queue_);
495
    rb_gc_mark(szqueue->queue_wait);
496
}
497

  
498
#define szqueue_free queue_free
499

  
500
static size_t
501
szqueue_memsize(const void *ptr)
502
{
503
    size_t size = 0;
504
    if (ptr) {
505
	const SizedQueue *szqueue = ptr;
506
	size = sizeof(SizedQueue) - sizeof(Queue);
507
	size += queue_memsize(&szqueue->queue_);
508
	size += rb_ary_memsize(szqueue->queue_wait);
509
    }
510
    return size;
511
}
512

  
513
static const rb_data_type_t szqueue_data_type = {
514
    "sized_queue",
515
    {szqueue_mark, szqueue_free, szqueue_memsize,},
516
    &queue_data_type,
517
};
518

  
519
#define GetSizedQueuePtr(obj, tobj) \
520
    TypedData_Get_Struct(obj, SizedQueue, &szqueue_data_type, tobj)
521

  
522
static SizedQueue *
523
get_szqueue_ptr(VALUE self)
524
{
525
    SizedQueue *szqueue;
526
    GetSizedQueuePtr(self, szqueue);
527
    if (!szqueue->queue_.que || !szqueue->queue_.waiting || !szqueue->queue_wait) {
528
	rb_raise(rb_eArgError, "uninitialized Queue");
529
    }
530
    return szqueue;
531
}
532

  
533
static VALUE
534
szqueue_alloc(VALUE klass)
535
{
536
    SizedQueue *szqueue;
537
    return TypedData_Make_Struct(klass, SizedQueue, &szqueue_data_type, szqueue);
538
}
539

  
540
/*
541
 * Document-method: new
542
 * call-seq: new(max)
543
 *
544
 * Creates a fixed-length queue with a maximum size of +max+.
545
 */
546

  
547
static VALUE
548
rb_szqueue_initialize(VALUE self, VALUE vmax)
549
{
550
    long max;
551
    SizedQueue *szqueue;
552
    GetSizedQueuePtr(self, szqueue);
553

  
554
    max = NUM2LONG(vmax);
555
    if (max <= 0) {
556
	rb_raise(rb_eArgError, "queue size must be positive");
557
    }
558
    queue_initialize(&szqueue->queue_);
559
    szqueue->queue_wait = rb_ary_buf_new();
560
    szqueue->max = (unsigned long)max;
561

  
562
    return self;
563
}
564

  
565
/*
566
 * Document-method: max
567
 * call-seq: max
568
 *
569
 * Returns the maximum size of the queue.
570
 */
571

  
572
static VALUE
573
rb_szqueue_max_get(VALUE self)
574
{
575
    unsigned long max = get_szqueue_ptr(self)->max;
576
    return ULONG2NUM(max);
577
}
578

  
579
/*
580
 * Document-method: max=
581
 * call-seq: max=(n)
582
 *
583
 * Sets the maximum size of the queue.
584
 */
585

  
586
static VALUE
587
rb_szqueue_max_set(VALUE self, VALUE vmax)
588
{
589
    SizedQueue *szqueue = get_szqueue_ptr(self);
590
    long max = NUM2LONG(vmax), diff = 0;
591
    VALUE t;
592

  
593
    if (max <= 0) {
594
	rb_raise(rb_eArgError, "queue size must be positive");
595
    }
596
    if ((unsigned long)max > szqueue->max) {
597
	diff = max - szqueue->max;
598
    }
599
    szqueue->max = max;
600
    while (diff > 0 && !NIL_P(t = rb_ary_shift(szqueue->queue_wait))) {
601
	rb_thread_wakeup_alive(t);
602
    }
603
    return vmax;
604
}
605

  
606
static VALUE
607
szqueue_do_push(SizedQueue *szqueue, VALUE obj)
608
{
609
    struct waiting_delete args;
610
    VALUE thread;
611

  
612
    while (queue_length(&szqueue->queue_) >= szqueue->max) {
613
	args.waiting = szqueue->queue_wait;
614
	args.th      = rb_thread_current();
615
	rb_ary_push(args.waiting, args.th);
616
	rb_ensure((VALUE (*)())rb_thread_sleep_forever, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
617
    }
618
    return queue_do_push(&szqueue->queue_, obj);
619
}
620

  
621
/*
622
 * Document-method: push
623
 * call-seq: push(obj)
624
 *
625
 * Pushes +obj+ to the queue.  If there is no space left in the queue, waits
626
 * until space becomes available.
627
 */
628

  
629
static VALUE
630
rb_szqueue_push(VALUE self, VALUE obj)
631
{
632
    szqueue_do_push(get_szqueue_ptr(self), obj);
633
    return self;
634
}
635

  
636
static VALUE
637
szqueue_do_pop(SizedQueue *szqueue, VALUE should_block)
638
{
639
    VALUE retval = queue_do_pop(&szqueue->queue_, should_block);
640

  
641
    if (queue_length(&szqueue->queue_) < szqueue->max) {
642
	wakeup_first_thread(szqueue->queue_wait);
643
    }
644

  
645
    return retval;
646
}
647

  
648
/*
649
 * Document-method: pop
650
 * call_seq: pop(non_block=false)
651
 *
652
 * Returns the number of threads waiting on the queue.
653
 */
654

  
655
static VALUE
656
rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
657
{
658
    SizedQueue *szqueue = get_szqueue_ptr(self);
659
    int should_block = queue_pop_should_block(argc, argv);
660
    return szqueue_do_pop(szqueue, (VALUE)should_block);
661
}
662

  
663
/*
664
 * Document-method: pop
665
 * call_seq: pop(non_block=false)
666
 *
667
 * Returns the number of threads waiting on the queue.
668
 */
669

  
670
static VALUE
671
rb_szqueue_num_waiting(VALUE self)
672
{
673
    SizedQueue *szqueue = get_szqueue_ptr(self);
674
    long len = queue_num_waiting(&szqueue->queue_);
675
    len += RARRAY_LEN(szqueue->queue_wait);
676
    return ULONG2NUM(len);
677
}
678

  
679
#ifndef UNDER_THREAD
680
#define UNDER_THREAD 1
681
#endif
682

  
683
void
684
Init_thread(void)
685
{
686
#if UNDER_THREAD
687
#define DEFINE_CLASS_UNDER_THREAD(name, super) rb_define_class_under(rb_cThread, #name, super)
688
#define ALIAS_GLOBCAL_CONST(name) do {		      \
689
	ID id = rb_intern_const(#name);		      \
690
	if (!rb_const_defined_at(rb_cObject, id)) {     \
691
	    rb_const_set(rb_cObject, id, rb_c##name);   \
692
	}						      \
693
    } while (0)
694
#else
695
#define DEFINE_CLASS_UNDER_THREAD(name, super) rb_define_class(name, super)
696
#define ALIAS_GLOBCAL_CONST(name) do { /* nothing */ } while (0)
697
#endif
698
    VALUE rb_cConditionVariable = DEFINE_CLASS_UNDER_THREAD(ConditionVariable, rb_cObject);
699
    VALUE rb_cQueue = DEFINE_CLASS_UNDER_THREAD(Queue, rb_cObject);
700
    VALUE rb_cSizedQueue = DEFINE_CLASS_UNDER_THREAD(SizedQueue, rb_cQueue);
701

  
702
    rb_define_alloc_func(rb_cConditionVariable, condvar_alloc);
703
    rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
704
    rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, -1);
705
    rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);
706
    rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);
707

  
708
    rb_define_alloc_func(rb_cQueue, queue_alloc);
709
    rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0);
710
    rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
711
    rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
712
    rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
713
    rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
714
    rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
715
    rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
716
    rb_alias(rb_cQueue, rb_intern("enq"), rb_intern("push"));
717
    rb_alias(rb_cQueue, rb_intern("<<"), rb_intern("push"));
718
    rb_alias(rb_cQueue, rb_intern("deq"), rb_intern("pop"));
719
    rb_alias(rb_cQueue, rb_intern("shift"), rb_intern("pop"));
720
    rb_alias(rb_cQueue, rb_intern("size"), rb_intern("length"));
721

  
722
    rb_define_alloc_func(rb_cSizedQueue, szqueue_alloc);
723
    rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
724
    rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
725
    rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
726
    rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, 1);
727
    rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
728
    rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
729
    rb_alias(rb_cSizedQueue, rb_intern("enq"), rb_intern("push"));
730
    rb_alias(rb_cSizedQueue, rb_intern("<<"), rb_intern("push"));
731
    rb_alias(rb_cSizedQueue, rb_intern("deq"), rb_intern("pop"));
732
    rb_alias(rb_cSizedQueue, rb_intern("shift"), rb_intern("pop"));
733

  
734
    rb_provide("thread.rb");
735
    ALIAS_GLOBCAL_CONST(ConditionVariable);
736
    ALIAS_GLOBCAL_CONST(Queue);
737
    ALIAS_GLOBCAL_CONST(SizedQueue);
738
}
lib/thread.rb
1
#
2
#               thread.rb - thread support classes
3
#                       by Yukihiro Matsumoto <matz@netlab.co.jp>
4
#
5
# Copyright (C) 2001  Yukihiro Matsumoto
6
# Copyright (C) 2000  Network Applied Communication Laboratory, Inc.
7
# Copyright (C) 2000  Information-technology Promotion Agency, Japan
8
#
9

  
10
unless defined? Thread
11
  raise "Thread not available for this ruby interpreter"
12
end
13

  
14
unless defined? ThreadError
15
  class ThreadError < StandardError
16
  end
17
end
18

  
19
if $DEBUG
20
  Thread.abort_on_exception = true
21
end
22

  
23
#
24
# ConditionVariable objects augment class Mutex. Using condition variables,
25
# it is possible to suspend while in the middle of a critical section until a
26
# resource becomes available.
27
#
28
# Example:
29
#
30
#   require 'thread'
31
#
32
#   mutex = Mutex.new
33
#   resource = ConditionVariable.new
34
#
35
#   a = Thread.new {
36
#     mutex.synchronize {
37
#       # Thread 'a' now needs the resource
38
#       resource.wait(mutex)
39
#       # 'a' can now have the resource
40
#     }
41
#   }
42
#
43
#   b = Thread.new {
44
#     mutex.synchronize {
45
#       # Thread 'b' has finished using the resource
46
#       resource.signal
47
#     }
48
#   }
49
#
50
class ConditionVariable
51
  #
52
  # Creates a new ConditionVariable
53
  #
54
  def initialize
55
    @waiters = {}
56
    @waiters_mutex = Mutex.new
57
  end
58

  
59
  #
60
  # Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
61
  #
62
  # If +timeout+ is given, this method returns after +timeout+ seconds passed,
63
  # even if no other thread doesn't signal.
64
  #
65
  def wait(mutex, timeout=nil)
66
    Thread.handle_interrupt(StandardError => :never) do
67
      begin
68
        Thread.handle_interrupt(StandardError => :on_blocking) do
69
          @waiters_mutex.synchronize do
70
            @waiters[Thread.current] = true
71
          end
72
          mutex.sleep timeout
73
        end
74
      ensure
75
        @waiters_mutex.synchronize do
76
          @waiters.delete(Thread.current)
77
        end
78
      end
79
    end
80
    self
81
  end
82

  
83
  #
84
  # Wakes up the first thread in line waiting for this lock.
85
  #
86
  def signal
87
    Thread.handle_interrupt(StandardError => :on_blocking) do
88
      begin
89
        t, _ = @waiters_mutex.synchronize { @waiters.shift }
90
        t.run if t
91
      rescue ThreadError
92
        retry # t was already dead?
93
      end
94
    end
95
    self
96
  end
97

  
98
  #
99
  # Wakes up all threads waiting for this lock.
100
  #
101
  def broadcast
102
    Thread.handle_interrupt(StandardError => :on_blocking) do
103
      threads = nil
104
      @waiters_mutex.synchronize do
105
        threads = @waiters.keys
106
        @waiters.clear
107
      end
108
      for t in threads
109
        begin
110
          t.run
111
        rescue ThreadError
112
        end
113
      end
114
    end
115
    self
116
  end
117
end
118

  
119
#
120
# This class provides a way to synchronize communication between threads.
121
#
122
# Example:
123
#
124
#   require 'thread'
125
#
126
#   queue = Queue.new
127
#
128
#   producer = Thread.new do
129
#     5.times do |i|
130
#       sleep rand(i) # simulate expense
131
#       queue << i
132
#       puts "#{i} produced"
133
#     end
134
#   end
135
#
136
#   consumer = Thread.new do
137
#     5.times do |i|
138
#       value = queue.pop
139
#       sleep rand(i/2) # simulate expense
140
#       puts "consumed #{value}"
141
#     end
142
#   end
143
#
144
#   consumer.join
145
#
146
class Queue
147
  #
148
  # Creates a new queue.
149
  #
150
  def initialize
151
    @que = []
152
    @que.taint          # enable tainted communication
153
    @num_waiting = 0
154
    self.taint
155
    @mutex = Mutex.new
156
    @cond = ConditionVariable.new
157
  end
158

  
159
  #
160
  # Pushes +obj+ to the queue.
161
  #
162
  def push(obj)
163
    Thread.handle_interrupt(StandardError => :on_blocking) do
164
      @mutex.synchronize do
165
        @que.push obj
166
        @cond.signal
167
      end
168
      self
169
    end
170
  end
171

  
172
  #
173
  # Alias of push
174
  #
175
  alias << push
176

  
177
  #
178
  # Alias of push
179
  #
180
  alias enq push
181

  
182
  #
183
  # Retrieves data from the queue.  If the queue is empty, the calling thread is
184
  # suspended until data is pushed onto the queue.  If +non_block+ is true, the
185
  # thread isn't suspended, and an exception is raised.
186
  #
187
  def pop(non_block=false)
188
    Thread.handle_interrupt(StandardError => :on_blocking) do
189
      @mutex.synchronize do
190
        while true
191
          if @que.empty?
192
            if non_block
193
              raise ThreadError, "queue empty"
194
            else
195
              begin
196
                @num_waiting += 1
197
                @cond.wait @mutex
198
              ensure
199
                @num_waiting -= 1
200
              end
201
            end
202
          else
203
            return @que.shift
204
          end
205
        end
206
      end
207
    end
208
  end
209

  
210
  #
211
  # Alias of pop
212
  #
213
  alias shift pop
214

  
215
  #
216
  # Alias of pop
217
  #
218
  alias deq pop
219

  
220
  #
221
  # Returns +true+ if the queue is empty.
222
  #
223
  def empty?
224
    @que.empty?
225
  end
226

  
227
  #
228
  # Removes all objects from the queue.
229
  #
230
  def clear
231
    @que.clear
232
    self
233
  end
234

  
235
  #
236
  # Returns the length of the queue.
237
  #
238
  def length
239
    @que.length
240
  end
241

  
242
  #
243
  # Alias of length.
244
  #
245
  alias size length
246

  
247
  #
248
  # Returns the number of threads waiting on the queue.
249
  #
250
  def num_waiting
251
    @num_waiting
252
  end
253
end
254

  
255
#
256
# This class represents queues of specified size capacity.  The push operation
257
# may be blocked if the capacity is full.
258
#
259
# See Queue for an example of how a SizedQueue works.
260
#
261
class SizedQueue < Queue
262
  #
263
  # Creates a fixed-length queue with a maximum size of +max+.
264
  #
265
  def initialize(max)
266
    raise ArgumentError, "queue size must be positive" unless max > 0
267
    @max = max
268
    @enque_cond = ConditionVariable.new
269
    @num_enqueue_waiting = 0
270
    super()
271
  end
272

  
273
  #
274
  # Returns the maximum size of the queue.
275
  #
276
  def max
277
    @max
278
  end
279

  
280
  #
281
  # Sets the maximum size of the queue.
282
  #
283
  def max=(max)
284
    raise ArgumentError, "queue size must be positive" unless max > 0
285

  
286
    @mutex.synchronize do
287
      if max <= @max
288
        @max = max
289
      else
290
        diff = max - @max
291
        @max = max
292
        diff.times do
293
          @enque_cond.signal
294
        end
295
      end
296
    end
297
    max
298
  end
299

  
300
  #
301
  # Pushes +obj+ to the queue.  If there is no space left in the queue, waits
302
  # until space becomes available.
303
  #
304
  def push(obj)
305
    Thread.handle_interrupt(RuntimeError => :on_blocking) do
306
      @mutex.synchronize do
307
        while true
308
          break if @que.length < @max
309
          @num_enqueue_waiting += 1
310
          begin
311
            @enque_cond.wait @mutex
312
          ensure
313
            @num_enqueue_waiting -= 1
314
          end
315
        end
316

  
317
        @que.push obj
318
        @cond.signal
319
      end
320
      self
321
    end
322
  end
323

  
324
  #
325
  # Alias of push
326
  #
327
  alias << push
328

  
329
  #
330
  # Alias of push
331
  #
332
  alias enq push
333

  
334
  #
335
  # Retrieves data from the queue and runs a waiting thread, if any.
336
  #
337
  def pop(*args)
338
    retval = super
339
    @mutex.synchronize do
340
      if @que.length < @max
341
        @enque_cond.signal
342
      end
343
    end
344
    retval
345
  end
346

  
347
  #
348
  # Alias of pop
349
  #
350
  alias shift pop
351

  
352
  #
353
  # Alias of pop
354
  #
355
  alias deq pop
356

  
357
  #
358
  # Returns the number of threads waiting on the queue.
359
  #
360
  def num_waiting
361
    @num_waiting + @num_enqueue_waiting
362
  end
363
end
364

  
365
# Documentation comments:
366
#  - How do you make RDoc inherit documentation from superclass?