final_queue_without_mutex.diff

final_queue patch without using mutex (relying on GVL) - Yura Sokolov, 06/26/2012 04:18 PM

Download (16.7 KB)

View differences:

ext/thread/extconf.rb
1
create_makefile('extthread')
ext/thread/extthread.c
1
#include <ruby.h>
2

  
3
RUBY_EXTERN size_t rb_objspace_data_type_memsize(VALUE);
4
RUBY_EXTERN size_t rb_ary_memsize(VALUE);
5

  
6
static VALUE
7
rb_ary_buf_new(void)
8
{
9
    VALUE ary = rb_ary_tmp_new(1);
10
    OBJ_UNTRUST(ary);
11
    return ary;
12
}
13

  
14
static void
15
wakeup_first_thread(VALUE list)
16
{
17
    VALUE thread;
18

  
19
    while (!NIL_P(thread = rb_ary_shift(list))) {
20
       if (RTEST(rb_thread_wakeup_alive(thread))) break;
21
    }
22
}
23

  
24
static void
25
wakeup_all_threads(VALUE list)
26
{
27
    VALUE thread, list0 = list;
28
    long i;
29

  
30
    list = rb_ary_subseq(list, 0, LONG_MAX);
31
    rb_ary_clear(list0);
32
    for (i = 0; i < RARRAY_LEN(list); ++i) {
33
       thread = RARRAY_PTR(list)[i];
34
       rb_thread_wakeup_alive(thread);
35
    }
36
    RB_GC_GUARD(list);
37
}
38

  
39
/*
40
 *  Document-class: ConditionVariable
41
 *
42
 *  ConditionVariable objects augment class Mutex. Using condition variables,
43
 *  it is possible to suspend while in the middle of a critical section until a
44
 *  resource becomes available.
45
 *
46
 *  Example:
47
 *
48
 *    require 'thread'
49
 *
50
 *    mutex = Mutex.new
51
 *    resource = ConditionVariable.new
52
 *
53
 *    a = Thread.new {
54
 *      mutex.synchronize {
55
 *        # Thread 'a' now needs the resource
56
 *        resource.wait(mutex)
57
 *        # 'a' can now have the resource
58
 *      }
59
 *    }
60
 *
61
 *    b = Thread.new {
62
 *      mutex.synchronize {
63
 *        # Thread 'b' has finished using the resource
64
 *        resource.signal
65
 *      }
66
 *    }
67
 */
68

  
69
typedef struct {
70
    VALUE waiters;
71
} CondVar;
72

  
73
static void
74
condvar_mark(void *ptr)
75
{
76
    CondVar *condvar = ptr;
77
    rb_gc_mark(condvar->waiters);
78
}
79

  
80
#define condvar_free RUBY_TYPED_DEFAULT_FREE
81

  
82
static size_t
83
condvar_memsize(const void *ptr)
84
{
85
    size_t size = 0;
86
    if (ptr) {
87
       const CondVar *condvar = ptr;
88
       size = sizeof(CondVar);
89
       size += rb_ary_memsize(condvar->waiters);
90
    }
91
    return size;
92
}
93

  
94
static const rb_data_type_t condvar_data_type = {
95
    "condvar",
96
    {condvar_mark, condvar_free, condvar_memsize,},
97
};
98

  
99
#define GetCondVarPtr(obj, tobj) \
100
    TypedData_Get_Struct(obj, CondVar, &condvar_data_type, tobj)
101

  
102
static CondVar *
103
get_condvar_ptr(VALUE self)
104
{
105
    CondVar *condvar;
106
    GetCondVarPtr(self, condvar);
107
    if (!condvar->waiters) {
108
       rb_raise(rb_eArgError, "uninitialized CondionVariable");
109
    }
110
    return condvar;
111
}
112

  
113
static VALUE
114
condvar_alloc(VALUE klass)
115
{
116
    CondVar *condvar;
117
    return TypedData_Make_Struct(klass, CondVar, &condvar_data_type, condvar);
118
}
119

  
120
static void
121
condvar_initialize(CondVar *condvar)
122
{
123
    condvar->waiters = rb_ary_buf_new();
124
}
125

  
126
/*
127
 * Document-method: new
128
 * call-seq: new
129
 *
130
 * Creates a new condvar.
131
 */
