thread.c

Koichi Sasada, 08/29/2013 06:54 PM

Download (12.4 KB)

 
1
#include <ruby.h>
2

    
3
enum {
4
    CONDVAR_WAITERS = 0
5
};
6

    
7
enum {
8
    QUEUE_QUE       = 0,
9
    QUEUE_WAITERS   = 1,
10
    SZQUEUE_WAITERS = 2,
11
    SZQUEUE_MAX     = 3
12
};
13

    
14
#define GET_CONDVAR_WAITERS(cv) RSTRUCT_GET((cv), CONDVAR_WAITERS)
15

    
16
#define GET_QUEUE_QUE(q)        RSTRUCT_GET((q), QUEUE_QUE)
17
#define GET_QUEUE_WAITERS(q)    RSTRUCT_GET((q), QUEUE_WAITERS)
18
#define GET_SZQUEUE_WAITERS(q)  RSTRUCT_GET((q), SZQUEUE_WAITERS)
19
#define GET_SZQUEUE_MAX(q)      RSTRUCT_GET((q), SZQUEUE_MAX)
20
#define GET_SZQUEUE_ULONGMAX(q) NUM2ULONG(GET_SZQUEUE_MAX(q))
21

    
22
static VALUE
23
ary_buf_new(void)
24
{
25
    return rb_ary_tmp_new(1);
26
}
27

    
28
static void
29
wakeup_first_thread(VALUE list)
30
{
31
    VALUE thread;
32

    
33
    while (!NIL_P(thread = rb_ary_shift(list))) {
34
        if (RTEST(rb_thread_wakeup_alive(thread))) break;
35
    }
36
}
37

    
38
static void
39
wakeup_all_threads(VALUE list)
40
{
41
    VALUE thread;
42
    long i;
43

    
44
    for (i=0; i<RARRAY_LEN(list); i++) {
45
        thread = RARRAY_AREF(list, i);
46
        rb_thread_wakeup_alive(thread);
47
    }
48
    rb_ary_clear(list);
49
}
50

    
51
/*
52
 *  Document-class: ConditionVariable
53
 *
54
 *  ConditionVariable objects augment class Mutex. Using condition variables,
55
 *  it is possible to suspend while in the middle of a critical section until a
56
 *  resource becomes available.
57
 *
58
 *  Example:
59
 *
60
 *    require 'thread'
61
 *
62
 *    mutex = Mutex.new
63
 *    resource = ConditionVariable.new
64
 *
65
 *    a = Thread.new {
66
 *         mutex.synchronize {
67
 *           # Thread 'a' now needs the resource
68
 *           resource.wait(mutex)
69
 *           # 'a' can now have the resource
70
 *         }
71
 *    }
72
 *
73
 *    b = Thread.new {
74
 *         mutex.synchronize {
75
 *           # Thread 'b' has finished using the resource
76
 *           resource.signal
77
 *         }
78
 *    }
79
 */
80

    
81
/*
82
 * Document-method: new
83
 * call-seq: new
84
 *
85
 * Creates a new condvar.
86
 */
87

    
88
static VALUE
89
rb_condvar_initialize(VALUE self)
90
{
91
    RSTRUCT_SET(self, CONDVAR_WAITERS, ary_buf_new());
92
    return self;
93
}
94

    
95
struct sleep_call {
96
    VALUE mutex;
97
    VALUE timeout;
98
};
99

    
100
static VALUE
101
do_sleep(VALUE args)
102
{
103
    struct sleep_call *p = (struct sleep_call *)args;
104
    return rb_mutex_sleep(p->mutex, p->timeout);
105
}
106

    
107
static VALUE
108
delete_current_thread(VALUE ary)
109
{
110
    return rb_ary_delete(ary, rb_thread_current());
111
}
112

    
113
/*
114
 * Document-method: wait
115
 * call-seq: wait(mutex, timeout=nil)
116
 *
117
 * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
118
 *
119
 * If +timeout+ is given, this method returns after +timeout+ seconds passed,
120
 * even if no other thread doesn't signal.
121
 */
