patch2.diff

using rb_funcall2() - Masaki Matsushita, 05/22/2013 09:07 PM

Download (24.4 KB)

View differences:

ext/thread/extconf.rb
1
require 'mkmf'
2

  
3
create_makefile('thread')
ext/thread/thread.c
1
#include <ruby.h>
2

  
3
RUBY_EXTERN size_t rb_objspace_data_type_memsize(VALUE);
4
RUBY_EXTERN size_t rb_ary_memsize(VALUE);
5

  
6
static VALUE
7
rb_ary_buf_new(void)
8
{
9
    VALUE ary = rb_ary_tmp_new(1);
10
    OBJ_UNTRUST(ary);
11
    return ary;
12
}
13

  
14
static void
15
wakeup_first_thread(VALUE list)
16
{
17
    VALUE thread;
18

  
19
    while (!NIL_P(thread = rb_ary_shift(list))) {
20
	if (RTEST(rb_thread_wakeup_alive(thread))) break;
21
    }
22
}
23

  
24
static void
25
wakeup_all_threads(VALUE list)
26
{
27
    VALUE thread, list0 = list;
28
    long i;
29

  
30
    list = rb_ary_subseq(list, 0, LONG_MAX);
31
    rb_ary_clear(list0);
32
    for (i = 0; i < RARRAY_LEN(list); ++i) {
33
	thread = RARRAY_PTR(list)[i];
34
	rb_thread_wakeup_alive(thread);
35
    }
36
    RB_GC_GUARD(list);
37
}
38

  
39
/*
40
 *  Document-class: ConditionVariable
41
 *
42
 *  ConditionVariable objects augment class Mutex. Using condition variables,
43
 *  it is possible to suspend while in the middle of a critical section until a
44
 *  resource becomes available.
45
 *
46
 *  Example:
47
 *
48
 *    require 'thread'
49
 *
50
 *    mutex = Mutex.new
51
 *    resource = ConditionVariable.new
52
 *
53
 *    a = Thread.new {
54
 *	 mutex.synchronize {
55
 *	   # Thread 'a' now needs the resource
56
 *	   resource.wait(mutex)
57
 *	   # 'a' can now have the resource
58
 *	 }
59
 *    }
60
 *
61
 *    b = Thread.new {
62
 *	 mutex.synchronize {
63
 *	   # Thread 'b' has finished using the resource
64
 *	   resource.signal
65
 *	 }
66
 *    }
67
 */
68

  
69
typedef struct {
70
    VALUE waiters;
71
} CondVar;
72

  
73
static void
74
condvar_mark(void *ptr)
75
{
76
    CondVar *condvar = ptr;
77
    rb_gc_mark(condvar->waiters);
78
}
79

  
80
#define condvar_free RUBY_TYPED_DEFAULT_FREE
81

  
82
static size_t
83
condvar_memsize(const void *ptr)
84
{
85
    size_t size = 0;
86
    if (ptr) {
87
	const CondVar *condvar = ptr;
88
	size = sizeof(CondVar);
89
	size += rb_ary_memsize(condvar->waiters);
90
    }
91
    return size;
92
}
93

  
94
static const rb_data_type_t condvar_data_type = {
95
    "condvar",
96
    {condvar_mark, condvar_free, condvar_memsize,},
97
};
98

  
99
#define GetCondVarPtr(obj, tobj) \
100
    TypedData_Get_Struct(obj, CondVar, &condvar_data_type, tobj)
101

  
102
static CondVar *
103
get_condvar_ptr(VALUE self)
104
{
105
    CondVar *condvar;
106
    GetCondVarPtr(self, condvar);
107
    if (!condvar->waiters) {
108
	rb_raise(rb_eArgError, "uninitialized CondionVariable");
109
    }
110
    return condvar;
111
}
112

  
113
static VALUE
114
condvar_alloc(VALUE klass)
115
{
116
    CondVar *condvar;
117
    return TypedData_Make_Struct(klass, CondVar, &condvar_data_type, condvar);
118
}
119

  
120
static void
121
condvar_initialize(CondVar *condvar)
122
{
123
    condvar->waiters = rb_ary_buf_new();
124
}
125

  
126
/*
127
 * Document-method: new
128
 * call-seq: new
129
 *
130
 * Creates a new condvar.
131
 */