132

  
133
static VALUE
134
rb_condvar_initialize(VALUE self)
135
{
136
    CondVar *condvar;
137
    GetCondVarPtr(self, condvar);
138

  
139
    condvar_initialize(condvar);
140

  
141
    return self;
142
}
143

  
144
struct sleep_call {
145
    int argc;
146
    VALUE *argv;
147
};
148

  
149
static VALUE
150
do_sleep(VALUE args)
151
{
152
    struct sleep_call *p = (struct sleep_call *)args;
153
    return rb_funcall(p->argv[0], rb_intern("sleep"), p->argc-1, p->argv+1);
154
}
155

  
156
static VALUE
157
delete_current_thread(VALUE ary)
158
{
159
    return rb_ary_delete(ary, rb_thread_current());
160
}
161

  
162
/*
163
 * Document-method: wait
164
 * call-seq: wait(mutex, timeout=nil)
165
 *
166
 * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
167
 *
168
 * If +timeout+ is given, this method returns after +timeout+ seconds passed,
169
 * even if no other thread doesn't signal.
170
 */
171

  
172
static VALUE
173
rb_condvar_wait(int argc, VALUE *argv, VALUE self)
174
{
175
    VALUE waiters = get_condvar_ptr(self)->waiters;
176
    struct sleep_call args;
177

  
178
    args.argc = argc;
179
    args.argv = argv;
180
    rb_ary_push(waiters, rb_thread_current());
181
    rb_ensure(do_sleep, (VALUE)&args, delete_current_thread, waiters);
182
    return self;
183
}
184

  
185
/*
186
 * Document-method: signal
187
 * call-seq: signal
188
 *
189
 * Wakes up the first thread in line waiting for this lock.
190
 */
191

  
192
static VALUE
193
rb_condvar_signal(VALUE self)
194
{
195
    wakeup_first_thread(get_condvar_ptr(self)->waiters);
196
    return self;
197
}
198

  
199
/*
200
 * Document-method: broadcast
201
 * call-seq: broadcast
202
 *
203
 * Wakes up all threads waiting for this lock.
204
 */
205

  
206
static VALUE
207
rb_condvar_broadcast(VALUE self)
208
{
209
    wakeup_all_threads(get_condvar_ptr(self)->waiters);
210
    return self;
211
}
212

  
213
/*
214
 *  Document-class: Queue
215
 *
216
 *  This class provides a way to synchronize communication between threads.
217
 *
218
 *  Example:
219
 *
220
 *    require 'thread'
221
 *    queue = Queue.new
222
 *
223
 *  producer = Thread.new do
224
 *    5.times do |i|
225
 *      sleep rand(i) # simulate expense
226
 *      queue << i
227
 *      puts "#{i} produced"
228
 *    end
229
 *  end
230
 *
231
 *  consumer = Thread.new do
232
 *    5.times do |i|
233
 *      value = queue.pop
234
 *      sleep rand(i/2) # simulate expense
235
 *      puts "consumed #{value}"
236
 *    end
237
 *  end
238
 *
239
 */
240

  
241
typedef struct {
242
    VALUE que;
243
    VALUE waiting;
244
} Queue;
245

  
246
static void
247
queue_mark(void *ptr)
248
{
249
    Queue *queue = ptr;
250
    rb_gc_mark(queue->que);
251
    rb_gc_mark(queue->waiting);
252
}
253

  
254
#define queue_free RUBY_TYPED_DEFAULT_FREE
255

  
256
static size_t
257
queue_memsize(const void *ptr)
258
{
259
    size_t size = 0;
260
    if (ptr) {
261
       const Queue *queue = ptr;
262
       size = sizeof(Queue);
263
       size += rb_ary_memsize(queue->que);
264
       size += rb_ary_memsize(queue->waiting);
265
    }
266
    return size;
267
}
268

  
269
static const rb_data_type_t queue_data_type = {
270
    "queue",
271
    {queue_mark, queue_free, queue_memsize,},
272
};
273

  
274
#define GetQueuePtr(obj, tobj) \
275
    TypedData_Get_Struct(obj, Queue, &queue_data_type, tobj)
276

  
277
static Queue *
278
get_queue_ptr(VALUE self)
279
{
280
    Queue *queue;
281
    GetQueuePtr(self, queue);
282
    if (!queue->que || !queue->waiting) {
283
       rb_raise(rb_eArgError, "uninitialized Queue");
284
    }
285
    return queue;
286
}
287

  
288
static VALUE
289
queue_alloc(VALUE klass)
290
{
291
    Queue *queue;
292
    return TypedData_Make_Struct(klass, Queue, &queue_data_type, queue);
293
}
294

  
295
static void
296
queue_initialize(Queue *queue)
297
{
298
    queue->que = rb_ary_buf_new();
299
    queue->waiting = rb_ary_buf_new();
300
}
301

  
302
/*
303
 * Document-method: new
304
 * call-seq: new
305
 *
306
 * Creates a new queue.
307
 */
