Project

General

Profile

Feature #13552 ยป 0002-thread_sync.c-rewrite-the-rest-using-using-ccan-list.patch

normalperson (Eric Wong), 05/10/2017 12:17 AM

View differences:

thread_sync.c
4 4
static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
5 5
static VALUE rb_eClosedQueueError;
6 6

  
7
/* Mutex */
8

  
9 7
/* sync_waiter is always on-stack */
10 8
struct sync_waiter {
11 9
    rb_thread_t *th;
......
14 12

  
15 13
#define MUTEX_ALLOW_TRAP FL_USER1
16 14

  
15
static int
16
wakeup_one(struct list_head *head)
17
{
18
    struct sync_waiter *cur = 0, *next = 0;
19

  
20
    list_for_each_safe(head, cur, next, node) {
21
	list_del_init(&cur->node);
22
	if (cur->th->status != THREAD_KILLED) {
23
	    rb_threadptr_interrupt(cur->th);
24
	    cur->th->status = THREAD_RUNNABLE;
25
	    return TRUE;
26
	}
27
    }
28
    return FALSE;
29
}
30

  
31
static void
32
wakeup_all(struct list_head *head)
33
{
34
    struct sync_waiter *cur = 0, *next = 0;
35

  
36
    list_for_each_safe(head, cur, next, node) {
37
	list_del_init(&cur->node);
38
	if (cur->th->status != THREAD_KILLED) {
39
	    rb_threadptr_interrupt(cur->th);
40
	    cur->th->status = THREAD_RUNNABLE;
41
	}
42
    }
43
}
44

  
45
/* Mutex */
46

  
17 47
typedef struct rb_mutex_struct {
18 48
    struct rb_thread_struct volatile *th;
19 49
    struct rb_mutex_struct *next_mutex;
......
490 520

  
491 521
/* Queue */
492 522

  
493
enum {
494
    QUEUE_QUE,
495
    QUEUE_WAITERS,
496
    SZQUEUE_WAITERS,
497
    SZQUEUE_MAX,
498
    END_QUEUE
499
};
523
PACKED_STRUCT_UNALIGNED(struct rb_queue {
524
    struct list_head waitq;
525
    VALUE que;
526
    int num_waiting;
527
});
500 528

  
501
#define QUEUE_CLOSED          FL_USER5
529
PACKED_STRUCT_UNALIGNED(struct rb_szqueue {
530
    struct rb_queue q;
531
    int num_waiting_push;
532
    struct list_head pushq;
533
    long max;
534
});
502 535

  
503
#define GET_QUEUE_QUE(q)        get_array((q), QUEUE_QUE)
504
#define GET_QUEUE_WAITERS(q)    get_array((q), QUEUE_WAITERS)
505
#define GET_SZQUEUE_WAITERS(q)  get_array((q), SZQUEUE_WAITERS)
506
#define GET_SZQUEUE_MAX(q)      RSTRUCT_GET((q), SZQUEUE_MAX)
507
#define GET_SZQUEUE_ULONGMAX(q) NUM2ULONG(GET_SZQUEUE_MAX(q))
536
static void
537
queue_mark(void *ptr)
538
{
539
    struct rb_queue *q = ptr;
508 540

  
509
static VALUE
510
ary_buf_new(void)
541
    /* no need to mark threads in waitq, they are on stack */
542
    rb_gc_mark(q->que);
543
}
544

  
545
static size_t
546
queue_memsize(const void *ptr)
511 547
{
512
    return rb_ary_tmp_new(1);
548
    return sizeof(struct rb_queue);
513 549
}
514 550

  
551
static const rb_data_type_t queue_data_type = {
552
    "queue",
553
    {queue_mark, RUBY_TYPED_DEFAULT_FREE, queue_memsize,},
554
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY
555
};
556

  
515 557
static VALUE
516
get_array(VALUE obj, int idx)
558
queue_alloc(VALUE klass)
517 559
{
518
    VALUE ary = RSTRUCT_GET(obj, idx);
519
    if (!RB_TYPE_P(ary, T_ARRAY)) {
520
	rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
521
    }
522
    return ary;
560
    VALUE obj;
561
    struct rb_queue *q;
562

  
563
    obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q);
564
    list_head_init(&q->waitq);
565
    return obj;
523 566
}
524 567

  
525
static void
526
wakeup_first_thread(VALUE list)
568
static struct rb_queue *
569
queue_ptr(VALUE obj)
527 570
{
528
    VALUE thread;
571
    struct rb_queue *q;
529 572

  
530
    while (!NIL_P(thread = rb_ary_shift(list))) {
531
	if (RTEST(rb_thread_wakeup_alive(thread))) break;
532
    }
573
    TypedData_Get_Struct(obj, struct rb_queue, &queue_data_type, q);
574
    return q;
533 575
}
534 576

  
577
#define QUEUE_CLOSED          FL_USER5
578

  
535 579
static void
536
wakeup_all_threads(VALUE list)
580
szqueue_mark(void *ptr)
537 581
{
538
    VALUE thread;
539
    long i;
582
    struct rb_szqueue *sq = ptr;
540 583

  
541
    for (i=0; i<RARRAY_LEN(list); i++) {
542
	thread = RARRAY_AREF(list, i);
543
	rb_thread_wakeup_alive(thread);
544
    }
545
    rb_ary_clear(list);
584
    queue_mark(&sq->q);
585
}
586

  
587
static size_t
588
szqueue_memsize(const void *ptr)
589
{
590
    return sizeof(struct rb_szqueue);
546 591
}
547 592

  
548
static unsigned long
549
queue_length(VALUE self)
593
static const rb_data_type_t szqueue_data_type = {
594
    "sized_queue",
595
    {szqueue_mark, RUBY_TYPED_DEFAULT_FREE, szqueue_memsize,},
596
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY
597
};
598

  
599
static VALUE
600
szqueue_alloc(VALUE klass)
550 601
{
551
    VALUE que = GET_QUEUE_QUE(self);
552
    return RARRAY_LEN(que);
602
    struct rb_szqueue *sq;
603
    VALUE obj = TypedData_Make_Struct(klass, struct rb_szqueue,
604
					&szqueue_data_type, sq);
605
    list_head_init(&sq->q.waitq);
606
    list_head_init(&sq->pushq);
607
    return obj;
553 608
}
554 609

  
555
static unsigned long
556
queue_num_waiting(VALUE self)
610
static struct rb_szqueue *
611
szqueue_ptr(VALUE obj)
557 612
{
558
    VALUE waiters = GET_QUEUE_WAITERS(self);
559
    return RARRAY_LEN(waiters);
613
    struct rb_szqueue *sq;
614

  
615
    TypedData_Get_Struct(obj, struct rb_szqueue, &szqueue_data_type, sq);
616
    return sq;
560 617
}
561 618

  
562
static unsigned long
563
szqueue_num_waiting_producer(VALUE self)
619
static VALUE
620
ary_buf_new(void)
564 621
{
565
    VALUE waiters = GET_SZQUEUE_WAITERS(self);
566
    return RARRAY_LEN(waiters);
622
    return rb_ary_tmp_new(1);
623
}
624

  
625
static VALUE
626
check_array(VALUE obj, VALUE ary)
627
{
628
    if (!RB_TYPE_P(ary, T_ARRAY)) {
629
	rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
630
    }
631
    return ary;
632
}
633

  
634
static long
635
queue_length(VALUE self, struct rb_queue *q)
636
{
637
    return RARRAY_LEN(check_array(self, q->que));
567 638
}
568 639

  
569 640
static int
......
579 650
}
580 651

  
581 652
static VALUE
582
queue_closed_result(VALUE self)
653
queue_closed_result(VALUE self, struct rb_queue *q)
583 654
{
584
    assert(queue_length(self) == 0);
655
    assert(queue_length(self, q) == 0);
585 656
    return Qnil;
586 657
}
587 658

  
588
static VALUE
589
queue_do_close(VALUE self, int is_szq)
590
{
591
    if (!queue_closed_p(self)) {
592
	FL_SET(self, QUEUE_CLOSED);
593

  
594
	if (queue_num_waiting(self) > 0) {
595
	    VALUE waiters = GET_QUEUE_WAITERS(self);
596
	    wakeup_all_threads(waiters);
597
	}
598

  
599
	if (is_szq && szqueue_num_waiting_producer(self) > 0) {
600
	    VALUE waiters = GET_SZQUEUE_WAITERS(self);
601
	    wakeup_all_threads(waiters);
602
	}
603
    }
604

  
605
    return self;
606
}
607

  
608 659
/*
609 660
 *  Document-class: Queue
610 661
 *
......
648 699
static VALUE
649 700
rb_queue_initialize(VALUE self)
650 701
{
651
    RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
652
    RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
702
    struct rb_queue *q = queue_ptr(self);
703
    q->que = ary_buf_new();
704
    list_head_init(&q->waitq);
653 705
    return self;
654 706
}
655 707

  
656 708
static VALUE
657
queue_do_push(VALUE self, VALUE obj)
709
queue_do_push(VALUE self, struct rb_queue *q, VALUE obj)
658 710
{
659 711
    if (queue_closed_p(self)) {
660 712
	raise_closed_queue_error(self);
661 713
    }
662
    rb_ary_push(GET_QUEUE_QUE(self), obj);
663
    wakeup_first_thread(GET_QUEUE_WAITERS(self));
714
    rb_ary_push(check_array(self, q->que), obj);
715
    wakeup_one(&q->waitq);
664 716
    return self;
665 717
}
666 718

  
......
698 750
static VALUE
699 751
rb_queue_close(VALUE self)
700 752
{
701
    return queue_do_close(self, FALSE);
753
    struct rb_queue *q = queue_ptr(self);
754

  
755
    if (!queue_closed_p(self)) {
756
	FL_SET(self, QUEUE_CLOSED);
757

  
758
	wakeup_all(&q->waitq);
759
    }
760

  
761
    return self;
702 762
}
703 763

  
704 764
/*
......
727 787
static VALUE
728 788
rb_queue_push(VALUE self, VALUE obj)
729 789
{
730
    return queue_do_push(self, obj);
790
    return queue_do_push(self, queue_ptr(self), obj);
791
}
792

  
793
static VALUE
794
queue_sleep(VALUE arg)
795
{
796
    rb_thread_sleep_deadly_allow_spurious_wakeup();
797
    return Qnil;
731 798
}
732 799

  
733
struct waiting_delete {
734
    VALUE waiting;
735
    VALUE th;
800
struct queue_waiter {
801
    struct sync_waiter w;
802
    union {
803
	struct rb_queue *q;
804
	struct rb_szqueue *sq;
805
    } as;
736 806
};
737 807

  
738 808
static VALUE
739
queue_delete_from_waiting(struct waiting_delete *p)
809
queue_sleep_done(VALUE p)
740 810
{
741
    rb_ary_delete(p->waiting, p->th);
742
    return Qnil;
811
    struct queue_waiter *qw = (struct queue_waiter *)p;
812

  
813
    list_del(&qw->w.node);
814
    qw->as.q->num_waiting--;
815

  
816
    return Qfalse;
743 817
}
744 818

  
745 819
static VALUE
746
queue_sleep(VALUE arg)
820
szqueue_sleep_done(VALUE p)
747 821
{
748
    rb_thread_sleep_deadly_allow_spurious_wakeup();
749
    return Qnil;
822
    struct queue_waiter *qw = (struct queue_waiter *)p;
823

  
824
    list_del(&qw->w.node);
825
    qw->as.sq->num_waiting_push--;
826

  
827
    return Qfalse;
750 828
}
751 829

  
752 830
static VALUE
753
queue_do_pop(VALUE self, int should_block)
831
queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
754 832
{
755
    struct waiting_delete args;
756
    args.waiting = GET_QUEUE_WAITERS(self);
757
    args.th	 = rb_thread_current();
833
    check_array(self, q->que);
758 834

  
759
    while (queue_length(self) == 0) {
835
    while (RARRAY_LEN(q->que) == 0) {
760 836
	if (!should_block) {
761 837
	    rb_raise(rb_eThreadError, "queue empty");
762 838
	}
763 839
	else if (queue_closed_p(self)) {
764
	    return queue_closed_result(self);
840
	    return queue_closed_result(self, q);
765 841
	}
766 842
	else {
767
	    assert(queue_length(self) == 0);
843
	    struct queue_waiter qw;
844

  
845
	    assert(RARRAY_LEN(q->que) == 0);
768 846
	    assert(queue_closed_p(self) == 0);
769 847

  
770
	    rb_ary_push(args.waiting, args.th);
771
	    rb_ensure(queue_sleep, Qfalse, queue_delete_from_waiting, (VALUE)&args);
848
	    qw.w.th = GET_THREAD();
849
	    qw.as.q = q;
850
	    list_add_tail(&qw.as.q->waitq, &qw.w.node);
851
	    qw.as.q->num_waiting++;
852

  
853
	    rb_ensure(queue_sleep, Qfalse, queue_sleep_done, (VALUE)&qw);
772 854
	}
773 855
    }
774 856

  
775
    return rb_ary_shift(GET_QUEUE_QUE(self));
857
    return rb_ary_shift(q->que);
776 858
}
777 859

  
778 860
static int
......
804 886
rb_queue_pop(int argc, VALUE *argv, VALUE self)
805 887
{
806 888
    int should_block = queue_pop_should_block(argc, argv);
807
    return queue_do_pop(self, should_block);
889
    return queue_do_pop(self, queue_ptr(self), should_block);
808 890
}
809 891

  
810 892
/*
......
817 899
static VALUE
818 900
rb_queue_empty_p(VALUE self)
819 901
{
820
    return queue_length(self) == 0 ? Qtrue : Qfalse;
902
    return queue_length(self, queue_ptr(self)) == 0 ? Qtrue : Qfalse;
821 903
}
822 904

  
823 905
/*
......
829 911
static VALUE
830 912
rb_queue_clear(VALUE self)
831 913
{
832
    rb_ary_clear(GET_QUEUE_QUE(self));
914
    struct rb_queue *q = queue_ptr(self);
915

  
916
    rb_ary_clear(check_array(self, q->que));
833 917
    return self;
834 918
}
835 919

  
......
845 929
static VALUE
846 930
rb_queue_length(VALUE self)
847 931
{
848
    unsigned long len = queue_length(self);
849
    return ULONG2NUM(len);
932
    return LONG2NUM(queue_length(self, queue_ptr(self)));
850 933
}
851 934

  
852 935
/*
......
858 941
static VALUE
859 942
rb_queue_num_waiting(VALUE self)
860 943
{
861
    unsigned long len = queue_num_waiting(self);
862
    return ULONG2NUM(len);
944
    struct rb_queue *q = queue_ptr(self);
945

  
946
    return INT2NUM(q->num_waiting);
863 947
}
864 948

  
865 949
/*
......
882 966
rb_szqueue_initialize(VALUE self, VALUE vmax)
883 967
{
884 968
    long max;
969
    struct rb_szqueue *sq = szqueue_ptr(self);
885 970

  
886 971
    max = NUM2LONG(vmax);
887 972
    if (max <= 0) {
888 973
	rb_raise(rb_eArgError, "queue size must be positive");
889 974
    }
890 975

  
891
    RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
892
    RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
893
    RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new());
894
    RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
976
    sq->q.que = ary_buf_new();
977
    list_head_init(&sq->q.waitq);
978
    list_head_init(&sq->pushq);
979
    sq->max = max;
895 980

  
896 981
    return self;
897 982
}
......
911 996
static VALUE
912 997
rb_szqueue_close(VALUE self)
913 998
{
914
    return queue_do_close(self, TRUE);
999
    if (!queue_closed_p(self)) {
1000
	struct rb_szqueue *sq = szqueue_ptr(self);
1001

  
1002
	FL_SET(self, QUEUE_CLOSED);
1003
	wakeup_all(&sq->q.waitq);
1004
	wakeup_all(&sq->pushq);
1005
    }
1006
    return self;
915 1007
}
916 1008

  
917 1009
/*
......
923 1015
static VALUE
924 1016
rb_szqueue_max_get(VALUE self)
925 1017
{
926
    return GET_SZQUEUE_MAX(self);
1018
    return LONG2NUM(szqueue_ptr(self)->max);
927 1019
}
928 1020

  
929 1021
/*
......
936 1028
static VALUE
937 1029
rb_szqueue_max_set(VALUE self, VALUE vmax)
938 1030
{
939
    long max = NUM2LONG(vmax), diff = 0;
940
    VALUE t;
1031
    long max = NUM2LONG(vmax);
1032
    long diff = 0;
1033
    struct rb_szqueue *sq = szqueue_ptr(self);
941 1034

  
942 1035
    if (max <= 0) {
943 1036
	rb_raise(rb_eArgError, "queue size must be positive");
944 1037
    }
945
    if ((unsigned long)max > GET_SZQUEUE_ULONGMAX(self)) {
946
	diff = max - GET_SZQUEUE_ULONGMAX(self);
1038
    if (max > sq->max) {
1039
	diff = max - sq->max;
947 1040
    }
948
    RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
949
    while (diff-- > 0 && !NIL_P(t = rb_ary_shift(GET_SZQUEUE_WAITERS(self)))) {
950
	rb_thread_wakeup_alive(t);
1041
    sq->max = max;
1042
    while (diff-- > 0 && wakeup_one(&sq->pushq)) {
1043
	/* keep waking more up */