132

  
133
static VALUE
134
rb_condvar_initialize(VALUE self)
135
{
136
    CondVar *condvar;
137
    GetCondVarPtr(self, condvar);
138

  
139
    condvar_initialize(condvar);
140

  
141
    return self;
142
}
143

  
144
struct sleep_call {
145
    VALUE mutex;
146
    VALUE timeout;
147
};
148

  
149
static ID id_sleep;
150

  
151
static VALUE
152
do_sleep(VALUE args)
153
{
154
    struct sleep_call *p = (struct sleep_call *)args;
155
    return rb_funcall2(p->mutex, id_sleep, 1, &p->timeout);
156
}
157

  
158
static VALUE
159
delete_current_thread(VALUE ary)
160
{
161
    return rb_ary_delete(ary, rb_thread_current());
162
}
163

  
164
/*
165
 * Document-method: wait
166
 * call-seq: wait(mutex, timeout=nil)
167
 *
168
 * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
169
 *
170
 * If +timeout+ is given, this method returns after +timeout+ seconds passed,
171
 * even if no other thread doesn't signal.
172
 */
173

  
174
static VALUE
175
rb_condvar_wait(int argc, VALUE *argv, VALUE self)
176
{
177
    VALUE waiters = get_condvar_ptr(self)->waiters;
178
    VALUE mutex, timeout;
179
    struct sleep_call args;
180

  
181
    rb_scan_args(argc, argv, "11", &mutex, &timeout);
182

  
183
    args.mutex   = mutex;
184
    args.timeout = timeout;
185
    rb_ary_push(waiters, rb_thread_current());
186
    rb_ensure(do_sleep, (VALUE)&args, delete_current_thread, waiters);
187

  
188
    return self;
189
}
190

  
191
/*
192
 * Document-method: signal
193
 * call-seq: signal
194
 *
195
 * Wakes up the first thread in line waiting for this lock.
196
 */
197

  
198
static VALUE
199
rb_condvar_signal(VALUE self)
200
{
201
    wakeup_first_thread(get_condvar_ptr(self)->waiters);
202
    return self;
203
}
204

  
205
/*
206
 * Document-method: broadcast
207
 * call-seq: broadcast
208
 *
209
 * Wakes up all threads waiting for this lock.
210
 */
211

  
212
static VALUE
213
rb_condvar_broadcast(VALUE self)
214
{
215
    wakeup_all_threads(get_condvar_ptr(self)->waiters);
216
    return self;
217
}
218

  
219
/*
220
 *  Document-class: Queue
221
 *
222
 *  This class provides a way to synchronize communication between threads.
223
 *
224
 *  Example:
225
 *
226
 *    require 'thread'
227
 *    queue = Queue.new
228
 *
229
 *  producer = Thread.new do
230
 *    5.times do |i|
231
 *	 sleep rand(i) # simulate expense
232
 *	 queue << i
233
 *	 puts "#{i} produced"
234
 *    end
235
 *  end
236
 *
237
 *  consumer = Thread.new do
238
 *    5.times do |i|
239
 *	 value = queue.pop
240
 *	 sleep rand(i/2) # simulate expense
241
 *	 puts "consumed #{value}"
242
 *    end
243
 *  end
244
 *
245
 */
246

  
247
typedef struct {
248
    VALUE que;
249
    VALUE waiting;
250
} Queue;
251

  
252
static void
253
queue_mark(void *ptr)
254
{
255
    Queue *queue = ptr;
256
    rb_gc_mark(queue->que);
257
    rb_gc_mark(queue->waiting);
258
}
259

  
260
#define queue_free RUBY_TYPED_DEFAULT_FREE
261

  
262
static size_t
263
queue_memsize(const void *ptr)
264
{
265
    size_t size = 0;
266
    if (ptr) {
267
	const Queue *queue = ptr;
268
	size = sizeof(Queue);
269
	size += rb_ary_memsize(queue->que);
270
	size += rb_ary_memsize(queue->waiting);
271
    }
272
    return size;
273
}
274

  
275
static const rb_data_type_t queue_data_type = {
276
    "queue",
277
    {queue_mark, queue_free, queue_memsize,},
278
};
279

  
280
#define GetQueuePtr(obj, tobj) \
281
    TypedData_Get_Struct(obj, Queue, &queue_data_type, tobj)