308

  
309
static VALUE
310
rb_queue_initialize(VALUE self)
311
{
312
    Queue *queue;
313
    GetQueuePtr(self, queue);
314

  
315
    queue_initialize(queue);
316

  
317
    return self;
318
}
319

  
320
static VALUE
321
queue_do_push(Queue *queue, VALUE obj)
322
{
323
    rb_ary_push(queue->que, obj);
324
    wakeup_first_thread(queue->waiting);
325
    return Qnil;
326
}
327

  
328
/*
329
 * Document-method: push
330
 * call-seq: push(obj)
331
 *
332
 * Pushes +obj+ to the queue.
333
 */
334

  
335
static VALUE
336
rb_queue_push(VALUE self, VALUE obj)
337
{
338
    queue_do_push(get_queue_ptr(self), obj);
339
    return self;
340
}
341

  
342
static VALUE
343
queue_do_pop(Queue *queue, VALUE should_block)
344
{
345
    while (!RARRAY_LEN(queue->que)) {
346
       if (!(int)should_block) {
347
           rb_raise(rb_eThreadError, "queue empty");
348
       }
349
       rb_ary_push(queue->waiting, rb_thread_current());
350
       rb_thread_sleep_forever();
351
    }
352

  
353
    return rb_ary_shift(queue->que);
354
}
355

  
356
static int
357
queue_pop_should_block(int argc, VALUE *argv)
358
{
359
    int should_block = 1;
360
    switch (argc) {
361
      case 0:
362
       break;
363
      case 1:
364
       should_block = !RTEST(argv[0]);
365
       break;
366
      default:
367
       rb_raise(rb_eArgError, "wrong number of arguments (%d for 1)", argc);
368
    }
369
    return should_block;
370
}
371

  
372
/*
373
 * Document-method: pop
374
 * call_seq: pop(non_block=false)
375
 *
376
 * Retrieves data from the queue.  If the queue is empty, the calling thread is
377
 * suspended until data is pushed onto the queue.  If +non_block+ is true, the
378
 * thread isn't suspended, and an exception is raised.
379
 */
380

  
381
static VALUE
382
rb_queue_pop(int argc, VALUE *argv, VALUE self)
383
{
384
    Queue *queue = get_queue_ptr(self);
385
    int should_block = queue_pop_should_block(argc, argv);
386
    return queue_do_pop(queue, (VALUE)should_block);
387
}
388

  
389
static inline unsigned long
390
queue_length(Queue *queue)
391
{
392
    return (unsigned long)RARRAY_LEN(queue->que);
393
}
394

  
395
static inline unsigned long
396
queue_num_waiting(Queue *queue)
397
{
398
    return (unsigned long)RARRAY_LEN(queue->waiting);
399
}
400

  
401
/*
402
 * Document-method: empty?
403
 * call-seq: empty?
404
 *
405
 * Returns +true+ if the queue is empty.
406
 */
407

  
408
static VALUE
409
rb_queue_empty_p(VALUE self)
410
{
411
    return queue_length(get_queue_ptr(self)) == 0 ? Qtrue : Qfalse;
412
}
413

  
414
/*
415
 * Document-method: clear
416
 * call-seq: clear
417
 *
418
 * Removes all objects from the queue.
419
 */
420

  
421
static VALUE
422
rb_queue_clear(VALUE self)
423
{
424
    Queue *queue = get_queue_ptr(self);
425

  
426
    rb_ary_clear(queue->que);
427

  
428
    return self;
429
}
430

  
431
/*
432
 * Document-method: length
433
 * call-seq: length
434
 *
435
 * Returns the length of the queue.
436
 */
437

  
438
static VALUE
439
rb_queue_length(VALUE self)
440
{
441
    unsigned long len = queue_length(get_queue_ptr(self));
442
    return ULONG2NUM(len);
443
}
444

  
445
/*
446
 * Document-method: num_waiting
447
 * call-seq: num_waiting
448
 *
449
 * Returns the number of threads waiting on the queue.
450
 */
451

  
452
static VALUE
453
rb_queue_num_waiting(VALUE self)
454
{
455
    long len = queue_num_waiting(get_queue_ptr(self));
456
    return ULONG2NUM(len);
457
}
458

  
459
/*
460
 *  Document-class: SizedQueue
461
 *
462
 * This class represents queues of specified size capacity.  The push operation
463
 * may be blocked if the capacity is full.
464
 *
465
 * See Queue for an example of how a SizedQueue works.
466
 */