951 1044
    }
952 1045
    return vmax;
953 1046
}
......
980 1073
static VALUE
981 1074
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
982 1075
{
983
    struct waiting_delete args;
1076
    struct rb_szqueue *sq = szqueue_ptr(self);
984 1077
    int should_block = szqueue_push_should_block(argc, argv);
985
    args.waiting = GET_SZQUEUE_WAITERS(self);
986
    args.th      = rb_thread_current();
987 1078

  
988
    while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) {
1079
    while (queue_length(self, &sq->q) >= sq->max) {
989 1080
	if (!should_block) {
990 1081
	    rb_raise(rb_eThreadError, "queue full");
991 1082
	}
......
993 1084
	    goto closed;
994 1085
	}
995 1086
	else {
996
	    rb_ary_push(args.waiting, args.th);
997
	    rb_ensure(queue_sleep, Qfalse, queue_delete_from_waiting, (VALUE)&args);
1087
	    struct queue_waiter qw;
1088

  
1089
	    qw.w.th = GET_THREAD();
1090
	    qw.as.sq = sq;
1091
	    list_add_tail(&sq->pushq, &qw.w.node);
1092
	    sq->num_waiting_push++;
1093

  
1094
	    rb_ensure(queue_sleep, Qfalse, szqueue_sleep_done, (VALUE)&qw);
998 1095
	}
999 1096
    }