282

  
283
static Queue *
284
get_queue_ptr(VALUE self)
285
{
286
    Queue *queue;
287
    GetQueuePtr(self, queue);
288
    if (!queue->que || !queue->waiting) {
289
	rb_raise(rb_eArgError, "uninitialized Queue");
290
    }
291
    return queue;
292
}
293

  
294
static VALUE
295
queue_alloc(VALUE klass)
296
{
297
    Queue *queue;
298
    return TypedData_Make_Struct(klass, Queue, &queue_data_type, queue);
299
}
300

  
301
static void
302
queue_initialize(Queue *queue)
303
{
304
    queue->que = rb_ary_buf_new();
305
    queue->waiting = rb_ary_buf_new();
306
}
307

  
308
/*
309
 * Document-method: new
310
 * call-seq: new
311
 *
312
 * Creates a new queue.
313
 */
314

  
315
static VALUE
316
rb_queue_initialize(VALUE self)
317
{
318
    Queue *queue;
319
    GetQueuePtr(self, queue);
320

  
321
    queue_initialize(queue);
322

  
323
    return self;
324
}
325

  
326
static VALUE
327
queue_do_push(Queue *queue, VALUE obj)
328
{
329
    rb_ary_push(queue->que, obj);
330
    wakeup_first_thread(queue->waiting);
331
    return Qnil;
332
}
333

  
334
/*
335
 * Document-method: push
336
 * call-seq: push(obj)
337
 *
338
 * Pushes +obj+ to the queue.
339
 */