467

  
468
typedef struct  {
469
    Queue queue_;
470
    VALUE queue_wait;
471
    unsigned long max;
472
} SizedQueue;
473

  
474
static void
475
szqueue_mark(void *ptr)
476
{
477
    SizedQueue *szqueue = ptr;
478
    queue_mark(&szqueue->queue_);
479
    rb_gc_mark(szqueue->queue_wait);
480
}
481

  
482
#define szqueue_free queue_free
483

  
484
static size_t
485
szqueue_memsize(const void *ptr)
486
{
487
    size_t size = 0;
488
    if (ptr) {
489
       const SizedQueue *szqueue = ptr;
490
       size = sizeof(SizedQueue) - sizeof(Queue);
491
       size += queue_memsize(&szqueue->queue_);
492
       size += rb_ary_memsize(szqueue->queue_wait);
493
    }
494
    return size;
495
}
496

  
497
static const rb_data_type_t szqueue_data_type = {
498
    "sized_queue",
499
    {szqueue_mark, szqueue_free, szqueue_memsize,},
500
    &queue_data_type,
501
};
502

  
503
#define GetSizedQueuePtr(obj, tobj) \
504
    TypedData_Get_Struct(obj, SizedQueue, &szqueue_data_type, tobj)
505

  
506
static SizedQueue *
507
get_szqueue_ptr(VALUE self)
508
{
509
    SizedQueue *szqueue;
510
    GetSizedQueuePtr(self, szqueue);
511
    if (!szqueue->queue_.que || !szqueue->queue_.waiting || !szqueue->queue_wait) {
512
       rb_raise(rb_eArgError, "uninitialized Queue");
513
    }
514
    return szqueue;
515
}
516

  
517
static VALUE
518
szqueue_alloc(VALUE klass)
519
{
520
    SizedQueue *szqueue;
521
    return TypedData_Make_Struct(klass, SizedQueue, &szqueue_data_type, szqueue);
522
}
523

  
524
/*
525
 * Document-method: new
526
 * call-seq: new(max)
527
 *
528
 * Creates a fixed-length queue with a maximum size of +max+.
529
 */
530

  
531
static VALUE
532
rb_szqueue_initialize(VALUE self, VALUE vmax)
533
{
534
    long max;
535
    SizedQueue *szqueue;
536
    GetSizedQueuePtr(self, szqueue);
537

  
538
    max = NUM2LONG(vmax);
539
    if (max <= 0) {
540
       rb_raise(rb_eArgError, "queue size must be positive");
541
    }
542
    queue_initialize(&szqueue->queue_);
543
    szqueue->queue_wait = rb_ary_buf_new();
544
    szqueue->max = (unsigned long)max;
545

  
546
    return self;
547
}
548

  
549
/*
550
 * Document-method: max
551
 * call-seq: max
552
 *
553
 * Returns the maximum size of the queue.
554
 */
555

  
556
static VALUE
557
rb_szqueue_max_get(VALUE self)
558
{
559
    unsigned long max = get_szqueue_ptr(self)->max;
560
    return ULONG2NUM(max);
561
}
562

  
563
/*
564
 * Document-method: max=
565
 * call-seq: max=(n)
566
 *
567
 * Sets the maximum size of the queue.
568
 */
569

  
570
static VALUE
571
rb_szqueue_max_set(VALUE self, VALUE vmax)
572
{
573
    SizedQueue *szqueue = get_szqueue_ptr(self);
574
    long max = NUM2LONG(vmax), diff = 0;
575
    VALUE t;
576

  
577
    if (max <= 0) {
578
       rb_raise(rb_eArgError, "queue size must be positive");
579
    }
580
    if ((unsigned long)max > szqueue->max) {
581
       diff = max - szqueue->max;
582
    }
583
    szqueue->max = max;
584
    while (diff > 0 && !NIL_P(t = rb_ary_shift(szqueue->queue_wait))) {
585
       rb_thread_wakeup_alive(t);
586
    }
587
    return vmax;
588
}
589

  
590
static VALUE
591
szqueue_do_push(SizedQueue *szqueue, VALUE obj)
592
{
593
    while (queue_length(&szqueue->queue_) >= szqueue->max) {
594
       rb_ary_push(szqueue->queue_wait, rb_thread_current());
595
       rb_thread_sleep_forever();
596
    }
597
    return queue_do_push(&szqueue->queue_, obj);
598
}
599

  
600
/*
601
 * Document-method: push
602
 * call-seq: push(obj)
603
 *
604
 * Pushes +obj+ to the queue.  If there is no space left in the queue, waits
605
 * until space becomes available.
606
 */