1000 1097

  
......
1003 1100
	raise_closed_queue_error(self);
1004 1101
    }
1005 1102

  
1006
    return queue_do_push(self, argv[0]);
1103
    return queue_do_push(self, &sq->q, argv[0]);
1007 1104
}
1008 1105

  
1009 1106
static VALUE
1010 1107
szqueue_do_pop(VALUE self, int should_block)
1011 1108
{
1012
    VALUE retval = queue_do_pop(self, should_block);
1109
    struct rb_szqueue *sq = szqueue_ptr(self);
1110
    VALUE retval = queue_do_pop(self, &sq->q, should_block);
1013 1111

  
1014
    if (queue_length(self) < GET_SZQUEUE_ULONGMAX(self)) {
1015
	wakeup_first_thread(GET_SZQUEUE_WAITERS(self));
1112
    if (queue_length(self, &sq->q) < sq->max) {
1113
	wakeup_one(&sq->pushq);
1016 1114
    }
1017 1115

  
1018 1116
    return retval;
......
1048 1146
static VALUE
1049 1147
rb_szqueue_clear(VALUE self)
1050 1148
{
1051
    rb_ary_clear(GET_QUEUE_QUE(self));
1052
    wakeup_all_threads(GET_SZQUEUE_WAITERS(self));
1149
    struct rb_szqueue *sq = szqueue_ptr(self);
1150

  
1151
    rb_ary_clear(check_array(self, sq->q.que));
1152
    wakeup_all(&sq->pushq);
1053 1153
    return self;
1054 1154
}
1055 1155

  
1156
static VALUE
1157
rb_szqueue_length(VALUE self)
1158
{
1159
    struct rb_szqueue *sq = szqueue_ptr(self);
1160

  
1161
    return LONG2NUM(queue_length(self, &sq->q));
1162
}
1163

  
1056 1164
/*
1057 1165
 * Document-method: SizedQueue#num_waiting
1058 1166
 *
......
1062 1170
static VALUE
1063 1171
rb_szqueue_num_waiting(VALUE self)
1064 1172
{
1065
    long len = queue_num_waiting(self) + szqueue_num_waiting_producer(self);
1066
    return ULONG2NUM(len);
1173
    struct rb_szqueue *sq = szqueue_ptr(self);
1174

  
1175
    return INT2NUM(sq->q.num_waiting + sq->num_waiting_push);
1067 1176
}
1068 1177

  
1069
/* ConditionalVariable */
1178
/*
1179
 * Document-method: SizedQueue#empty?
1180
 * call-seq: empty?
1181
 *
1182
 * Returns +true+ if the queue is empty.
1183
 */
1184

  
1185
static VALUE
1186
rb_szqueue_empty_p(VALUE self)
1187
{
1188
    struct rb_szqueue *sq = szqueue_ptr(self);
1189

  
1190
    return queue_length(self, &sq->q) == 0 ? Qtrue : Qfalse;
1191
}
1070 1192

  
1071
enum {
1072
    CONDVAR_WAITERS,
1073
    END_CONDVAR
1074
};
1075 1193

  
1076
#define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS)
1194
/* ConditionalVariable */
1195
/* TODO: maybe this can be IMEMO */
1196
struct rb_condvar {
1197
    struct list_head waitq;
1198
};
1077 1199

  
1078 1200
/*
1079 1201
 *  Document-class: ConditionVariable
......
1105 1227
 *    }
1106 1228
 */
1107 1229

  
1230
static size_t
1231
condvar_memsize(const void *ptr)
1232
{
1233
    return sizeof(struct rb_condvar);
1234
}
1235

  
1236
static const rb_data_type_t cv_data_type = {
1237
    "condvar",
1238
    {0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,},
1239
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY
1240
};
1241

  
1242
static struct rb_condvar *
1243
condvar_ptr(VALUE self)
1244
{
1245
    struct rb_condvar *cv;
1246

  
1247
    TypedData_Get_Struct(self, struct rb_condvar, &cv_data_type, cv);
1248

  
1249
    return cv;
1250
}
1251

  
1252
static VALUE
1253
condvar_alloc(VALUE klass)
1254
{
1255
    struct rb_condvar *cv;
1256
    VALUE obj;
1257

  
1258
    obj = TypedData_Make_Struct(klass, struct rb_condvar, &cv_data_type, cv);
1259
    list_head_init(&cv->waitq);
1260

  
1261
    return obj;
1262
}
1263

  
1108 1264
/*
1109 1265
 * Document-method: ConditionVariable::new
1110 1266
 *
......
1114 1270
static VALUE
1115 1271
rb_condvar_initialize(VALUE self)
1116 1272
{
1117
    RSTRUCT_SET(self, CONDVAR_WAITERS, ary_buf_new());
1273
    struct rb_condvar *cv = condvar_ptr(self);;
1274
    list_head_init(&cv->waitq);
1118 1275
    return self;
1119 1276
}
1120 1277

  
......
1133 1290
}
1134 1291

  
1135 1292
static VALUE
1136
delete_current_thread(VALUE ary)
1293
delete_from_waitq(struct sync_waiter *w)
1137 1294
{
1138
    return rb_ary_delete(ary, rb_thread_current());
1295
    list_del(&w->node);
1296

  
1297
    return Qnil;
1139 1298
}
1140 1299

  
1141 1300
/*
......
1151 1310
static VALUE
1152 1311
rb_condvar_wait(int argc, VALUE *argv, VALUE self)
1153 1312
{
1154
    VALUE waiters = GET_CONDVAR_WAITERS(self);
1313
    struct rb_condvar *cv = condvar_ptr(self);
1155 1314
    VALUE mutex, timeout;
1156 1315
    struct sleep_call args;
1316
    struct sync_waiter w;
1157 1317

  
1158 1318
    rb_scan_args(argc, argv, "11", &mutex, &timeout);
1159 1319

  
1160 1320
    args.mutex   = mutex;
1161 1321
    args.timeout = timeout;
1162
    rb_ary_push(waiters, rb_thread_current());
1163
    rb_ensure(do_sleep, (VALUE)&args, delete_current_thread, waiters);
1322
    w.th = GET_THREAD();
1323
    list_add_tail(&cv->waitq, &w.node);
1324
    rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&w);
1164 1325

  
1165 1326
    return self;
1166 1327
}
......
1174 1335
static VALUE
1175 1336
rb_condvar_signal(VALUE self)
1176 1337
{
1177
    wakeup_first_thread(GET_CONDVAR_WAITERS(self));
1338
    struct rb_condvar *cv = condvar_ptr(self);
1339
    wakeup_one(&cv->waitq);
1178 1340
    return self;
1179 1341
}
1180 1342

  
......
1187 1349
static VALUE
1188 1350
rb_condvar_broadcast(VALUE self)
1189 1351
{
1190
    wakeup_all_threads(GET_CONDVAR_WAITERS(self));
1352
    struct rb_condvar *cv = condvar_ptr(self);
1353
    wakeup_all(&cv->waitq);
1191 1354
    return self;
1192 1355
}
1193 1356

  
......
1227 1390
    rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0);
1228 1391

  
1229 1392
    /* Queue */
1230
    rb_cQueue = rb_struct_define_without_accessor_under(
1231
	rb_cThread,
1232
	"Queue", rb_cObject, rb_struct_alloc_noinit,
1233
	"que", "waiters", NULL);
1393
    rb_cQueue = rb_define_class_under(rb_cThread, "Queue", rb_cObject);
1394
    rb_define_alloc_func(rb_cQueue, queue_alloc);
1234 1395

  
1235 1396
    rb_eClosedQueueError = rb_define_class("ClosedQueueError", rb_eStopIteration);
1236 1397

  
......
1252 1413
    rb_define_alias(rb_cQueue, "shift", "pop");
1253 1414
    rb_define_alias(rb_cQueue, "size", "length");
1254 1415

  
1255
    rb_cSizedQueue = rb_struct_define_without_accessor_under(
1256
	rb_cThread,
1257
	"SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
1258
	"que", "waiters", "queue_waiters", "size", NULL);
1416
    rb_cSizedQueue = rb_define_class_under(rb_cThread, "SizedQueue", rb_cQueue);
1417
    rb_define_alloc_func(rb_cSizedQueue, szqueue_alloc);
1259 1418

  
1260 1419
    rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
1261 1420
    rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0);
......
1263 1422
    rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
1264 1423
    rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1);