340

  
341
static VALUE
342
rb_queue_push(VALUE self, VALUE obj)
343
{
344
    queue_do_push(get_queue_ptr(self), obj);
345
    return self;
346
}
347

  
348
struct waiting_delete {
349
    VALUE waiting;
350
    VALUE th;
351
};
352

  
353
static VALUE
354
queue_delete_from_waiting(struct waiting_delete *p)
355
{
356
    rb_ary_delete(p->waiting, p->th);
357
    return Qnil;
358
}
359

  
360
static VALUE
361
queue_do_pop(Queue *queue, VALUE should_block)
362
{
363
    struct waiting_delete args;
364

  
365
    while (RARRAY_LEN(queue->que) == 0) {
366
	if (!(int)should_block) {
367
	    rb_raise(rb_eThreadError, "queue empty");
368
	}
369
	args.waiting = queue->waiting;
370
	args.th	     = rb_thread_current();
371
	rb_ary_push(args.waiting, args.th);
372
	rb_ensure((VALUE (*)())rb_thread_sleep_forever, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
373
    }
374

  
375
    return rb_ary_shift(queue->que);
376
}
377

  
378
static int
379
queue_pop_should_block(int argc, VALUE *argv)
380
{
381
    int should_block = 1;
382
    switch (argc) {
383
      case 0:
384
	break;
385
      case 1:
386
	should_block = !RTEST(argv[0]);
387
	break;
388
      default:
389
	rb_raise(rb_eArgError, "wrong number of arguments (%d for 1)", argc);
390
    }
391
    return should_block;
392
}
393

  
394
/*
395
 * Document-method: pop
396
 * call_seq: pop(non_block=false)
397
 *
398
 * Retrieves data from the queue.	If the queue is empty, the calling thread is
399
 * suspended until data is pushed onto the queue.  If +non_block+ is true, the
400
 * thread isn't suspended, and an exception is raised.
401
 */
402

  
403
static VALUE
404
rb_queue_pop(int argc, VALUE *argv, VALUE self)
405
{
406
    Queue *queue = get_queue_ptr(self);
407
    int should_block = queue_pop_should_block(argc, argv);
408
    return queue_do_pop(queue, (VALUE)should_block);
409
}
410

  
411
static inline unsigned long
412
queue_length(Queue *queue)
413
{
414
    return (unsigned long)RARRAY_LEN(queue->que);
415
}
416

  
417
static inline unsigned long
418
queue_num_waiting(Queue *queue)
419
{
420
    return (unsigned long)RARRAY_LEN(queue->waiting);
421
}
422

  
423
/*
424
 * Document-method: empty?
425
 * call-seq: empty?
426
 *
427
 * Returns +true+ if the queue is empty.
428
 */
429

  
430
static VALUE
431
rb_queue_empty_p(VALUE self)
432
{
433
    return queue_length(get_queue_ptr(self)) == 0 ? Qtrue : Qfalse;
434
}
435

  
436
/*
437
 * Document-method: clear
438
 * call-seq: clear
439
 *
440
 * Removes all objects from the queue.
441
 */
442

  
443
static VALUE
444
rb_queue_clear(VALUE self)
445
{
446
    Queue *queue = get_queue_ptr(self);
447

  
448
    rb_ary_clear(queue->que);
449

  
450
    return self;
451
}
452

  
453
/*
454
 * Document-method: length
455
 * call-seq: length
456
 *
457
 * Returns the length of the queue.
458
 */
459

  
460
static VALUE
461
rb_queue_length(VALUE self)
462
{
463
    unsigned long len = queue_length(get_queue_ptr(self));
464
    return ULONG2NUM(len);
465
}
466

  
467
/*
468
 * Document-method: num_waiting
469
 * call-seq: num_waiting
470
 *
471
 * Returns the number of threads waiting on the queue.
472
 */
473

  
474
static VALUE
475
rb_queue_num_waiting(VALUE self)
476
{
477
    long len = queue_num_waiting(get_queue_ptr(self));
478
    return ULONG2NUM(len);
479
}
480

  
481
/*
482
 *  Document-class: SizedQueue
483
 *
484
 * This class represents queues of specified size capacity.  The push operation
485
 * may be blocked if the capacity is full.
486
 *
487
 * See Queue for an example of how a SizedQueue works.
488
 */
489

  
490
typedef struct  {
491
    Queue queue_;
492
    VALUE queue_wait;
493
    unsigned long max;
494
} SizedQueue;
495

  
496
static void
497
szqueue_mark(void *ptr)
498
{
499
    SizedQueue *szqueue = ptr;
500
    queue_mark(&szqueue->queue_);
501
    rb_gc_mark(szqueue->queue_wait);
502
}
503

  
504
#define szqueue_free queue_free
505

  
506
static size_t
507
szqueue_memsize(const void *ptr)
508
{
509
    size_t size = 0;
510
    if (ptr) {
511
	const SizedQueue *szqueue = ptr;
512
	size = sizeof(SizedQueue) - sizeof(Queue);
513
	size += queue_memsize(&szqueue->queue_);
514
	size += rb_ary_memsize(szqueue->queue_wait);
515
    }
516
    return size;
517
}
518

  
519
static const rb_data_type_t szqueue_data_type = {
520
    "sized_queue",
521
    {szqueue_mark, szqueue_free, szqueue_memsize,},
522
    &queue_data_type,
523
};
524

  
525
#define GetSizedQueuePtr(obj, tobj) \
526
    TypedData_Get_Struct(obj, SizedQueue, &szqueue_data_type, tobj)
527

  
528
static SizedQueue *
529
get_szqueue_ptr(VALUE self)
530
{
531
    SizedQueue *szqueue;
532
    GetSizedQueuePtr(self, szqueue);
533
    if (!szqueue->queue_.que || !szqueue->queue_.waiting || !szqueue->queue_wait) {
534
	rb_raise(rb_eArgError, "uninitialized Queue");
535
    }
536
    return szqueue;
537
}
538

  
539
static VALUE
540
szqueue_alloc(VALUE klass)
541
{
542
    SizedQueue *szqueue;
543
    return TypedData_Make_Struct(klass, SizedQueue, &szqueue_data_type, szqueue);
544
}
545

  
546
/*
547
 * Document-method: new
548
 * call-seq: new(max)
549
 *
550
 * Creates a fixed-length queue with a maximum size of +max+.
551
 */
552

  
553
static VALUE
554
rb_szqueue_initialize(VALUE self, VALUE vmax)
555
{
556
    long max;
557
    SizedQueue *szqueue;
558
    GetSizedQueuePtr(self, szqueue);
559

  
560
    max = NUM2LONG(vmax);
561
    if (max <= 0) {
562
	rb_raise(rb_eArgError, "queue size must be positive");
563
    }
564
    queue_initialize(&szqueue->queue_);
565
    szqueue->queue_wait = rb_ary_buf_new();
566
    szqueue->max = (unsigned long)max;
567

  
568
    return self;
569
}
570

  
571
/*
572
 * Document-method: max
573
 * call-seq: max
574
 *
575
 * Returns the maximum size of the queue.
576
 */
577

  
578
static VALUE
579
rb_szqueue_max_get(VALUE self)
580
{
581
    unsigned long max = get_szqueue_ptr(self)->max;
582
    return ULONG2NUM(max);
583
}
584

  
585
/*
586
 * Document-method: max=
587
 * call-seq: max=(n)
588
 *
589
 * Sets the maximum size of the queue.
590
 */
591

  
592
static VALUE
593
rb_szqueue_max_set(VALUE self, VALUE vmax)
594
{
595
    SizedQueue *szqueue = get_szqueue_ptr(self);
596
    long max = NUM2LONG(vmax), diff = 0;
597
    VALUE t;
598

  
599
    if (max <= 0) {
600
	rb_raise(rb_eArgError, "queue size must be positive");
601
    }
602
    if ((unsigned long)max > szqueue->max) {
603
	diff = max - szqueue->max;
604
    }
605
    szqueue->max = max;
606
    while (diff > 0 && !NIL_P(t = rb_ary_shift(szqueue->queue_wait))) {
607
	rb_thread_wakeup_alive(t);
608
    }
609
    return vmax;
610
}
611

  
612
static VALUE
613
szqueue_do_push(SizedQueue *szqueue, VALUE obj)
614
{
615
    struct waiting_delete args;
616

  
617
    while (queue_length(&szqueue->queue_) >= szqueue->max) {
618
	args.waiting = szqueue->queue_wait;
619
	args.th      = rb_thread_current();
620
	rb_ary_push(args.waiting, args.th);
621
	rb_ensure((VALUE (*)())rb_thread_sleep_forever, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
622
    }
623
    return queue_do_push(&szqueue->queue_, obj);
624
}
625

  
626
/*
627
 * Document-method: push
628
 * call-seq: push(obj)
629
 *
630
 * Pushes +obj+ to the queue.  If there is no space left in the queue, waits
631
 * until space becomes available.
632
 */
633

  
634
static VALUE
635
rb_szqueue_push(VALUE self, VALUE obj)
636
{
637
    szqueue_do_push(get_szqueue_ptr(self), obj);
638
    return self;
639
}
640

  
641
static VALUE
642
szqueue_do_pop(SizedQueue *szqueue, VALUE should_block)
643
{
644
    VALUE retval = queue_do_pop(&szqueue->queue_, should_block);
645

  
646
    if (queue_length(&szqueue->queue_) < szqueue->max) {
647
	wakeup_first_thread(szqueue->queue_wait);
648
    }
649

  
650
    return retval;
651
}
652

  
653
/*
654
 * Document-method: pop
655
 * call_seq: pop(non_block=false)
656
 *
657
 * Returns the number of threads waiting on the queue.
658
 */
659

  
660
static VALUE
661
rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
662
{
663
    SizedQueue *szqueue = get_szqueue_ptr(self);
664
    int should_block = queue_pop_should_block(argc, argv);
665
    return szqueue_do_pop(szqueue, (VALUE)should_block);
666
}
667

  
668
/*
669
 * Document-method: pop
670
 * call_seq: pop(non_block=false)
671
 *
672
 * Returns the number of threads waiting on the queue.
673
 */
674

  
675
static VALUE
676
rb_szqueue_num_waiting(VALUE self)
677
{
678
    SizedQueue *szqueue = get_szqueue_ptr(self);
679
    long len = queue_num_waiting(&szqueue->queue_);
680
    len += RARRAY_LEN(szqueue->queue_wait);
681
    return ULONG2NUM(len);
682
}
683

  
684
#ifndef UNDER_THREAD
685
#define UNDER_THREAD 1
686
#endif
687

  
688
void
689
Init_thread(void)
690
{
691
#if UNDER_THREAD
692
#define DEFINE_CLASS_UNDER_THREAD(name, super) rb_define_class_under(rb_cThread, #name, super)
693
#define ALIAS_GLOBCAL_CONST(name) do {		      \
694
	ID id = rb_intern_const(#name);		      \
695
	if (!rb_const_defined_at(rb_cObject, id)) {     \
696
	    rb_const_set(rb_cObject, id, rb_c##name);   \
697
	}						      \
698
    } while (0)
699
#else
700
#define DEFINE_CLASS_UNDER_THREAD(name, super) rb_define_class(name, super)
701
#define ALIAS_GLOBCAL_CONST(name) do { /* nothing */ } while (0)
702
#endif
703
    VALUE rb_cConditionVariable = DEFINE_CLASS_UNDER_THREAD(ConditionVariable, rb_cObject);
704
    VALUE rb_cQueue = DEFINE_CLASS_UNDER_THREAD(Queue, rb_cObject);
705
    VALUE rb_cSizedQueue = DEFINE_CLASS_UNDER_THREAD(SizedQueue, rb_cQueue);
706

  
707
    id_sleep = rb_intern("sleep");
708

  
709
    rb_define_alloc_func(rb_cConditionVariable, condvar_alloc);
710
    rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
711
    rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, -1);
712
    rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);
713
    rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);
714

  
715
    rb_define_alloc_func(rb_cQueue, queue_alloc);
716
    rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0);
