Project

General

Profile

Feature #10600 » queue-close.diff

djellemah (John Anderson), 12/15/2014 09:10 AM

View differences:

ext/thread/thread.c
enum {
QUEUE_QUE = 0,
QUEUE_WAITERS = 1,
SZQUEUE_WAITERS = 2,
SZQUEUE_MAX = 3
QUEUE_CLOSED = 2,
SZQUEUE_WAITERS = 3,
SZQUEUE_MAX = 4
};
#define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS)
......
{
RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
RSTRUCT_SET(self, QUEUE_CLOSED, Qfalse);
return self;
}
static VALUE
queue_raise_if_closed(VALUE self)
{
if (RSTRUCT_GET(self, QUEUE_CLOSED) == Qtrue ) {
rb_raise(rb_eThreadError, "queue closed");
}
return Qnil;
}
static VALUE
queue_do_push(VALUE self, VALUE obj)
{
rb_ary_push(GET_QUEUE_QUE(self), obj);
......
static VALUE
rb_queue_push(VALUE self, VALUE obj)
{
queue_raise_if_closed(self);
return queue_do_push(self, obj);
}
......
return RARRAY_LEN(waiters);
}
/*
* Document-method: Queue#close
* call-seq: close
*
* Closes the queue.
*
* Once the queue is closed, enq will raise an exception, although remaining
* items in the queue can be deq'd as usual. Any threads waiting on
* pop(non_block=false) at the time the queue is closed will be woken and
* given a nil.
*
* Once a closed queue is empty, deq with non_block=false will always return
* nil.
*
* A closed queue cannot be reopened.
*/
static VALUE
rb_queue_close(VALUE self)
{
RSTRUCT_SET(self, QUEUE_CLOSED, Qtrue);
if (queue_num_waiting(self) > 0) {
wakeup_all_threads(GET_QUEUE_WAITERS(self));
}
return self;
}
struct waiting_delete {
VALUE waiting;
VALUE th;
......
if (!should_block) {
rb_raise(rb_eThreadError, "queue empty");
}
if (RSTRUCT_GET(self,QUEUE_CLOSED) == Qtrue) {
return Qnil;
}
rb_ary_push(args.waiting, args.th);
rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
}
......
}
/*
* Document-method: Queue#closed?
* call-seq: closed?
*
* Returns +true+ if the queue is closed.
*/
static VALUE
rb_queue_closed_p(VALUE self)
{
return RSTRUCT_GET(self, QUEUE_CLOSED);
}
/*
* Document-method: Queue#empty?
* call-seq: empty?
*
......
RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
RSTRUCT_SET(self, QUEUE_CLOSED, Qfalse);
RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new());
RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
......
{
struct waiting_delete args;
int should_block = szqueue_push_should_block(argc, argv);
queue_raise_if_closed(self);
args.waiting = GET_SZQUEUE_WAITERS(self);
args.th = rb_thread_current();
......
VALUE rb_cQueue = rb_struct_define_without_accessor_under(
OUTER,
"Queue", rb_cObject, rb_struct_alloc_noinit,
"que", "waiters", NULL);
"que", "waiters", "closed", NULL);
VALUE rb_cSizedQueue = rb_struct_define_without_accessor_under(
OUTER,
"SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
"que", "waiters", "queue_waiters", "size", NULL);
"que", "waiters", "closed", "queue_waiters", "size", NULL);
#if 0
rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); /* teach rdoc ConditionVariable */
......
rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
/* Alias for #push. */
rb_define_alias(rb_cQueue, "enq", "push");
test/thread/test_queue.rb
Marshal.dump(q)
end
end
def test_close
[->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
q = qcreate[]
assert_equal false, q.closed?
q << :something
q.close
assert_equal true, q.closed?
assert_raise_with_message(ThreadError, /closed/){q << :nothing}
assert_equal q.pop, :something
assert_equal q.pop, nil
end
end
def test_close_wakeup
[->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
q = qcreate[]
threads = []
4.times{threads << Thread.new{q.pop}}
3.times{|i| q << i}
sleep 0.01
assert_equal 1, threads.count{|thr| thr.alive? && thr.stop?}
q.close
assert_equal 0, threads.count{|thr| thr.alive? && thr.stop?}
end
end
end
(1-1/4)