1265 1424
    rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
1425
    rb_define_method(rb_cSizedQueue, "empty?", rb_szqueue_empty_p, 0);
1266 1426
    rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
1427
    rb_define_method(rb_cSizedQueue, "length", rb_szqueue_length, 0);
1267 1428
    rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
1268 1429

  
1269 1430
    rb_define_alias(rb_cSizedQueue, "enq", "push");
1270 1431
    rb_define_alias(rb_cSizedQueue, "<<", "push");
1271 1432
    rb_define_alias(rb_cSizedQueue, "deq", "pop");
1272 1433
    rb_define_alias(rb_cSizedQueue, "shift", "pop");
1434
    rb_define_alias(rb_cSizedQueue, "size", "length");
1273 1435

  
1274 1436
    /* CVar */
1275
    rb_cConditionVariable = rb_struct_define_without_accessor_under(
1276
	rb_cThread,
1277
	"ConditionVariable", rb_cObject, rb_struct_alloc_noinit,
1278
	"waiters", NULL);
1437
    rb_cConditionVariable = rb_define_class_under(rb_cThread,
1438
					"ConditionVariable", rb_cObject);
1439
    rb_define_alloc_func(rb_cConditionVariable, condvar_alloc);
1279 1440

  
1280 1441
    id_sleep = rb_intern("sleep");
1281 1442

  
1282
-