717
    rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
718
    rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
719
    rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
720
    rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
721
    rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
722
    rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
723
    rb_alias(rb_cQueue, rb_intern("enq"), rb_intern("push"));
724
    rb_alias(rb_cQueue, rb_intern("<<"), rb_intern("push"));
725
    rb_alias(rb_cQueue, rb_intern("deq"), rb_intern("pop"));
726
    rb_alias(rb_cQueue, rb_intern("shift"), rb_intern("pop"));
727
    rb_alias(rb_cQueue, rb_intern("size"), rb_intern("length"));
728

  
729
    rb_define_alloc_func(rb_cSizedQueue, szqueue_alloc);
730
    rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
731
    rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
732
    rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
733
    rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, 1);
734
    rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
735
    rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
736
    rb_alias(rb_cSizedQueue, rb_intern("enq"), rb_intern("push"));
737
    rb_alias(rb_cSizedQueue, rb_intern("<<"), rb_intern("push"));
738
    rb_alias(rb_cSizedQueue, rb_intern("deq"), rb_intern("pop"));
739
    rb_alias(rb_cSizedQueue, rb_intern("shift"), rb_intern("pop"));
740

  
741
    rb_provide("thread.rb");
742
    ALIAS_GLOBCAL_CONST(ConditionVariable);