122

    
123
static VALUE
124
rb_condvar_wait(int argc, VALUE *argv, VALUE self)
125
{
126
    VALUE waiters = GET_CONDVAR_WAITERS(self);
127
    VALUE mutex, timeout;
128
    struct sleep_call args;
129

    
130
    rb_scan_args(argc, argv, "11", &mutex, &timeout);
131

    
132
    args.mutex   = mutex;
133
    args.timeout = timeout;
134
    rb_ary_push(waiters, rb_thread_current());
135
    rb_ensure(do_sleep, (VALUE)&args, delete_current_thread, waiters);
136

    
137
    return self;
138
}
139

    
140
/*
141
 * Document-method: signal
142
 * call-seq: signal
143
 *
144
 * Wakes up the first thread in line waiting for this lock.
145
 */
146

    
147
static VALUE
148
rb_condvar_signal(VALUE self)
149
{
150
    wakeup_first_thread(GET_CONDVAR_WAITERS(self));
151
    return self;
152
}
153

    
154
/*
155
 * Document-method: broadcast
156
 * call-seq: broadcast
157
 *
158
 * Wakes up all threads waiting for this lock.
159
 */
160

    
161
static VALUE
162
rb_condvar_broadcast(VALUE self)
163
{
164
    wakeup_all_threads(GET_CONDVAR_WAITERS(self));
165
    return self;
166
}
167

    
168
/*
169
 *  Document-class: Queue
170
 *
171
 *  This class provides a way to synchronize communication between threads.
172
 *
173
 *  Example:
174
 *
175
 *    require 'thread'
176
 *    queue = Queue.new
177
 *
178
 *  producer = Thread.new do
179
 *    5.times do |i|
180
 *         sleep rand(i) # simulate expense
181
 *         queue << i
182
 *         puts "#{i} produced"
183
 *    end
184
 *  end
185
 *
186
 *  consumer = Thread.new do
187
 *    5.times do |i|
188
 *         value = queue.pop
189
 *         sleep rand(i/2) # simulate expense
190
 *         puts "consumed #{value}"
191
 *    end
192
 *  end
193
 *
194
 */
195

    
196
/*
197
 * Document-method: new
198
 * call-seq: new
199
 *
200
 * Creates a new queue.
201
 */
202

    
203
static VALUE
204
rb_queue_initialize(VALUE self)
205
{
206
    RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
207
    RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
208
    return self;
209
}
210

    
211
static VALUE
212
queue_do_push(VALUE self, VALUE obj)
213
{
214
    rb_ary_push(GET_QUEUE_QUE(self), obj);
215
    wakeup_first_thread(GET_QUEUE_WAITERS(self));
216
    return self;
217
}
218

    
219
/*
220
 * Document-method: push
221
 * call-seq: push(obj)
222
 *
223
 * Pushes +obj+ to the queue.
224
 */
