diff --git a/ext/thread/thread.c b/ext/thread/thread.c index b8656a1..d7798cd 100644 --- a/ext/thread/thread.c +++ b/ext/thread/thread.c @@ -7,8 +7,9 @@ enum { enum { QUEUE_QUE = 0, QUEUE_WAITERS = 1, - SZQUEUE_WAITERS = 2, - SZQUEUE_MAX = 3 + QUEUE_CLOSED = 2, + SZQUEUE_WAITERS = 3, + SZQUEUE_MAX = 4 }; #define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS) @@ -213,10 +214,20 @@ rb_queue_initialize(VALUE self) { RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new()); RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new()); + RSTRUCT_SET(self, QUEUE_CLOSED, Qfalse); return self; } static VALUE +queue_raise_if_closed(VALUE self) +{ + if (RSTRUCT_GET(self, QUEUE_CLOSED) == Qtrue ) { + rb_raise(rb_eThreadError, "queue closed"); + } + return Qnil; +} + +static VALUE queue_do_push(VALUE self, VALUE obj) { rb_ary_push(GET_QUEUE_QUE(self), obj); @@ -237,6 +248,7 @@ queue_do_push(VALUE self, VALUE obj) static VALUE rb_queue_push(VALUE self, VALUE obj) { + queue_raise_if_closed(self); return queue_do_push(self, obj); } @@ -254,6 +266,33 @@ queue_num_waiting(VALUE self) return RARRAY_LEN(waiters); } +/* + * Document-method: Queue#close + * call-seq: close + * + * Closes the queue. + * + * Once the queue is closed, enq will raise an exception, although remaining + * items in the queue can be deq'd as usual. Any threads waiting on + * pop(non_block=false) at the time the queue is closed will be woken and + * given a nil. + * + * Once a closed queue is empty, deq with non_block=false will always return + * nil. + * + * A closed queue cannot be reopened. + */ + +static VALUE +rb_queue_close(VALUE self) +{ + RSTRUCT_SET(self, QUEUE_CLOSED, Qtrue); + if (queue_num_waiting(self) > 0) { + wakeup_all_threads(GET_QUEUE_WAITERS(self)); + } + return self; +} + struct waiting_delete { VALUE waiting; VALUE th; @@ -284,6 +323,11 @@ queue_do_pop(VALUE self, int should_block) if (!should_block) { rb_raise(rb_eThreadError, "queue empty"); } + + if (RSTRUCT_GET(self,QUEUE_CLOSED) == Qtrue) { + return Qnil; + } + rb_ary_push(args.waiting, args.th); rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args); } @@ -324,6 +368,19 @@ rb_queue_pop(int argc, VALUE *argv, VALUE self) } /* + * Document-method: Queue#closed? + * call-seq: closed? + * + * Returns +true+ if the queue is closed. + */ + +static VALUE +rb_queue_closed_p(VALUE self) +{ + return RSTRUCT_GET(self, QUEUE_CLOSED); +} + +/* * Document-method: Queue#empty? * call-seq: empty? * @@ -406,6 +463,7 @@ rb_szqueue_initialize(VALUE self, VALUE vmax) RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new()); RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new()); + RSTRUCT_SET(self, QUEUE_CLOSED, Qfalse); RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new()); RSTRUCT_SET(self, SZQUEUE_MAX, vmax); @@ -480,6 +538,9 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self) { struct waiting_delete args; int should_block = szqueue_push_should_block(argc, argv); + + queue_raise_if_closed(self); + args.waiting = GET_SZQUEUE_WAITERS(self); args.th = rb_thread_current(); @@ -590,11 +651,11 @@ Init_thread(void) VALUE rb_cQueue = rb_struct_define_without_accessor_under( OUTER, "Queue", rb_cObject, rb_struct_alloc_noinit, - "que", "waiters", NULL); + "que", "waiters", "closed", NULL); VALUE rb_cSizedQueue = rb_struct_define_without_accessor_under( OUTER, "SizedQueue", rb_cQueue, rb_struct_alloc_noinit, - "que", "waiters", "queue_waiters", "size", NULL); + "que", "waiters", "closed", "queue_waiters", "size", NULL); #if 0 rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); /* teach rdoc ConditionVariable */ @@ -620,6 +681,8 @@ Init_thread(void) rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0); rb_define_method(rb_cQueue, "length", rb_queue_length, 0); rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0); + rb_define_method(rb_cQueue, "close", rb_queue_close, 0); + rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0); /* Alias for #push. */ rb_define_alias(rb_cQueue, "enq", "push"); diff --git a/test/thread/test_queue.rb b/test/thread/test_queue.rb index 2bd71db..580cdce 100644 --- a/test/thread/test_queue.rb +++ b/test/thread/test_queue.rb @@ -277,4 +277,30 @@ class TestQueue < Test::Unit::TestCase Marshal.dump(q) end end + + def test_close + [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate| + q = qcreate[] + assert_equal false, q.closed? + q << :something + q.close + assert_equal true, q.closed? + assert_raise_with_message(ThreadError, /closed/){q << :nothing} + assert_equal q.pop, :something + assert_equal q.pop, nil + end + end + + def test_close_wakeup + [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate| + q = qcreate[] + threads = [] + 4.times{threads << Thread.new{q.pop}} + 3.times{|i| q << i} + sleep 0.01 + assert_equal 1, threads.count{|thr| thr.alive? && thr.stop?} + q.close + assert_equal 0, threads.count{|thr| thr.alive? && thr.stop?} + end + end end