743
    ALIAS_GLOBCAL_CONST(Queue);
744
    ALIAS_GLOBCAL_CONST(SizedQueue);
745
}
lib/thread.rb
1
#
2
#               thread.rb - thread support classes
3
#                       by Yukihiro Matsumoto <matz@netlab.co.jp>
4
#
5
# Copyright (C) 2001  Yukihiro Matsumoto
6
# Copyright (C) 2000  Network Applied Communication Laboratory, Inc.
7
# Copyright (C) 2000  Information-technology Promotion Agency, Japan
8
#
9

  
10
unless defined? Thread
11
  raise "Thread not available for this ruby interpreter"
12
end
13

  
14
unless defined? ThreadError
15
  class ThreadError < StandardError
16
  end
17
end
18

  
19
if $DEBUG
20
  Thread.abort_on_exception = true
21
end
22

  
23
#
24
# ConditionVariable objects augment class Mutex. Using condition variables,
25
# it is possible to suspend while in the middle of a critical section until a
26
# resource becomes available.
27
#
28
# Example:
29
#
30
#   require 'thread'
31
#
32
#   mutex = Mutex.new
33
#   resource = ConditionVariable.new
34
#
35
#   a = Thread.new {
36
#     mutex.synchronize {
37
#       # Thread 'a' now needs the resource
38
#       resource.wait(mutex)
39
#       # 'a' can now have the resource
40
#     }
41
#   }
42
#
43
#   b = Thread.new {
44
#     mutex.synchronize {
45
#       # Thread 'b' has finished using the resource
46
#       resource.signal
47
#     }
48
#   }
49
#
50
class ConditionVariable
51
  #