607

  
608
static VALUE
609
rb_szqueue_push(VALUE self, VALUE obj)
610
{
611
    szqueue_do_push(get_szqueue_ptr(self), obj);
612
    return self;
613
}
614

  
615
static VALUE
616
szqueue_do_pop(SizedQueue *szqueue, VALUE should_block)
617
{
618
    VALUE retval = queue_do_pop(&szqueue->queue_, should_block);
619

  
620
    if (queue_length(&szqueue->queue_) < szqueue->max) {
621
       wakeup_first_thread(szqueue->queue_wait);
622
    }
623

  
624
    return retval;
625
}
626

  
627
/*
628
 * Document-method: pop
629
 * call_seq: pop(non_block=false)
630
 *
631
 * Returns the number of threads waiting on the queue.
632
 */
633

  
634
static VALUE
635
rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
636
{
637
    SizedQueue *szqueue = get_szqueue_ptr(self);
638
    int should_block = queue_pop_should_block(argc, argv);
639
    return szqueue_do_pop(szqueue, (VALUE)should_block);
640
}
641

  
642
/*
643
 * Document-method: pop
644
 * call_seq: pop(non_block=false)
645
 *
646
 * Returns the number of threads waiting on the queue.
647
 */
648

  
649
static VALUE
650
rb_szqueue_num_waiting(VALUE self)
651
{
652
    SizedQueue *szqueue = get_szqueue_ptr(self);
653
    long len = queue_num_waiting(&szqueue->queue_);
654
    len += RARRAY_LEN(szqueue->queue_wait);
655
    return ULONG2NUM(len);
656
}
657

  
658
#ifndef UNDER_THREAD
659
#define UNDER_THREAD 1
660
#endif
661

  
662
void
663
Init_extthread(void)
664
{
665
#if UNDER_THREAD
666
#define DEFINE_CLASS_UNDER_THREAD(name, super) rb_define_class_under(rb_cThread, #name, super)
667
#define ALIAS_GLOBCAL_CONST(name) do {                 \
668
       ID id = rb_intern_const(#name);                 \
669
       if (!rb_const_defined_at(rb_cObject, id)) {     \
670
           rb_const_set(rb_cObject, id, rb_c##name);   \
671
       }                                               \
672
    } while (0)
673
#else
674
#define DEFINE_CLASS_UNDER_THREAD(name, super) rb_define_class(name, super)
675
#define ALIAS_GLOBCAL_CONST(name) do { /* nothing */ } while (0)
676
#endif
677
    VALUE rb_cConditionVariable = DEFINE_CLASS_UNDER_THREAD(ConditionVariable, rb_cObject);
678
    VALUE rb_cQueue = DEFINE_CLASS_UNDER_THREAD(Queue, rb_cObject);
679
    VALUE rb_cSizedQueue = DEFINE_CLASS_UNDER_THREAD(SizedQueue, rb_cQueue);
680

  
681
    rb_define_alloc_func(rb_cConditionVariable, condvar_alloc);
682
    rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
683
    rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, -1);
684
    rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);
685
    rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);
686

  
687
    rb_define_alloc_func(rb_cQueue, queue_alloc);
688
    rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0);
689
    rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
690
    rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
691
    rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
692
    rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
693
    rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
694
    rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
695
    rb_alias(rb_cQueue, rb_intern("enq"), rb_intern("push"));
696
    rb_alias(rb_cQueue, rb_intern("<<"), rb_intern("push"));
697
    rb_alias(rb_cQueue, rb_intern("deq"), rb_intern("pop"));
698
    rb_alias(rb_cQueue, rb_intern("shift"), rb_intern("pop"));
699
    rb_alias(rb_cQueue, rb_intern("size"), rb_intern("length"));
700

  
701
    rb_define_alloc_func(rb_cSizedQueue, szqueue_alloc);
702
    rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
703
    rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
704
    rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
705
    rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, 1);
706
    rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
707
    rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
708
    rb_alias(rb_cSizedQueue, rb_intern("enq"), rb_intern("push"));
709
    rb_alias(rb_cSizedQueue, rb_intern("<<"), rb_intern("push"));
710
    rb_alias(rb_cSizedQueue, rb_intern("deq"), rb_intern("pop"));
711
    rb_alias(rb_cSizedQueue, rb_intern("shift"), rb_intern("pop"));
712

  
713
    rb_provide("thread.rb");
714
    ALIAS_GLOBCAL_CONST(ConditionVariable);
715
    ALIAS_GLOBCAL_CONST(Queue);
716
    ALIAS_GLOBCAL_CONST(SizedQueue);
717
}