225

    
226
static VALUE
227
rb_queue_push(VALUE self, VALUE obj)
228
{
229
    return queue_do_push(self, obj);
230
}
231

    
232
static unsigned long
233
queue_length(VALUE self)
234
{
235
    return RARRAY_LEN(GET_QUEUE_QUE(self));
236
}
237

    
238
static unsigned long
239
queue_num_waiting(VALUE self)
240
{
241
    return RARRAY_LEN(GET_QUEUE_WAITERS(self));
242
}
243

    
244
struct waiting_delete {
245
    VALUE waiting;
246
    VALUE th;
247
};
248

    
249
static VALUE
250
queue_delete_from_waiting(struct waiting_delete *p)
251
{
252
    rb_ary_delete(p->waiting, p->th);
253
    return Qnil;
254
}
255

    
256
static VALUE
257
queue_do_pop(VALUE self, VALUE should_block)
258
{
259
    struct waiting_delete args;
260
    args.waiting = GET_QUEUE_WAITERS(self);
261
    args.th         = rb_thread_current();
262

    
263
    while (queue_length(self) == 0) {
264
        if (!(int)should_block) {
265
            rb_raise(rb_eThreadError, "queue empty");
266
        }
267
        rb_ary_push(args.waiting, args.th);
268
        rb_ensure((VALUE (*)())rb_thread_sleep_forever, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
269
    }
270

    
271
    return rb_ary_shift(GET_QUEUE_QUE(self));
272
}
273

    
274
static VALUE
275
queue_pop_should_block(int argc, VALUE *argv)
276
{
277
    VALUE should_block = Qtrue;
278
    switch (argc) {
279
      case 0:
280
        break;
281
      case 1:
282
        should_block = RTEST(argv[0]) ? Qtrue : Qfalse;
283
        break;
284
      default:
285
        rb_raise(rb_eArgError, "wrong number of arguments (%d for 1)", argc);
286
    }
287
    return should_block;
288
}
289

    
290
/*
291
 * Document-method: pop
292
 * call_seq: pop(non_block=false)
293
 *
294
 * Retrieves data from the queue.        If the queue is empty, the calling thread is
295
 * suspended until data is pushed onto the queue.  If +non_block+ is true, the
296
 * thread isn't suspended, and an exception is raised.
297
 */
298

    
299
static VALUE
300
rb_queue_pop(int argc, VALUE *argv, VALUE self)
301
{
302
    VALUE should_block = queue_pop_should_block(argc, argv);
303
    return queue_do_pop(self, should_block);
304
}
305

    
306
/*
307
 * Document-method: empty?
308
 * call-seq: empty?
309
 *
310
 * Returns +true+ if the queue is empty.
311
 */
312

    
313
static VALUE
314
rb_queue_empty_p(VALUE self)
315
{
316
    return queue_length(self) == 0 ? Qtrue : Qfalse;
317
}
318

    
319
/*
320
 * Document-method: clear
321
 * call-seq: clear
322
 *
323
 * Removes all objects from the queue.
324
 */
325

    
326
static VALUE
327
rb_queue_clear(VALUE self)
328
{
329
    rb_ary_clear(GET_QUEUE_QUE(self));
330
    return self;
331
}
332

    
333
/*
334
 * Document-method: length
335
 * call-seq: length
336
 *
337
 * Returns the length of the queue.
338
 */
339

    
340
static VALUE
341
rb_queue_length(VALUE self)
342
{
343
    unsigned long len = queue_length(self);
344
    return ULONG2NUM(len);
345
}
346

    
347
/*
348
 * Document-method: num_waiting
349
 * call-seq: num_waiting
350
 *
351
 * Returns the number of threads waiting on the queue.
352
 */
353

    
354
static VALUE
355
rb_queue_num_waiting(VALUE self)
356
{
357
    unsigned long len = queue_num_waiting(self);
358
    return ULONG2NUM(len);
359
}
360

    
361
/*
362
 *  Document-class: SizedQueue
363
 *
364
 * This class represents queues of specified size capacity.  The push operation
365
 * may be blocked if the capacity is full.
366
 *
367
 * See Queue for an example of how a SizedQueue works.
368
 */
369

    
370
/*
371
 * Document-method: new
372
 * call-seq: new(max)
373
 *
374
 * Creates a fixed-length queue with a maximum size of +max+.
375
 */
376

    
377
static VALUE
378
rb_szqueue_initialize(VALUE self, VALUE vmax)
379
{
380
    long max;
381

    
382
    max = NUM2LONG(vmax);
383
    if (max <= 0) {
384
        rb_raise(rb_eArgError, "queue size must be positive");
385
    }
386

    
387
    RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
388
    RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
389
    RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new());
390
    RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
391

    
392
    return self;
393
}
394

    
395
/*
396
 * Document-method: max
397
 * call-seq: max
398
 *
399
 * Returns the maximum size of the queue.
400
 */
401

    
402
static VALUE
403
rb_szqueue_max_get(VALUE self)
404
{
405
    return GET_SZQUEUE_MAX(self);
406
}
407

    
408
/*
409
 * Document-method: max=
410
 * call-seq: max=(n)
411
 *
412
 * Sets the maximum size of the queue.
413
 */
414

    
415
static VALUE
416
rb_szqueue_max_set(VALUE self, VALUE vmax)
417
{
418
    long max = NUM2LONG(vmax), diff = 0;
419
    VALUE t;
420

    
421
    if (max <= 0) {
422
        rb_raise(rb_eArgError, "queue size must be positive");
423
    }
424
    if ((unsigned long)max > GET_SZQUEUE_ULONGMAX(self)) {
425
        diff = max - GET_SZQUEUE_ULONGMAX(self);
426
    }
427
    RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
428
    while (diff > 0 && !NIL_P(t = rb_ary_shift(GET_QUEUE_QUE(self)))) {
429
        rb_thread_wakeup_alive(t);
430
    }
431
    return vmax;
432
}
433

    
434
/*
435
 * Document-method: push
436
 * call-seq: push(obj)
437
 *
438
 * Pushes +obj+ to the queue.  If there is no space left in the queue, waits
439
 * until space becomes available.
440
 */