52
  # Creates a new ConditionVariable
53
  #
54
  def initialize
55
    @waiters = {}
56
    @waiters_mutex = Mutex.new
57
  end
58

  
59
  #
60
  # Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
61
  #
62
  # If +timeout+ is given, this method returns after +timeout+ seconds passed,
63
  # even if no other thread doesn't signal.
64
  #
65
  def wait(mutex, timeout=nil)
66
    Thread.handle_interrupt(StandardError => :never) do
67
      begin
68
        Thread.handle_interrupt(StandardError => :on_blocking) do
69
          @waiters_mutex.synchronize do
70
            @waiters[Thread.current] = true
71
          end
72
          mutex.sleep timeout
73
        end
74
      ensure
75
        @waiters_mutex.synchronize do
76
          @waiters.delete(Thread.current)
77
        end
78
      end
79
    end
80
    self
81
  end
82

  
83
  #
84
  # Wakes up the first thread in line waiting for this lock.
85
  #
86
  def signal
87
    Thread.handle_interrupt(StandardError => :on_blocking) do
88
      begin
89
        t, _ = @waiters_mutex.synchronize { @waiters.shift }
90
        t.run if t
91
      rescue ThreadError
92
        retry # t was already dead?
93
      end
94
    end
95
    self
96
  end
97

  
98
  #
99
  # Wakes up all threads waiting for this lock.
100
  #
101
  def broadcast
102
    Thread.handle_interrupt(StandardError => :on_blocking) do
103
      threads = nil
104
      @waiters_mutex.synchronize do
105
        threads = @waiters.keys
106
        @waiters.clear
107
      end
108
      for t in threads
109
        begin
110
          t.run
111
        rescue ThreadError
112
        end
113
      end
114
    end
115
    self
116
  end
117
end
118

  
119
#
120
# This class provides a way to synchronize communication between threads.
121
#
122
# Example:
123
#
124
#   require 'thread'
125
#
126
#   queue = Queue.new
127
#
128
#   producer = Thread.new do
129
#     5.times do |i|
130
#       sleep rand(i) # simulate expense
131
#       queue << i
132
#       puts "#{i} produced"
133
#     end
134
#   end
135
#
136
#   consumer = Thread.new do
137
#     5.times do |i|
138
#       value = queue.pop
139
#       sleep rand(i/2) # simulate expense
140
#       puts "consumed #{value}"
141
#     end
142
#   end
143
#
144
#   consumer.join
145
#
146
class Queue
147
  #
148
  # Creates a new queue.
149
  #
150
  def initialize
151
    @que = []
152
    @que.taint          # enable tainted communication
153
    @num_waiting = 0
154
    self.taint
155
    @mutex = Mutex.new
156
    @cond = ConditionVariable.new
157
  end
158

  
159
  #
160
  # Pushes +obj+ to the queue.
161
  #
162
  def push(obj)
163
    Thread.handle_interrupt(StandardError => :on_blocking) do
164
      @mutex.synchronize do
165
        @que.push obj
166
        @cond.signal
167
      end
168
      self
169
    end
170
  end
171

  
172
  #
173
  # Alias of push
174
  #
175
  alias << push
176

  
177
  #
178
  # Alias of push
179
  #
180
  alias enq push
181

  
182
  #
183
  # Retrieves data from the queue.  If the queue is empty, the calling thread is
184
  # suspended until data is pushed onto the queue.  If +non_block+ is true, the
