copy_stream_interrupt_handling.patch

Akira Tanaka, 05/29/2011 07:29 PM

Download (7.97 KB)

View differences:

io.c (working copy)
13 13

  
14 14
#include "ruby/ruby.h"
15 15
#include "ruby/io.h"
16
#include "vm_core.h"
16 17
#include "dln.h"
17 18
#include <ctype.h>
18 19
#include <errno.h>
......
8384 8385
    VALUE th;
8385 8386
};
8386 8387

  
8388
static void *
8389
exec_interrupts(void *arg)
8390
{
8391
    rb_thread_t *th = arg;
8392
    rb_threadptr_execute_interrupts(th);
8393
    return NULL;
8394
}
8395

  
8396
/*
8397
 * returns TRUE if the preceding system call was interrupted
8398
 * so we can continue.  If the thread was interrupted, we
8399
 * reacquire the GVL to execute interrupts before continuing.
8400
 */
8387 8401
static int
8388
maygvl_copy_stream_wait_read(struct copy_stream_struct *stp)
8402
maygvl_copy_stream_continue_p(int has_gvl, struct copy_stream_struct *stp)
8403
{
8404
    switch (errno) {
8405
      case EINTR:
8406
#if defined(ERESTART)
8407
      case ERESTART:
8408
#endif
8409
	if (rb_thread_interrupted(stp->th))
8410
            if (has_gvl)
8411
                rb_threadptr_execute_interrupts((rb_thread_t *)stp->th);
8412
            else
8413
                rb_thread_call_with_gvl(exec_interrupts, (void *)stp->th);
8414
	return TRUE;
8415
    }
8416
    return FALSE;
8417
}
8418

  
8419
static int
8420
maygvl_select(int has_gvl, int n, rb_fdset_t *rfds, rb_fdset_t *wfds, rb_fdset_t *efds, struct timeval *timeout)
8421
{
8422
    if (has_gvl)
8423
        return rb_thread_fd_select(n, rfds, wfds, efds, timeout);
8424
    else
8425
        return rb_fd_select(n, rfds, wfds, efds, timeout);
8426
}
8427

  
8428
static int
8429
maygvl_copy_stream_wait_read(int has_gvl, struct copy_stream_struct *stp)
8389 8430
{
8390 8431
    int ret;
8391
    rb_fd_zero(&stp->fds);
8392
    rb_fd_set(stp->src_fd, &stp->fds);
8393
    ret = rb_fd_select(rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
8432

  
8433
    do {
8434
	rb_fd_zero(&stp->fds);
8435
	rb_fd_set(stp->src_fd, &stp->fds);
8436
        ret = maygvl_select(has_gvl, rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
8437
    } while (ret == -1 && maygvl_copy_stream_continue_p(has_gvl, stp));
8438

  
8394 8439
    if (ret == -1) {
8395 8440
        stp->syserr = "select";
8396 8441
        stp->error_no = errno;
......
8403 8448
nogvl_copy_stream_wait_write(struct copy_stream_struct *stp)
8404 8449
{
8405 8450
    int ret;
8406
    rb_fd_zero(&stp->fds);
8407
    rb_fd_set(stp->dst_fd, &stp->fds);
8408
    ret = rb_fd_select(rb_fd_max(&stp->fds), NULL, &stp->fds, NULL, NULL);
8451

  
8452
    do {
8453
	rb_fd_zero(&stp->fds);
8454
	rb_fd_set(stp->dst_fd, &stp->fds);
8455
        ret = rb_fd_select(rb_fd_max(&stp->fds), NULL, &stp->fds, NULL, NULL);
8456
    } while (ret == -1 && maygvl_copy_stream_continue_p(0, stp));
8457

  
8409 8458
    if (ret == -1) {
8410 8459
        stp->syserr = "select";
8411 8460
        stp->error_no = errno;
......
8467 8516

  
8468 8517
#ifdef USE_SENDFILE
8469 8518
static int
8470
maygvl_copy_stream_wait_readwrite(struct copy_stream_struct *stp)
8519
maygvl_copy_stream_wait_readwrite(int has_gvl, struct copy_stream_struct *stp)
8471 8520
{
8472 8521
    int ret;
8473 8522
    rb_fd_zero(&stp->fds);
8474 8523
    rb_fd_set(stp->src_fd, &stp->fds);
8475 8524
    rb_fd_set(stp->dst_fd, &stp->fds);
8476
    ret = rb_fd_select(rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
8525
    ret = maygvl_select(has_gvl, rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
8477 8526
    if (ret == -1) {
8478 8527
        stp->syserr = "select";
8479 8528
        stp->error_no = errno;
......
8552 8601
        }
8553 8602
    }
8554 8603
    if (ss == -1) {
8604
	if (maygvl_copy_stream_continue_p(0, stp))
8605
	    goto retry_sendfile;
8555 8606
        switch (errno) {
8556 8607
	  case EINVAL:
8557 8608
#ifdef ENOSYS
......
8562 8613
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
8563 8614
	  case EWOULDBLOCK:
8564 8615
#endif
8565
            if (maygvl_copy_stream_wait_readwrite(stp) == -1)
8566
                return -1;
8567
            if (rb_thread_interrupted(stp->th))
8616
            if (maygvl_copy_stream_wait_readwrite(0, stp) == -1)
8568 8617
                return -1;
8569 8618
            goto retry_sendfile;
8570 8619
        }
......
8577 8626
#endif
8578 8627

  
8579 8628
static ssize_t
8580
maygvl_copy_stream_read(struct copy_stream_struct *stp, char *buf, size_t len, off_t offset)
8629
maygvl_read(int has_gvl, int fd, void *buf, size_t count)
8630
{
8631
    if (has_gvl)
8632
        return rb_read_internal(fd, buf, count);
8633
    else
8634
        return read(fd, buf, count);
8635
}
8636

  
8637
static ssize_t
8638
maygvl_copy_stream_read(int has_gvl, struct copy_stream_struct *stp, char *buf, size_t len, off_t offset)
8581 8639
{
8582 8640
    ssize_t ss;
8583 8641
  retry_read:
8584
    if (offset == (off_t)-1)
8585
        ss = read(stp->src_fd, buf, len);
8642
    if (offset == (off_t)-1) {
8643
        ss = maygvl_read(has_gvl, stp->src_fd, buf, len);
8644
    }
8586 8645
    else {
8587 8646
#ifdef HAVE_PREAD
8588 8647
        ss = pread(stp->src_fd, buf, len, offset);
......
8595 8654
        return 0;
8596 8655
    }
8597 8656
    if (ss == -1) {
8657
	if (maygvl_copy_stream_continue_p(has_gvl, stp))
8658
	    goto retry_read;
8598 8659
        switch (errno) {
8599 8660
	  case EAGAIN:
8600 8661
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
8601 8662
	  case EWOULDBLOCK:
8602 8663
#endif
8603
            if (maygvl_copy_stream_wait_read(stp) == -1)
8664
            if (maygvl_copy_stream_wait_read(has_gvl, stp) == -1)
8604 8665
                return -1;
8605 8666
            goto retry_read;
8606 8667
#ifdef ENOSYS
......
8624 8685
    while (len) {
8625 8686
        ss = write(stp->dst_fd, buf+off, len);
8626 8687
        if (ss == -1) {
8688
	    if (maygvl_copy_stream_continue_p(0, stp))
8689
		continue;
8627 8690
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
8628 8691
                if (nogvl_copy_stream_wait_write(stp) == -1)
8629 8692
                    return -1;
......
8678 8741
            len = sizeof(buf);
8679 8742
        }
8680 8743
        if (use_pread) {
8681
            ss = maygvl_copy_stream_read(stp, buf, len, src_offset);
8744
            ss = maygvl_copy_stream_read(0, stp, buf, len, src_offset);
8682 8745
            if (0 < ss)
8683 8746
                src_offset += ss;
8684 8747
        }
8685 8748
        else {
8686
            ss = maygvl_copy_stream_read(stp, buf, len, (off_t)-1);
8749
            ss = maygvl_copy_stream_read(0, stp, buf, len, (off_t)-1);
8687 8750
        }
8688 8751
        if (ss <= 0) /* EOF or error */
8689 8752
            return;
......
8694 8757

  
8695 8758
        if (!use_eof)
8696 8759
            copy_length -= ss;
8697

  
8698
        if (rb_thread_interrupted(stp->th))
8699
            return;
8700 8760
    }
8701 8761
}
8702 8762

  
......
8757 8817
            ssize_t ss;
8758 8818
            rb_thread_wait_fd(stp->src_fd);
8759 8819
            rb_str_resize(buf, buflen);
8760
            ss = maygvl_copy_stream_read(stp, RSTRING_PTR(buf), l, off);
8820
            ss = maygvl_copy_stream_read(1, stp, RSTRING_PTR(buf), l, off);
8761 8821
            if (ss == -1)
8762 8822
                return Qnil;
8763 8823
            if (ss == 0)
test/ruby/test_io.rb (working copy)
78 78
    }
79 79
  end
80 80

  
81
  def trapping_usr1
82
    @usr1_rcvd  = 0
83
    trap(:USR1) { @usr1_rcvd += 1 }
84
    yield
85
    ensure
86
      trap(:USR1, "DEFAULT")
87
  end
88

  
81 89
  def test_pipe
82 90
    r, w = IO.pipe
83 91
    assert_instance_of(IO, r)
......
594 602
          result = t.value
595 603
          assert_equal(megacontent, result)
596 604
        }
605
        with_socketpair {|s1, s2|
606
          begin
607
            s1.nonblock = true
608
          rescue Errno::EBADF
609
            skip "nonblocking IO for pipe is not implemented"
610
          end
611
          trapping_usr1 do
612
            nr = 10
613
            pid = fork do
614
              s1.close
615
              IO.select([s2])
616
              Process.kill(:USR1, Process.ppid)
617
              s2.read
618
            end
619
            s2.close
620
            nr.times do
621
              assert_equal megacontent.bytesize, IO.copy_stream("megasrc", s1)
622
            end
623
            assert_equal(1, @usr1_rcvd)
624
            s1.close
625
            _, status = Process.waitpid2(pid)
626
            assert status.success?, status.inspect
627
          end
628
        }
597 629
      end
598 630
    }
599 631
  end