441

    
442
static VALUE
443
rb_szqueue_push(VALUE self, VALUE obj)
444
{
445
    struct waiting_delete args;
446
    args.waiting = GET_QUEUE_WAITERS(self);
447
    args.th      = rb_thread_current();
448

    
449
    while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) {
450
        rb_ary_push(args.waiting, args.th);
451
        rb_ensure((VALUE (*)())rb_thread_sleep_forever, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
452
    }
453
    return queue_do_push(self, obj);
454
}
455

    
456
static VALUE
457
szqueue_do_pop(VALUE self, VALUE should_block)
458
{
459
    VALUE retval = queue_do_pop(self, should_block);
460

    
461
    if (queue_length(self) < GET_SZQUEUE_ULONGMAX(self)) {
462
        wakeup_first_thread(GET_SZQUEUE_WAITERS(self));
463
    }
464

    
465
    return retval;
466
}
467

    
468
/*
469
 * Document-method: pop
470
 * call_seq: pop(non_block=false)
471
 *
472
 * Returns the number of threads waiting on the queue.
473
 */
474

    
475
static VALUE
476
rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
477
{
478
    VALUE should_block = queue_pop_should_block(argc, argv);
479
    return szqueue_do_pop(self, should_block);
480
}
481

    
482
/*
483
 * Document-method: pop
484
 * call_seq: pop(non_block=false)
485
 *
486
 * Returns the number of threads waiting on the queue.
487
 */
488

    
489
static VALUE
490
rb_szqueue_num_waiting(VALUE self)
491
{
492
    long len = queue_num_waiting(self);
493
    len += RARRAY_LEN(GET_SZQUEUE_WAITERS(self));
494
    return ULONG2NUM(len);
495
}
496

    
497
#ifndef UNDER_THREAD
498
#define UNDER_THREAD 1
499
#endif
500

    
501
void
502
Init_thread(void)
503
{
504
#if UNDER_THREAD
505
#define ALIAS_GLOBCAL_CONST(name) do {                      \
506
        ID id = rb_intern_const(#name);                      \
507
        if (!rb_const_defined_at(rb_cObject, id)) {   \
508
            rb_const_set(rb_cObject, id, rb_c##name); \
509
        }                                             \
510
    } while (0)
511
#else
512
#define ALIAS_GLOBCAL_CONST(name) do { /* nothing */ } while (0)
513
#endif
514

    
515
    VALUE rb_cConditionVariable = rb_struct_define_without_accessor_under(
516
        UNDER_THREAD ? rb_cThread : 0,
517
        "ConditionVariable", rb_cObject, rb_struct_alloc_noinit,
518
        "waiters", NULL);
519
    VALUE rb_cQueue = rb_struct_define_without_accessor_under(
520
        UNDER_THREAD ? rb_cThread : 0,
521
        "Queue", rb_cObject, rb_struct_alloc_noinit,
522
        "que", "waiters", NULL);
523
    VALUE rb_cSizedQueue = rb_struct_define_without_accessor_under(
524
        UNDER_THREAD ? rb_cThread : 0,
525
        "SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
526
        "que", "waiters", "queue_waiters", "size", NULL);
527

    
528
    rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
529
    rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, -1);
530
    rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);
531
    rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);
532

    
533
    rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0);
534
    rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
535
    rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
536
    rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
537
    rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
538
    rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
539
    rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
540

    
541
    rb_alias(rb_cQueue, rb_intern("enq"), rb_intern("push"));
542
    rb_alias(rb_cQueue, rb_intern("<<"), rb_intern("push"));
543
    rb_alias(rb_cQueue, rb_intern("deq"), rb_intern("pop"));
544
    rb_alias(rb_cQueue, rb_intern("shift"), rb_intern("pop"));
545
    rb_alias(rb_cQueue, rb_intern("size"), rb_intern("length"));
546

    
547
    rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
548
    rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
549
    rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
550
    rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, 1);
551
    rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
552
    rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
553
    rb_alias(rb_cSizedQueue, rb_intern("enq"), rb_intern("push"));
554
    rb_alias(rb_cSizedQueue, rb_intern("<<"), rb_intern("push"));
555
    rb_alias(rb_cSizedQueue, rb_intern("deq"), rb_intern("pop"));
556
    rb_alias(rb_cSizedQueue, rb_intern("shift"), rb_intern("pop"));
557

    
558
    rb_provide("thread.rb");
559
    ALIAS_GLOBCAL_CONST(ConditionVariable);
560
    ALIAS_GLOBCAL_CONST(Queue);
561
    ALIAS_GLOBCAL_CONST(SizedQueue);
562
}