185
  # thread isn't suspended, and an exception is raised.
186
  #
187
  def pop(non_block=false)
188
    Thread.handle_interrupt(StandardError => :on_blocking) do
189
      @mutex.synchronize do
190
        while true
191
          if @que.empty?
192
            if non_block
193
              raise ThreadError, "queue empty"
194
            else
195
              begin
196
                @num_waiting += 1
197
                @cond.wait @mutex
198
              ensure
199
                @num_waiting -= 1
200
              end
201
            end
202
          else
203
            return @que.shift
204
          end
205
        end
206
      end
207
    end
208
  end
209

  
210
  #
211
  # Alias of pop
212
  #
213
  alias shift pop
214

  
215
  #
216
  # Alias of pop
217
  #
218
  alias deq pop
219

  
220
  #
221
  # Returns +true+ if the queue is empty.
222
  #
223
  def empty?
224
    @que.empty?
225
  end
226

  
227
  #
228
  # Removes all objects from the queue.
229
  #
230
  def clear
231
    @que.clear
232
    self
233
  end
234

  
235
  #
236
  # Returns the length of the queue.
237
  #
238
  def length
239
    @que.length
240
  end
241

  
242
  #
243
  # Alias of length.
244
  #
245
  alias size length
246

  
247
  #
248
  # Returns the number of threads waiting on the queue.
249
  #
250
  def num_waiting
251
    @num_waiting
252
  end
253
end
254

  
255
#
256
# This class represents queues of specified size capacity.  The push operation
257
# may be blocked if the capacity is full.
258
#
259
# See Queue for an example of how a SizedQueue works.
260
#
261
class SizedQueue < Queue
262
  #
263
  # Creates a fixed-length queue with a maximum size of +max+.
264
  #
265
  def initialize(max)
266
    raise ArgumentError, "queue size must be positive" unless max > 0
267
    @max = max
268
    @enque_cond = ConditionVariable.new
269
    @num_enqueue_waiting = 0
270
    super()
271
  end
272

  
273
  #
274
  # Returns the maximum size of the queue.
275
  #
276
  def max
277
    @max
278
  end
279

  
280
  #
281
  # Sets the maximum size of the queue.
282
  #
283
  def max=(max)
284
    raise ArgumentError, "queue size must be positive" unless max > 0
285

  
286
    @mutex.synchronize do
287
      if max <= @max
288
        @max = max
289
      else
290
        diff = max - @max
291
        @max = max
292
        diff.times do
293
          @enque_cond.signal
294
        end
295
      end
296
    end
297
    max
298
  end
299

  
300
  #
301
  # Pushes +obj+ to the queue.  If there is no space left in the queue, waits
302
  # until space becomes available.
303
  #
304
  def push(obj)
305
    Thread.handle_interrupt(RuntimeError => :on_blocking) do
306
      @mutex.synchronize do
307
        while true
308
          break if @que.length < @max
309
          @num_enqueue_waiting += 1
310
          begin
311
            @enque_cond.wait @mutex
312
          ensure
313
            @num_enqueue_waiting -= 1
314
          end
315
        end
316

  
317
        @que.push obj
318
        @cond.signal
319
      end
320
      self
321
    end
322
  end
323

  
324
  #
325
  # Alias of push
326
  #
327
  alias << push
328

  
329
  #
330
  # Alias of push
331
  #
332
  alias enq push
333

  
334
  #
335
  # Retrieves data from the queue and runs a waiting thread, if any.
336
  #
337
  def pop(*args)
338
    retval = super
339
    @mutex.synchronize do
340
      if @que.length < @max
341
        @enque_cond.signal
342
      end
343
    end
344
    retval
345
  end
346

  
347
  #
348
  # Alias of pop
349
  #
350
  alias shift pop
351

  
352
  #
353
  # Alias of pop
354
  #
355
  alias deq pop
356

  
357
  #
358
  # Returns the number of threads waiting on the queue.
359
  #
360
  def num_waiting
361
    @num_waiting + @num_enqueue_waiting
362
  end
363
end
364

  
365
# Documentation comments:
366
#  - How do you make RDoc inherit documentation from superclass?