diff --git a/ext/thread/thread.c b/ext/thread/thread.c index b8656a1..c26f028 100644 --- a/ext/thread/thread.c +++ b/ext/thread/thread.c @@ -7,18 +7,32 @@ enum { enum { QUEUE_QUE = 0, QUEUE_WAITERS = 1, - SZQUEUE_WAITERS = 2, - SZQUEUE_MAX = 3 + QUEUE_PENDING = 2, + QUEUE_TOKEN = 3, + SZQUEUE_WAITERS = 4, + SZQUEUE_MAX = 5 }; #define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS) #define GET_QUEUE_QUE(q) get_array((q), QUEUE_QUE) #define GET_QUEUE_WAITERS(q) get_array((q), QUEUE_WAITERS) +#define GET_QUEUE_TOKEN(q) get_array((q), QUEUE_TOKEN) #define GET_SZQUEUE_WAITERS(q) get_array((q), SZQUEUE_WAITERS) #define GET_SZQUEUE_MAX(q) RSTRUCT_GET((q), SZQUEUE_MAX) #define GET_SZQUEUE_ULONGMAX(q) NUM2ULONG(GET_SZQUEUE_MAX(q)) +/* + QUEUE_PENDING is the collection producer threads at the point where close + was called. Can't use SZQUEUE_WAITERS for this because + rb_ensure(rb_thread_sleep_deadly ...) is not atomic with respect to + add/remove of threads in SZQUEUE_WAITERS. +*/ +#define GET_QUEUE_PENDING(q) get_array((q), QUEUE_PENDING) + +/* Has the close method been called? */ +#define QUEUE_CLOSED_P(self) RARRAY_LEN(GET_QUEUE_TOKEN(self)) + static VALUE get_array(VALUE obj, int idx) { @@ -179,26 +193,91 @@ rb_condvar_broadcast(VALUE self) * * This class provides a way to synchronize communication between threads. * - * Example: * - * require 'thread' - * queue = Queue.new - * - * producer = Thread.new do - * 5.times do |i| - * sleep rand(i) # simulate expense - * queue << i - * puts "#{i} produced" - * end - * end - * - * consumer = Thread.new do - * 5.times do |i| - * value = queue.pop - * sleep rand(i/2) # simulate expense - * puts "consumed #{value}" - * end - * end + * Example using close(StopIteration): + * + * require 'thread' + * queue = Queue.new + * + * consumers = a_few.times.map do + * Thread.new do + * loop{ do_something_with queue.pop} + * end + * end + * + * producers = some.times.map do + * Thread.new do + * several.times{ q << something_interesting } + * end + * end + * producers.each(&:join) + * q.close StopIteration + * + * + * Example using nil to close queue: + * + * require 'thread' + * queue = Queue.new + * + * consumers = a_few.times.map do + * Thread.new do + * while item = queue.pop + * do_something_with item + * end + * end + * end + * + * producers = some.times.map do + * Thread.new do + * several.times{ q << something_interesting } + * end + * end + * producers.each(&:join) + * q.close + * + * + * Example using traditional consumer <-> producer coupling (1 producer, 1 consumer): + * + * require 'thread' + * queue = Queue.new + * + * producer = Thread.new do + * 5.times do |i| + * sleep rand(i) # simulate expense + * queue << i + * puts "#{i} produced" + * end + * end + * + * consumer = Thread.new do + * 5.times do |i| + * value = queue.pop + * sleep rand(i/2) # simulate expense + * puts "consumed #{value}" + * end + * end + * + * Example using queue poison token (1 producer, 1 consumer): + * + * require 'thread' + * queue = Queue.new + * + * producer = Thread.new do + * 5.times do |i| + * sleep rand(i) # simulate expense + * queue << i + * puts "#{i} produced" + * end + * q << :poison + * end + * + * consumer = Thread.new do + * while item = queue.pop + * break if item == :poison + * sleep rand(i/2) # simulate expense + * puts "consumed #{value}" + * end + * end * */ @@ -213,12 +292,25 @@ rb_queue_initialize(VALUE self) { RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new()); RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new()); + RSTRUCT_SET(self, QUEUE_PENDING, ary_buf_new()); + RSTRUCT_SET(self, QUEUE_TOKEN, ary_buf_new()); return self; } static VALUE +queue_raise_if_closed(VALUE self) +{ + if (QUEUE_CLOSED_P(self)) { + rb_raise(rb_eThreadError, "queue closed"); + } + return Qnil; +} + +static VALUE queue_do_push(VALUE self, VALUE obj) { + /* TODO would be nice to not have to use macro to check for correct + initialization on every single call to push. */ rb_ary_push(GET_QUEUE_QUE(self), obj); wakeup_first_thread(GET_QUEUE_WAITERS(self)); return self; @@ -237,6 +329,7 @@ queue_do_push(VALUE self, VALUE obj) static VALUE rb_queue_push(VALUE self, VALUE obj) { + queue_raise_if_closed(self); return queue_do_push(self, obj); } @@ -254,15 +347,140 @@ queue_num_waiting(VALUE self) return RARRAY_LEN(waiters); } +/* + Do close accounting. + + 1) Set CLOSE_TOKEN + + 2) copy pending producers to QUEUE_PENDING. This is to work around + SZQUEUE_WAITERS updating non-atomically in the thread + wake/sleep handling. + + CLOSE_TOKEN contains an array with 0 or 1 elements. This is to allow the + queue to be closed several times, provided the same value is used each + time, and having an array makes that test easier. +*/ +static VALUE +queue_set_close_state(int argc, VALUE *argv, VALUE self, VALUE pending_producers) +{ + VALUE close_token = Qnil; + VALUE existing_close_token = Qnil; + VALUE close_token_ary = GET_QUEUE_TOKEN(self); + + rb_check_arity(argc, 0, 1); + + /* handle arg defaults, and conversion to an Exception instance. */ + if (argc > 0) { + close_token = argv[0]; + if (rb_obj_is_kind_of(close_token,rb_cClass) && rb_class_inherited_p(close_token,rb_eException)) { + close_token = rb_exc_new2(close_token, "queue closed"); + } + } + + /* Allow close to be called several times, with the same argument (same-ness defined by ==). */ + if (RARRAY_LEN(close_token_ary) == 0) { + rb_ary_store(close_token_ary, 0, close_token); + /* Start accounting for pending producers. Can only do this once. */ + rb_ary_concat(GET_QUEUE_PENDING(self), pending_producers); + return close_token; + } else { + existing_close_token = RARRAY_AREF(close_token_ary, 0); + if (!rb_eql(existing_close_token, close_token)) { + rb_raise(rb_eThreadError, "already closed with %"PRIsVALUE, rb_inspect(existing_close_token)); + } + return existing_close_token; + } +} + +/* + * Document-method: Queue#close + * call-seq: + * close(token=nil) + * + * Closes the queue to producers. A closed queue cannot be re-opened. + * + * +token+ can be any object, or an instance of an Exception subclass, or an + * Exception subclass. +close+ can be called repeatedly on a single queue as + * long as the same (defined by ==) +token+ is used. + * + * After the call to close completes, the following are true: + * + * - +closed?+ will return true + * + * - calling enq/push/<< will raise ThreadError('queue closed') + * + * - when +empty?+ is false, calling deq/pop/shift will return an object from the queue as usual. + * + * - when +empty?+ is true, deq(non_block=false) will not suspend and + * will either return the +token+, or raise if +token+ was an exception + * instance or class. deq(non_block=true) will ignore the parameter and + * raise a ThreadError('queue empty'). + * + * And for SizedQueue, these will also be true: + * + * - each thread already suspended in enq at the time of the call + * to close will be allowed to push its object as usual. + * + * - +empty?+ will be false when there are either objects in the queue, or + * producers which were suspended at the time of the call to +close+ but whose + * objects are not yet in the queue. Therefore, it can be true (very + * briefly) that empty? == false && size == 0, since +size+ returns the number + * of objects actually in the queue. + */ + +static VALUE +rb_queue_close(int argc, VALUE *argv, VALUE self) +{ + /* Never any pending producers for ordinary queue, so pending is empty. */ + queue_set_close_state(argc, argv, self, ary_buf_new()); + if (queue_length(self) < queue_num_waiting(self)) { + wakeup_all_threads(GET_QUEUE_WAITERS(self)); + } + return self; +} + +static VALUE +rb_szqueue_close(int argc, VALUE *argv, VALUE self) +{ + queue_set_close_state(argc, argv, self, GET_SZQUEUE_WAITERS(self)); + if (RARRAY_LEN(GET_QUEUE_PENDING(self)) == 0) { + /* Wake up all consumers because there will be never be more items. */ + wakeup_all_threads(GET_QUEUE_WAITERS(self)); + } + return self; +} + struct waiting_delete { VALUE waiting; VALUE th; + VALUE pending; + VALUE self; }; static VALUE queue_delete_from_waiting(struct waiting_delete *p) { + /* The waiting queues */ + rb_ary_delete(p->waiting, p->th); + return Qnil; +} + +static ID id_status; + +static VALUE +szqueue_delete_from_waiting(struct waiting_delete *p) +{ + /* The waiting queues */ rb_ary_delete(p->waiting, p->th); + + /* Handle the post-close pending producers on thread abort. Only applies to SizedQueue. */ + if (QUEUE_CLOSED_P(p->self)) { + /* TODO This is not great, but all the better ways seem to involved rb_thread_t.status + And it won't get in the way of normal queue operation. */ + if (strcmp("aborting",RSTRING_PTR(rb_funcall(p->th,id_status,0))) == 0) { + rb_ary_delete(p->pending, p->th); + } + } return Qnil; } @@ -274,16 +492,34 @@ queue_sleep(VALUE arg) } static VALUE +queue_close_pop(VALUE self) +{ + VALUE token = RARRAY_AREF(GET_QUEUE_TOKEN(self),0); + if (rb_obj_is_kind_of(token,rb_eException)) { + rb_exc_raise(token); + } + return token; +} + +static VALUE queue_do_pop(VALUE self, int should_block) { struct waiting_delete args; - args.waiting = GET_QUEUE_WAITERS(self); - args.th = rb_thread_current(); while (queue_length(self) == 0) { if (!should_block) { rb_raise(rb_eThreadError, "queue empty"); } + + if (QUEUE_CLOSED_P(self) && RARRAY_LEN(GET_QUEUE_PENDING(self)) == 0) { + return queue_close_pop(self); + } + + args.waiting = GET_QUEUE_WAITERS(self); + args.th = rb_thread_current(); + args.pending = Qnil; + args.self = Qnil; + rb_ary_push(args.waiting, args.th); rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args); } @@ -324,6 +560,19 @@ rb_queue_pop(int argc, VALUE *argv, VALUE self) } /* + * Document-method: Queue#closed? + * call-seq: closed? + * + * Returns +true+ if the queue is closed. + */ + +static VALUE +rb_queue_closed_p(VALUE self) +{ + return QUEUE_CLOSED_P(self) ? Qtrue : Qfalse; +} + +/* * Document-method: Queue#empty? * call-seq: empty? * @@ -333,7 +582,12 @@ rb_queue_pop(int argc, VALUE *argv, VALUE self) static VALUE rb_queue_empty_p(VALUE self) { - return queue_length(self) == 0 ? Qtrue : Qfalse; + unsigned long items = queue_length(self); + if (QUEUE_CLOSED_P(self)) { + /* Add number of known pending items. */ + items += RARRAY_LEN(GET_QUEUE_PENDING(self)); + } + return items == 0 ? Qtrue : Qfalse; } /* @@ -381,8 +635,8 @@ rb_queue_num_waiting(VALUE self) /* * Document-class: SizedQueue * - * This class represents queues of specified size capacity. The push operation - * may be blocked if the capacity is full. + * This class represents queues of specified size capacity, also known as a + * bounded queue. The push operation may be blocked if the capacity is full. * * See Queue for an example of how a SizedQueue works. */ @@ -397,15 +651,15 @@ rb_queue_num_waiting(VALUE self) static VALUE rb_szqueue_initialize(VALUE self, VALUE vmax) { - long max; - - max = NUM2LONG(vmax); + long max = NUM2LONG(vmax); if (max <= 0) { rb_raise(rb_eArgError, "queue size must be positive"); } RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new()); RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new()); + RSTRUCT_SET(self, QUEUE_PENDING, ary_buf_new()); + RSTRUCT_SET(self, QUEUE_TOKEN, ary_buf_new()); RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new()); RSTRUCT_SET(self, SZQUEUE_MAX, vmax); @@ -472,25 +726,42 @@ szqueue_push_should_block(int argc, const VALUE *argv) * * If there is no space left in the queue, waits until space becomes * available, unless +non_block+ is true. If +non_block+ is true, the - * thread isn't suspended, and an exception is raised. + * thread isn't suspended, and ThreadError('queue full') is raised. */ static VALUE rb_szqueue_push(int argc, VALUE *argv, VALUE self) { - struct waiting_delete args; int should_block = szqueue_push_should_block(argc, argv); - args.waiting = GET_SZQUEUE_WAITERS(self); - args.th = rb_thread_current(); + struct waiting_delete args; + + queue_raise_if_closed(self); while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) { if (!should_block) { rb_raise(rb_eThreadError, "queue full"); } + + args.waiting = GET_SZQUEUE_WAITERS(self); + args.th = rb_thread_current(); + args.pending = GET_QUEUE_PENDING(self); + args.self = self; + rb_ary_push(args.waiting, args.th); - rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args); + rb_ensure(queue_sleep, (VALUE)0, szqueue_delete_from_waiting, (VALUE)&args); } - return queue_do_push(self, argv[0]); + + queue_do_push(self, argv[0]); + + if (QUEUE_CLOSED_P(self)) { + VALUE removed_thread = rb_ary_delete(GET_QUEUE_PENDING(self), rb_thread_current()); + if (removed_thread != Qnil && RARRAY_LEN(GET_QUEUE_PENDING(self)) == 0) { + /* wake all waiting consumers, because there will never be more items. */ + wakeup_all_threads(GET_QUEUE_WAITERS(self)); + } + } + + return self; } static VALUE @@ -515,8 +786,8 @@ szqueue_do_pop(VALUE self, int should_block) * Retrieves data from the queue. * * If the queue is empty, the calling thread is suspended until data is pushed - * onto the queue. If +non_block+ is true, the thread isn't suspended, and an - * exception is raised. + * onto the queue. If +non_block+ is true, the thread isn't suspended, and + * ThreadError('queue empty') is raised. */ static VALUE @@ -590,11 +861,11 @@ Init_thread(void) VALUE rb_cQueue = rb_struct_define_without_accessor_under( OUTER, "Queue", rb_cObject, rb_struct_alloc_noinit, - "que", "waiters", NULL); + "que", "waiters", "pending", "token", NULL); VALUE rb_cSizedQueue = rb_struct_define_without_accessor_under( OUTER, "SizedQueue", rb_cQueue, rb_struct_alloc_noinit, - "que", "waiters", "queue_waiters", "size", NULL); + "que", "waiters", "pending", "token", "queue_waiters", "size", NULL); #if 0 rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); /* teach rdoc ConditionVariable */ @@ -603,6 +874,7 @@ Init_thread(void) #endif id_sleep = rb_intern("sleep"); + id_status = rb_intern("status"); rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0); rb_undef_method(rb_cConditionVariable, "initialize_copy"); @@ -620,6 +892,8 @@ Init_thread(void) rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0); rb_define_method(rb_cQueue, "length", rb_queue_length, 0); rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0); + rb_define_method(rb_cQueue, "close", rb_queue_close, -1); + rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0); /* Alias for #push. */ rb_define_alias(rb_cQueue, "enq", "push"); @@ -639,6 +913,7 @@ Init_thread(void) rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1); rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0); rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0); + rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, -1); /* Alias for #push. */ rb_define_alias(rb_cSizedQueue, "enq", "push"); diff --git a/test/thread/test_queue.rb b/test/thread/test_queue.rb index 2bd71db..db401e6 100644 --- a/test/thread/test_queue.rb +++ b/test/thread/test_queue.rb @@ -43,6 +43,7 @@ class TestQueue < Test::Unit::TestCase } }.join + # close the queue the old way to test for backwards-compatibility num_threads.times { to_workers.push nil } workers.each { |t| t.join } @@ -277,4 +278,321 @@ class TestQueue < Test::Unit::TestCase Marshal.dump(q) end end + + def test_close + [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate| + q = qcreate.call + assert_equal false, q.closed? + q << :something + assert_equal q, q.close + assert q.closed? + assert_raise_with_message(ThreadError, /closed/){q << :nothing} + assert_equal q.pop, :something + assert_nil q.pop + # non-blocking + assert_raise_with_message(ThreadError, /queue empty/){q.pop(non_block=true)} + end + end + + # test that waiting producers are woken up on close + def close_wakeup( num_items, num_threads, &qcreate ) + raise "This test won't work with num_items(#{num_items}) >= num_threads(#{num_threads})" if num_items >= num_threads + + # create the Queue + q = yield + threads = num_threads.times.map{Thread.new{q.pop}} + num_items.times{|i| q << i} + + # wait until queue empty + (Thread.pass; sleep 0.01) until q.size == 0 + + # now there should be some waiting consumers + assert_equal num_threads - num_items, threads.count{|thr| thr.alive? && thr.stop?} + + # tell them all to go away + q.close + + # wait for them to go away + Thread.pass until threads.all?{|thr| thr.status == false} + + # check that they've gone away. Convert nil to -1 so we can sort and do the comparison + expected_values = [-1] * (num_threads - num_items) + num_items.times.to_a + assert_equal expected_values, threads.map{|thr| thr.value || -1 }.sort + end + + def test_queue_close_wakeup + close_wakeup(15, 18){Queue.new} + end + + def test_size_queue_close_wakeup + close_wakeup(5, 8){SizedQueue.new 9} + end + + def test_sized_queue_closed_multi_interrupt + q = SizedQueue.new 1 + q << :one + prod_threads = 32.times.map{|i| Thread.new{ q << i}} + sleep 0.01 until prod_threads.all?{|thr| thr.stop?} + + more_threads = [] + q.close + prod_threads.each{|thr| more_threads << Thread.new{ thr.kill.join }} + more_threads.each &:join + + assert_equal 1, q.size + assert_equal :one, q.pop + assert q.empty?, "queue not empty" + end + + # this might be unnecessary, not sure + def test_sized_queue_two_closed_interrupt + q = SizedQueue.new 1 + q << :one + t1 = Thread.new { q << :two } + t2 = Thread.new { q << :tre } + sleep 0.01 until t1.stop? && t2.stop? + q.close + + t1.kill.join + assert_equal 1, q.size + assert_equal :one, q.pop + assert !q.empty?, "queue empty" + + t2.join + assert_equal :tre, q.pop + assert q.empty?, "queue not empty" + end + + def test_sized_queue_one_closed_interrupt + q = SizedQueue.new 1 + q << :one + t1 = Thread.new { q << :two } + sleep 0.01 until t1.stop? + q.close + + t1.kill.join + assert_equal 1, q.size + assert_equal :one, q.pop + assert q.empty?, "queue not empty" + end + + # make sure that shutdown state is handled properly by empty? for the non-blocking case + def test_empty_non_blocking + q = SizedQueue.new 3 + 3.times{|i| q << i} + + # these all block cos the queue is full + prod_threads = 4.times.map{|i| Thread.new{q << 3+i}} + sleep 0.01 until prod_threads.all?{|thr| thr.status == 'sleep'} + q.close + + items = [] + # sometimes empty? is false but pop will raise ThreadError('empty'), + # meaning a value is not immediately available but will be soon. + until q.empty? + items << q.pop(non_block=true) rescue nil + end + items.compact! + + assert_equal 7.times.to_a, items.sort + assert q.empty? + end + + def test_sized_queue_closed_push_non_blocking + q = SizedQueue.new 7 + q.close + assert_raise_with_message(ThreadError, /queue closed/){q.push(non_block=true)} + end + + def test_blocked_pushers + q = SizedQueue.new 3 + prod_threads = 6.times.map do |i| + thr = Thread.new{q << i}; thr[:pc] = i; thr + end + + # wait until some producer threads have finished, and the other 3 are blocked + sleep 0.01 while prod_threads.reject{|t| t.status}.count < 3 + # this would ensure that all producer threads call push before close + # sleep 0.01 while prod_threads.select{|t| t.status == 'sleep'}.count < 3 + q.close + + # more than prod_threads + cons_threads = 10.times.map do |i| + thr = Thread.new{q.pop}; thr[:pc] = i; thr + end + + # values that came from the queue + popped_values = cons_threads.map &:value + + # pick only the producer threads that got in before close + successful_prod_threads = prod_threads.reject{|thr| thr.status == nil} + assert_nothing_raised{ successful_prod_threads.map(&:value) } + + # the producer threads that tried to push after q.close should all fail + unsuccessful_prod_threads = prod_threads - successful_prod_threads + unsuccessful_prod_threads.each do |thr| + assert_raise(ThreadError){ thr.value } + end + + assert_equal cons_threads.size, popped_values.size + assert_equal 0, q.size + + # check that consumer threads with values match producers that called push before close + assert_equal successful_prod_threads.map{|thr| thr[:pc]}, popped_values.compact.sort + assert_nil q.pop + end + + def test_deny_pushers + [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate| + prod_threads = nil + q = qcreate[] + producers_start = Thread.new do + prod_threads = 20.times.map do |i| + Thread.new{ sleep 0.01 until prod_threads; q << i} + end + end + q.close + + # wait for all threads to be finished, because of exceptions + sleep 0.01 until prod_threads && prod_threads.all?{|thr| thr.status == nil} + + # check that all threads failed to call push + prod_threads.each do |thr| + assert_kind_of ThreadError, (thr.value rescue $!) + end + end + end + + # size should account for waiting pushers during shutdown + def sized_queue_size_close + q = SizedQueue.new 4 + 4.times{|i| q << i} + Thread.new{ q << 5 } + Thread.new{ q << 6 } + assert_equal 4, q.size + assert_equal 4, q.items + q.close + assert_equal 6, q.size + assert_equal 4, q.items + end + + def test_blocked_pushers_empty + q = SizedQueue.new 3 + prod_threads = 6.times.map do |i| + Thread.new{ q << i} + end + + # this ensures that all producer threads call push before close + sleep 0.01 while prod_threads.select{|t| t.status == 'sleep'}.count < 3 + q.close + + ary = [] + until q.empty? + ary << q.pop + end + assert_equal 0, q.size + + assert_equal 6, ary.size + assert_equal [0,1,2,3,4,5], ary.sort + assert_nil q.pop + end + + # test thread wakeup on one-element SizedQueue with close + def test_one_element_sized_queue + q = SizedQueue.new 1 + t = Thread.new{ q.pop } + q.close + assert_nil t.value + end + + def test_close_token + [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate| + q = qcreate[] + q.close :token + assert_equal :token, q.pop + end + end + + def test_close_token_exception + [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate| + q = qcreate[] + q.close RuntimeError.new("no more") + assert_raise(RuntimeError){q.pop} + end + end + + def test_close_token_loop + [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate| + q = qcreate[] + popped_items = [] + consumer_thread = Thread.new{loop{popped_items << q.pop}; :done} + 7.times{|i| q << i} + q.close StopIteration + assert_equal :done, consumer_thread.value + assert_equal 7.times.to_a, popped_items + end + end + + def test_close_wrong_token + [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate| + q = qcreate[] + q.close :token + assert_raise(ThreadError){q.close :another_token} + + q = qcreate[] + q.close + assert_raise(ThreadError){q.close :not_nil} + end + end + + def test_queue_close_multi_multi + q = SizedQueue.new rand(800..1200) + + count_items = rand(3000..5000) + count_producers = rand(10..20) + + producers = count_producers.times.map do + Thread.new do + sleep(rand / 100) + count_items.times{|i| q << [i,"#{i} for #{Thread.current.inspect}"]} + end + end + + consumers = rand(7..12).times.map do + Thread.new do + count = 0 + loop do + i, st = q.pop + count += 1 if i.is_a?(Fixnum) && st.is_a?(String) + end + count + end + end + + # No dead or finished threads + assert (consumers + producers).all?{|thr| thr.status =~ /\Arun|sleep\Z/}, 'no threads runnning' + + # just exercising the concurrency of the support methods. + counter = Thread.new do + until q.closed? && q.empty? + raise if q.size > q.max + # otherwise this exercise causes too much contention on the lock + sleep 0.01 + end + end + + producers.each &:join + q.close StopIteration + + # results not randomly distributed. Not sure why. + # consumers.map{|thr| thr.value}.each do |x| + # assert_not_equal 0, x + # end + + all_items_count = consumers.map{|thr| thr.value}.inject(:+) + assert_equal count_items * count_producers, all_items_count + + # don't leak this thread + assert_nothing_raised{counter.join} + end end