Feature #10600 » patch-25f99aef.diff

djellemah (John Anderson), 02/25/2015 07:51 PM

View differences:

ext/thread/thread.c
enum {
QUEUE_QUE = 0,
QUEUE_WAITERS = 1,
QUEUE_PENDING = 2,
QUEUE_TOKEN = 3,
SZQUEUE_WAITERS = 4,
SZQUEUE_MAX = 5
};
#define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS)
#define GET_QUEUE_QUE(q) get_array((q), QUEUE_QUE)
#define GET_QUEUE_WAITERS(q) get_array((q), QUEUE_WAITERS)
#define GET_QUEUE_TOKEN(q) get_array((q), QUEUE_TOKEN)
#define GET_SZQUEUE_WAITERS(q) get_array((q), SZQUEUE_WAITERS)
#define GET_SZQUEUE_MAX(q) RSTRUCT_GET((q), SZQUEUE_MAX)
#define GET_SZQUEUE_ULONGMAX(q) NUM2ULONG(GET_SZQUEUE_MAX(q))
/*
QUEUE_PENDING is the collection of producer threads suspended in push at the
point where close was called. Can't use SZQUEUE_WAITERS for this because
rb_ensure(rb_thread_sleep_deadly ...) is not atomic with respect to
add/remove of threads in SZQUEUE_WAITERS.
*/
#define GET_QUEUE_PENDING(q) get_array((q), QUEUE_PENDING)
/* Has the close method been called? */
#define QUEUE_CLOSED_P(self) RARRAY_LEN(GET_QUEUE_TOKEN(self))
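To illustrate, at the Ruby level, why the pending bookkeeping matters, here is a minimal sketch based on the Queue#close documentation further down (illustration only, not part of the patch; the sleep is just to make the timing deterministic enough for the example):

require 'thread'

q = SizedQueue.new 1
q << :first                             # queue is now full
producer = Thread.new { q << :second }  # suspends in push, waiting for space
sleep 0.1                               # give the producer time to block
q.close                                 # the blocked producer is recorded as pending
q.pop          # => :first; frees a slot, the pending producer completes its push
q.pop          # => :second; delivered even though close happened first
producer.join
q.pop          # => nil; queue is drained and closed, the default token is returned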
static VALUE
get_array(VALUE obj, int idx)
{
......
*
* This class provides a way to synchronize communication between threads.
*
* Example using close(StopIteration):
*
* require 'thread'
* queue = Queue.new
*
* consumers = a_few.times.map do
* Thread.new do
* loop{ do_something_with queue.pop}
* end
* end
*
* producers = some.times.map do
* Thread.new do
* several.times{ queue << something_interesting }
* end
* end
* producers.each(&:join)
* queue.close StopIteration
* consumers.each(&:join)
*
*
* Example using nil to close queue:
*
* require 'thread'
* queue = Queue.new
*
* consumers = a_few.times.map do
* Thread.new do
* while item = queue.pop
* do_something_with item
* end
* end
* end
*
* producers = some.times.map do
* Thread.new do
* several.times{ queue << something_interesting }
* end
* end
* producers.each(&:join)
* queue.close
* consumers.each(&:join)
*
*
* Example using traditional consumer <-> producer coupling (1 producer, 1 consumer):
*
* require 'thread'
* queue = Queue.new
*
* producer = Thread.new do
* 5.times do |i|
* sleep rand(i) # simulate expense
* queue << i
* puts "#{i} produced"
* end
* end
*
* consumer = Thread.new do
* 5.times do |i|
* value = queue.pop
* sleep rand(i/2) # simulate expense
* puts "consumed #{value}"
* end
* end
*
* Example using queue poison token (1 producer, 1 consumer):
*
* require 'thread'
* queue = Queue.new
*
* producer = Thread.new do
* 5.times do |i|
* sleep rand(i) # simulate expense
* queue << i
* puts "#{i} produced"
* end
* queue << :poison
* end
*
* consumer = Thread.new do
* while item = queue.pop
* break if item == :poison
* sleep rand(item/2) # simulate expense
* puts "consumed #{item}"
* end
* end
*
*/
......
{
RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
RSTRUCT_SET(self, QUEUE_PENDING, ary_buf_new());
RSTRUCT_SET(self, QUEUE_TOKEN, ary_buf_new());
return self;
}
static VALUE
queue_raise_if_closed(VALUE self)
{
if (QUEUE_CLOSED_P(self)) {
rb_raise(rb_eThreadError, "queue closed");
}
return Qnil;
}
static VALUE
queue_do_push(VALUE self, VALUE obj)
{
/* TODO it would be nice not to have to use a macro to check for correct
initialization on every single call to push. */
rb_ary_push(GET_QUEUE_QUE(self), obj);
wakeup_first_thread(GET_QUEUE_WAITERS(self));
return self;
......
static VALUE
rb_queue_push(VALUE self, VALUE obj)
{
queue_raise_if_closed(self);
return queue_do_push(self, obj);
}
......
return RARRAY_LEN(waiters);
}
/*
Do close accounting.
1) Set QUEUE_TOKEN.
2) Copy pending producers to QUEUE_PENDING. This works around
SZQUEUE_WAITERS updating non-atomically in the thread
wake/sleep handling.
QUEUE_TOKEN holds an array with 0 or 1 elements. This allows the
queue to be closed several times, provided the same value is used each
time, and having an array makes that test easier.
*/
static VALUE
queue_set_close_state(int argc, VALUE *argv, VALUE self, VALUE pending_producers)
{
VALUE close_token = Qnil;
VALUE existing_close_token = Qnil;
VALUE close_token_ary = GET_QUEUE_TOKEN(self);
rb_check_arity(argc, 0, 1);
/* handle arg defaults, and conversion to an Exception instance. */
if (argc > 0) {
close_token = argv[0];
if (rb_obj_is_kind_of(close_token,rb_cClass) && RTEST(rb_class_inherited_p(close_token,rb_eException))) {
close_token = rb_exc_new2(close_token, "queue closed");
}
}
/* Allow close to be called several times, with the same argument (same-ness defined by ==). */
if (RARRAY_LEN(close_token_ary) == 0) {
rb_ary_store(close_token_ary, 0, close_token);
/* Start accounting for pending producers. Can only do this once. */
rb_ary_concat(GET_QUEUE_PENDING(self), pending_producers);
return close_token;
} else {
existing_close_token = RARRAY_AREF(close_token_ary, 0);
if (!rb_eql(existing_close_token, close_token)) {
rb_raise(rb_eThreadError, "already closed with %"PRIsVALUE, rb_inspect(existing_close_token));
}
return existing_close_token;
}
}
/*
* Document-method: Queue#close
* call-seq:
* close(token=nil)
*
* Closes the queue to producers. A closed queue cannot be re-opened.
*
* +token+ can be any object: a plain value, an instance of an Exception
* subclass, or an Exception subclass itself. +close+ can be called repeatedly
* on a single queue as long as the same (defined by ==) +token+ is used each time.
*
* After the call to close completes, the following are true:
*
* - +closed?+ will return true
*
* - calling enq/push/<< will raise ThreadError('queue closed')
*
* - when +empty?+ is false, calling deq/pop/shift will return an object from the queue as usual.
*
* - when +empty?+ is true, deq(non_block=false) will not suspend and
* will either return the +token+, or raise if +token+ was an exception
* instance or class. deq(non_block=true) ignores the +token+ and
* raises ThreadError('queue empty') as usual.
*
* And for SizedQueue, these will also be true:
*
* - each thread already suspended in enq at the time of the call
* to close will be allowed to push its object as usual.
*
* - +empty?+ will be false when there are either objects in the queue, or
* producers which were suspended at the time of the call to +close+ but whose
* objects are not yet in the queue. Therefore, it can be true (very
* briefly) that empty? == false && size == 0, since +size+ returns the number
* of objects actually in the queue.
*/
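A brief sketch of the token semantics described above (illustration only, not part of the patch; the tokens :done and :other are arbitrary examples):

require 'thread'

q = Queue.new
q << 1
q.close :done             # close with a plain token
q.close :done             # closing again with the same token is allowed
q.pop                     # => 1, items already queued are still delivered
q.pop                     # => :done, the token once the queue is drained
q.push 2 rescue $!        # => ThreadError, the queue no longer accepts items

q2 = Queue.new
q2.close StopIteration    # an Exception subclass is converted to an instance
q2.pop rescue $!          # => StopIteration, raised because the queue is empty
q2.close :other rescue $! # => ThreadError, the token differs from the first close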
static VALUE
rb_queue_close(int argc, VALUE *argv, VALUE self)
{
/* Never any pending producers for ordinary queue, so pending is empty. */
queue_set_close_state(argc, argv, self, ary_buf_new());
if (queue_length(self) < queue_num_waiting(self)) {
wakeup_all_threads(GET_QUEUE_WAITERS(self));
}
return self;
}
static VALUE
rb_szqueue_close(int argc, VALUE *argv, VALUE self)
{
queue_set_close_state(argc, argv, self, GET_SZQUEUE_WAITERS(self));
if (RARRAY_LEN(GET_QUEUE_PENDING(self)) == 0) {
/* Wake up all consumers because there will never be more items. */
wakeup_all_threads(GET_QUEUE_WAITERS(self));
}
return self;
}
struct waiting_delete {
VALUE waiting;
VALUE th;
VALUE pending;
VALUE self;
};
static VALUE
queue_delete_from_waiting(struct waiting_delete *p)
{
/* The waiting queues */
rb_ary_delete(p->waiting, p->th);
return Qnil;
}
static ID id_status;
static VALUE
szqueue_delete_from_waiting(struct waiting_delete *p)
{
/* The waiting queues */
rb_ary_delete(p->waiting, p->th);
/* Handle the post-close pending producers on thread abort. Only applies to SizedQueue. */
if (QUEUE_CLOSED_P(p->self)) {
/* TODO This is not great, but all the better ways seem to involve rb_thread_t.status,
and it won't get in the way of normal queue operation. */
if (strcmp("aborting",RSTRING_PTR(rb_funcall(p->th,id_status,0))) == 0) {
rb_ary_delete(p->pending, p->th);
}
}
return Qnil;
}
......
}
static VALUE
queue_close_pop(VALUE self)
{
VALUE token = RARRAY_AREF(GET_QUEUE_TOKEN(self),0);
if (rb_obj_is_kind_of(token,rb_eException)) {
rb_exc_raise(token);
}
return token;
}
static VALUE
queue_do_pop(VALUE self, int should_block)
{
struct waiting_delete args;
while (queue_length(self) == 0) {
if (!should_block) {
rb_raise(rb_eThreadError, "queue empty");
}
if (QUEUE_CLOSED_P(self) && RARRAY_LEN(GET_QUEUE_PENDING(self)) == 0) {
return queue_close_pop(self);
}
args.waiting = GET_QUEUE_WAITERS(self);
args.th = rb_thread_current();
args.pending = Qnil;
args.self = Qnil;
rb_ary_push(args.waiting, args.th);
rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
}
......
}
/*
* Document-method: Queue#closed?
* call-seq: closed?
*
* Returns +true+ if the queue is closed.
*/
static VALUE
rb_queue_closed_p(VALUE self)
{
return QUEUE_CLOSED_P(self) ? Qtrue : Qfalse;
}
/*
* Document-method: Queue#empty?
* call-seq: empty?
*
......
static VALUE
rb_queue_empty_p(VALUE self)
{
unsigned long items = queue_length(self);
if (QUEUE_CLOSED_P(self)) {
/* Add number of known pending items. */
items += RARRAY_LEN(GET_QUEUE_PENDING(self));
}
return items == 0 ? Qtrue : Qfalse;
}
/*
......
/*
* Document-class: SizedQueue
*
* This class represents queues with a specified maximum capacity, also known
* as a bounded queue. The push operation may block when the queue is full.
*
* See Queue for an example of how a SizedQueue works.
*/
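A small sketch of the bounded behaviour (illustration only, not part of the patch; the sleep is just to let the producer reach the blocked push):

require 'thread'

sq = SizedQueue.new 2
sq << :a
sq << :b
t = Thread.new { sq << :c }  # blocks, the queue is at capacity
sleep 0.1
t.alive?                     # => true, still waiting for space
sq.pop                       # => :a, frees a slot and wakes the producer
t.join
sq.size                      # => 2, holding :b and :c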
......
static VALUE
rb_szqueue_initialize(VALUE self, VALUE vmax)
{
long max = NUM2LONG(vmax);
if (max <= 0) {
rb_raise(rb_eArgError, "queue size must be positive");
}
RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
RSTRUCT_SET(self, QUEUE_PENDING, ary_buf_new());
RSTRUCT_SET(self, QUEUE_TOKEN, ary_buf_new());
RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new());
RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
......
*
* If there is no space left in the queue, waits until space becomes
* available, unless +non_block+ is true. If +non_block+ is true, the
* thread isn't suspended, and ThreadError('queue full') is raised.
*/
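For instance, a minimal sketch of the non-blocking form (illustration only, not part of the patch; the second argument is the non_block flag):

require 'thread'

sq = SizedQueue.new 1
sq.push :a                   # succeeds, the queue is now full
begin
  sq.push :b, true           # non_block=true, so raise rather than suspend
rescue ThreadError => e
  e.message                  # => "queue full"
end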
static VALUE
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
{
int should_block = szqueue_push_should_block(argc, argv);
struct waiting_delete args;
queue_raise_if_closed(self);
while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) {
if (!should_block) {
rb_raise(rb_eThreadError, "queue full");
}
args.waiting = GET_SZQUEUE_WAITERS(self);
args.th = rb_thread_current();
args.pending = GET_QUEUE_PENDING(self);
args.self = self;
rb_ary_push(args.waiting, args.th);
rb_ensure(queue_sleep, (VALUE)0, szqueue_delete_from_waiting, (VALUE)&args);
}
queue_do_push(self, argv[0]);
if (QUEUE_CLOSED_P(self)) {
VALUE removed_thread = rb_ary_delete(GET_QUEUE_PENDING(self), rb_thread_current());
if (removed_thread != Qnil && RARRAY_LEN(GET_QUEUE_PENDING(self)) == 0) {
/* wake all waiting consumers, because there will never be more items. */
wakeup_all_threads(GET_QUEUE_WAITERS(self));
}
}
return self;
}
static VALUE
......
* Retrieves data from the queue.
*
* If the queue is empty, the calling thread is suspended until data is pushed
* onto the queue. If +non_block+ is true, the thread isn't suspended, and
* ThreadError('queue empty') is raised.
*/
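For instance, a minimal sketch of the non-blocking pop (illustration only, not part of the patch):

require 'thread'

sq = SizedQueue.new 1
begin
  sq.pop(true)               # non_block=true, so raise rather than suspend
rescue ThreadError => e
  e.message                  # => "queue empty"
end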
static VALUE
......
VALUE rb_cQueue = rb_struct_define_without_accessor_under(
OUTER,
"Queue", rb_cObject, rb_struct_alloc_noinit,
"que", "waiters", NULL);
"que", "waiters", "pending", "token", NULL);
VALUE rb_cSizedQueue = rb_struct_define_without_accessor_under(
OUTER,
"SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
"que", "waiters", "queue_waiters", "size", NULL);
"que", "waiters", "pending", "token", "queue_waiters", "size", NULL);
#if 0
rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); /* teach rdoc ConditionVariable */
......
#endif
id_sleep = rb_intern("sleep");
id_status = rb_intern("status");
rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
rb_undef_method(rb_cConditionVariable, "initialize_copy");
......
rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
rb_define_method(rb_cQueue, "close", rb_queue_close, -1);
rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
/* Alias for #push. */
rb_define_alias(rb_cQueue, "enq", "push");
......
rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, -1);
/* Alias for #push. */
rb_define_alias(rb_cSizedQueue, "enq", "push");
test/thread/test_queue.rb
}
}.join
# close the queue the old way to test for backwards-compatibility
num_threads.times { to_workers.push nil }
workers.each { |t| t.join }
......
Marshal.dump(q)
end
end
def test_close
[->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
q = qcreate.call
assert_equal false, q.closed?
q << :something
assert_equal q, q.close
assert q.closed?
assert_raise_with_message(ThreadError, /closed/){q << :nothing}
assert_equal :something, q.pop
assert_nil q.pop
# non-blocking
assert_raise_with_message(ThreadError, /queue empty/){q.pop(non_block=true)}
end
end
# test that waiting consumers are woken up on close
def close_wakeup( num_items, num_threads, &qcreate )
raise "This test won't work with num_items(#{num_items}) >= num_threads(#{num_threads})" if num_items >= num_threads
# create the Queue
q = yield
threads = num_threads.times.map{Thread.new{q.pop}}
num_items.times{|i| q << i}
# wait until queue empty
(Thread.pass; sleep 0.01) until q.size == 0
# now there should be some waiting consumers
assert_equal num_threads - num_items, threads.count{|thr| thr.alive? && thr.stop?}
# tell them all to go away
q.close
# wait for them to go away
Thread.pass until threads.all?{|thr| thr.status == false}
# check that they've gone away. Convert nil to -1 so we can sort and do the comparison
expected_values = [-1] * (num_threads - num_items) + num_items.times.to_a
assert_equal expected_values, threads.map{|thr| thr.value || -1 }.sort
end
def test_queue_close_wakeup
close_wakeup(15, 18){Queue.new}
end
def test_size_queue_close_wakeup
close_wakeup(5, 8){SizedQueue.new 9}
end
def test_sized_queue_closed_multi_interrupt
q = SizedQueue.new 1
q << :one
prod_threads = 32.times.map{|i| Thread.new{ q << i}}
sleep 0.01 until prod_threads.all?{|thr| thr.stop?}
more_threads = []
q.close
prod_threads.each{|thr| more_threads << Thread.new{ thr.kill.join }}
more_threads.each &:join
assert_equal 1, q.size
assert_equal :one, q.pop
assert q.empty?, "queue not empty"
end
# this might be unnecessary, not sure
def test_sized_queue_two_closed_interrupt
q = SizedQueue.new 1
q << :one
t1 = Thread.new { q << :two }
t2 = Thread.new { q << :tre }
sleep 0.01 until t1.stop? && t2.stop?
q.close
t1.kill.join
assert_equal 1, q.size
assert_equal :one, q.pop
assert !q.empty?, "queue empty"
t2.join
assert_equal :tre, q.pop
assert q.empty?, "queue not empty"
end
def test_sized_queue_one_closed_interrupt
q = SizedQueue.new 1
q << :one
t1 = Thread.new { q << :two }
sleep 0.01 until t1.stop?
q.close
t1.kill.join
assert_equal 1, q.size
assert_equal :one, q.pop
assert q.empty?, "queue not empty"
end
# make sure that shutdown state is handled properly by empty? for the non-blocking case
def test_empty_non_blocking
q = SizedQueue.new 3
3.times{|i| q << i}
# these all block because the queue is full
prod_threads = 4.times.map{|i| Thread.new{q << 3+i}}
sleep 0.01 until prod_threads.all?{|thr| thr.status == 'sleep'}
q.close
items = []
# sometimes empty? is false but pop will raise ThreadError('empty'),
# meaning a value is not immediately available but will be soon.
until q.empty?
items << q.pop(non_block=true) rescue nil
end
items.compact!
assert_equal 7.times.to_a, items.sort
assert q.empty?
end
def test_sized_queue_closed_push_non_blocking
q = SizedQueue.new 7
q.close
assert_raise_with_message(ThreadError, /queue closed/){q.push(:nothing, non_block=true)}
end
def test_blocked_pushers
q = SizedQueue.new 3
prod_threads = 6.times.map do |i|
thr = Thread.new{q << i}; thr[:pc] = i; thr
end
# wait until some producer threads have finished, and the other 3 are blocked
sleep 0.01 while prod_threads.reject{|t| t.status}.count < 3
# this would ensure that all producer threads call push before close
# sleep 0.01 while prod_threads.select{|t| t.status == 'sleep'}.count < 3
q.close
# more than prod_threads
cons_threads = 10.times.map do |i|
thr = Thread.new{q.pop}; thr[:pc] = i; thr
end
# values that came from the queue
popped_values = cons_threads.map &:value
# pick only the producer threads that got in before close
successful_prod_threads = prod_threads.reject{|thr| thr.status == nil}
assert_nothing_raised{ successful_prod_threads.map(&:value) }
# the producer threads that tried to push after q.close should all fail
unsuccessful_prod_threads = prod_threads - successful_prod_threads
unsuccessful_prod_threads.each do |thr|
assert_raise(ThreadError){ thr.value }
end
assert_equal cons_threads.size, popped_values.size
assert_equal 0, q.size
# check that consumer threads with values match producers that called push before close
assert_equal successful_prod_threads.map{|thr| thr[:pc]}, popped_values.compact.sort
assert_nil q.pop
end
def test_deny_pushers
[->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
prod_threads = nil
q = qcreate[]
producers_start = Thread.new do
prod_threads = 20.times.map do |i|
Thread.new{ sleep 0.01 until prod_threads; q << i}
end
end
q.close
# wait for all threads to be finished, because of exceptions
sleep 0.01 until prod_threads && prod_threads.all?{|thr| thr.status == nil}
# check that all threads failed to call push
prod_threads.each do |thr|
assert_kind_of ThreadError, (thr.value rescue $!)
end
end
end
# size should account for waiting pushers during shutdown
def sized_queue_size_close
q = SizedQueue.new 4
4.times{|i| q << i}
Thread.new{ q << 5 }
Thread.new{ q << 6 }
assert_equal 4, q.size
assert_equal 4, q.items
q.close
assert_equal 6, q.size
assert_equal 4, q.items
end
def test_blocked_pushers_empty
q = SizedQueue.new 3
prod_threads = 6.times.map do |i|
Thread.new{ q << i}
end
# this ensures that all producer threads call push before close
sleep 0.01 while prod_threads.select{|t| t.status == 'sleep'}.count < 3
q.close
ary = []
until q.empty?
ary << q.pop
end
assert_equal 0, q.size
assert_equal 6, ary.size
assert_equal [0,1,2,3,4,5], ary.sort
assert_nil q.pop
end
# test thread wakeup on one-element SizedQueue with close
def test_one_element_sized_queue
q = SizedQueue.new 1
t = Thread.new{ q.pop }
q.close
assert_nil t.value
end
def test_close_token
[->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
q = qcreate[]
q.close :token
assert_equal :token, q.pop
end
end
def test_close_token_exception
[->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
q = qcreate[]
q.close RuntimeError.new("no more")
assert_raise(RuntimeError){q.pop}
end
end
def test_close_token_loop
[->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
q = qcreate[]
popped_items = []
consumer_thread = Thread.new{loop{popped_items << q.pop}; :done}
7.times{|i| q << i}
q.close StopIteration
assert_equal :done, consumer_thread.value
assert_equal 7.times.to_a, popped_items
end
end
def test_close_wrong_token
[->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
q = qcreate[]
q.close :token
assert_raise(ThreadError){q.close :another_token}
q = qcreate[]
q.close
assert_raise(ThreadError){q.close :not_nil}
end
end
def test_queue_close_multi_multi
q = SizedQueue.new rand(800..1200)
count_items = rand(3000..5000)
count_producers = rand(10..20)
producers = count_producers.times.map do
Thread.new do
sleep(rand / 100)
count_items.times{|i| q << [i,"#{i} for #{Thread.current.inspect}"]}
end
end
consumers = rand(7..12).times.map do
Thread.new do
count = 0
loop do
i, st = q.pop
count += 1 if i.is_a?(Fixnum) && st.is_a?(String)
end
count
end
end
# No dead or finished threads
assert (consumers + producers).all?{|thr| thr.status =~ /\A(run|sleep)\z/}, 'no threads running'
# just exercising the concurrency of the support methods.
counter = Thread.new do
until q.closed? && q.empty?
raise if q.size > q.max
# otherwise this exercise causes too much contention on the lock
sleep 0.01
end
end
producers.each &:join
q.close StopIteration
# results not randomly distributed. Not sure why.
# consumers.map{|thr| thr.value}.each do |x|
# assert_not_equal 0, x
# end
all_items_count = consumers.map{|thr| thr.value}.inject(:+)
assert_equal count_items * count_producers, all_items_count
# don't leak this thread
assert_nothing_raised{counter.join}
end
end