copy_stream_interrupt_handling-2.patch

Akira Tanaka, 06/02/2011 08:20 PM

Download (9.42 KB)

View differences:

vm_core.h (working copy)
656 656
void rb_thread_start_timer_thread(void);
657 657
void rb_thread_stop_timer_thread(void);
658 658
void rb_thread_reset_timer_thread(void);
659
void *rb_thread_call_with_gvl(void *(*func)(void *), void *data1);
660 659
int ruby_thread_has_gvl_p(void);
661 660
VALUE rb_make_backtrace(void);
662 661
typedef int rb_backtrace_iter_func(void *, VALUE, int, VALUE);
io.c (working copy)
14 14
#include "ruby/ruby.h"
15 15
#include "ruby/io.h"
16 16
#include "dln.h"
17
#include "internal.h"
17 18
#include <ctype.h>
18 19
#include <errno.h>
19 20

  
......
8513 8514
    VALUE th;
8514 8515
};
8515 8516

  
8517
static void *
8518
exec_interrupts(void *arg)
8519
{
8520
    VALUE th = (VALUE)arg;
8521
    rb_thread_execute_interrupts(th);
8522
    return NULL;
8523
}
8524

  
8525
/*
8526
 * returns TRUE if the preceding system call was interrupted
8527
 * so we can continue.  If the thread was interrupted, we
8528
 * reacquire the GVL to execute interrupts before continuing.
8529
 */
8516 8530
static int
8517
maygvl_copy_stream_wait_read(struct copy_stream_struct *stp)
8531
maygvl_copy_stream_continue_p(int has_gvl, struct copy_stream_struct *stp)
8532
{
8533
    switch (errno) {
8534
      case EINTR:
8535
#if defined(ERESTART)
8536
      case ERESTART:
8537
#endif
8538
	if (rb_thread_interrupted(stp->th))
8539
            if (has_gvl)
8540
                rb_thread_execute_interrupts(stp->th);
8541
            else
8542
                rb_thread_call_with_gvl(exec_interrupts, (void *)stp->th);
8543
	return TRUE;
8544
    }
8545
    return FALSE;
8546
}
8547

  
8548
static int
8549
maygvl_select(int has_gvl, int n, rb_fdset_t *rfds, rb_fdset_t *wfds, rb_fdset_t *efds, struct timeval *timeout)
8550
{
8551
    if (has_gvl)
8552
        return rb_thread_fd_select(n, rfds, wfds, efds, timeout);
8553
    else
8554
        return rb_fd_select(n, rfds, wfds, efds, timeout);
8555
}
8556

  
8557
static int
8558
maygvl_copy_stream_wait_read(int has_gvl, struct copy_stream_struct *stp)
8518 8559
{
8519 8560
    int ret;
8520
    rb_fd_zero(&stp->fds);
8521
    rb_fd_set(stp->src_fd, &stp->fds);
8522
    ret = rb_fd_select(rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
8561

  
8562
    do {
8563
	rb_fd_zero(&stp->fds);
8564
	rb_fd_set(stp->src_fd, &stp->fds);
8565
        ret = maygvl_select(has_gvl, rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
8566
    } while (ret == -1 && maygvl_copy_stream_continue_p(has_gvl, stp));
8567

  
8523 8568
    if (ret == -1) {
8524 8569
        stp->syserr = "select";
8525 8570
        stp->error_no = errno;
......
8532 8577
nogvl_copy_stream_wait_write(struct copy_stream_struct *stp)
8533 8578
{
8534 8579
    int ret;
8535
    rb_fd_zero(&stp->fds);
8536
    rb_fd_set(stp->dst_fd, &stp->fds);
8537
    ret = rb_fd_select(rb_fd_max(&stp->fds), NULL, &stp->fds, NULL, NULL);
8580

  
8581
    do {
8582
	rb_fd_zero(&stp->fds);
8583
	rb_fd_set(stp->dst_fd, &stp->fds);
8584
        ret = rb_fd_select(rb_fd_max(&stp->fds), NULL, &stp->fds, NULL, NULL);
8585
    } while (ret == -1 && maygvl_copy_stream_continue_p(0, stp));
8586

  
8538 8587
    if (ret == -1) {
8539 8588
        stp->syserr = "select";
8540 8589
        stp->error_no = errno;
......
8596 8645

  
8597 8646
#ifdef USE_SENDFILE
8598 8647
static int
8599
maygvl_copy_stream_wait_readwrite(struct copy_stream_struct *stp)
8648
maygvl_copy_stream_wait_readwrite(int has_gvl, struct copy_stream_struct *stp)
8600 8649
{
8601 8650
    int ret;
8602 8651
    rb_fd_zero(&stp->fds);
8603 8652
    rb_fd_set(stp->src_fd, &stp->fds);
8604 8653
    rb_fd_set(stp->dst_fd, &stp->fds);
8605
    ret = rb_fd_select(rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
8654
    ret = maygvl_select(has_gvl, rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
8606 8655
    if (ret == -1) {
8607 8656
        stp->syserr = "select";
8608 8657
        stp->error_no = errno;
......
8681 8730
        }
8682 8731
    }
8683 8732
    if (ss == -1) {
8733
	if (maygvl_copy_stream_continue_p(0, stp))
8734
	    goto retry_sendfile;
8684 8735
        switch (errno) {
8685 8736
	  case EINVAL:
8686 8737
#ifdef ENOSYS
......
8691 8742
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
8692 8743
	  case EWOULDBLOCK:
8693 8744
#endif
8694
            if (maygvl_copy_stream_wait_readwrite(stp) == -1)
8695
                return -1;
8696
            if (rb_thread_interrupted(stp->th))
8745
            if (maygvl_copy_stream_wait_readwrite(0, stp) == -1)
8697 8746
                return -1;
8698 8747
            goto retry_sendfile;
8699 8748
        }
......
8706 8755
#endif
8707 8756

  
8708 8757
static ssize_t
8709
maygvl_copy_stream_read(struct copy_stream_struct *stp, char *buf, size_t len, off_t offset)
8758
maygvl_read(int has_gvl, int fd, void *buf, size_t count)
8759
{
8760
    if (has_gvl)
8761
        return rb_read_internal(fd, buf, count);
8762
    else
8763
        return read(fd, buf, count);
8764
}
8765

  
8766
static ssize_t
8767
maygvl_copy_stream_read(int has_gvl, struct copy_stream_struct *stp, char *buf, size_t len, off_t offset)
8710 8768
{
8711 8769
    ssize_t ss;
8712 8770
  retry_read:
8713
    if (offset == (off_t)-1)
8714
        ss = read(stp->src_fd, buf, len);
8771
    if (offset == (off_t)-1) {
8772
        ss = maygvl_read(has_gvl, stp->src_fd, buf, len);
8773
    }
8715 8774
    else {
8716 8775
#ifdef HAVE_PREAD
8717 8776
        ss = pread(stp->src_fd, buf, len, offset);
......
8724 8783
        return 0;
8725 8784
    }
8726 8785
    if (ss == -1) {
8786
	if (maygvl_copy_stream_continue_p(has_gvl, stp))
8787
	    goto retry_read;
8727 8788
        switch (errno) {
8728 8789
	  case EAGAIN:
8729 8790
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
8730 8791
	  case EWOULDBLOCK:
8731 8792
#endif
8732
            if (maygvl_copy_stream_wait_read(stp) == -1)
8793
            if (maygvl_copy_stream_wait_read(has_gvl, stp) == -1)
8733 8794
                return -1;
8734 8795
            goto retry_read;
8735 8796
#ifdef ENOSYS
......
8753 8814
    while (len) {
8754 8815
        ss = write(stp->dst_fd, buf+off, len);
8755 8816
        if (ss == -1) {
8817
	    if (maygvl_copy_stream_continue_p(0, stp))
8818
		continue;
8756 8819
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
8757 8820
                if (nogvl_copy_stream_wait_write(stp) == -1)
8758 8821
                    return -1;
......
8807 8870
            len = sizeof(buf);
8808 8871
        }
8809 8872
        if (use_pread) {
8810
            ss = maygvl_copy_stream_read(stp, buf, len, src_offset);
8873
            ss = maygvl_copy_stream_read(0, stp, buf, len, src_offset);
8811 8874
            if (0 < ss)
8812 8875
                src_offset += ss;
8813 8876
        }
8814 8877
        else {
8815
            ss = maygvl_copy_stream_read(stp, buf, len, (off_t)-1);
8878
            ss = maygvl_copy_stream_read(0, stp, buf, len, (off_t)-1);
8816 8879
        }
8817 8880
        if (ss <= 0) /* EOF or error */
8818 8881
            return;
......
8823 8886

  
8824 8887
        if (!use_eof)
8825 8888
            copy_length -= ss;
8826

  
8827
        if (rb_thread_interrupted(stp->th))
8828
            return;
8829 8889
    }
8830 8890
}
8831 8891

  
......
8886 8946
            ssize_t ss;
8887 8947
            rb_thread_wait_fd(stp->src_fd);
8888 8948
            rb_str_resize(buf, buflen);
8889
            ss = maygvl_copy_stream_read(stp, RSTRING_PTR(buf), l, off);
8949
            ss = maygvl_copy_stream_read(1, stp, RSTRING_PTR(buf), l, off);
8890 8950
            if (ss == -1)
8891 8951
                return Qnil;
8892 8952
            if (ss == 0)
thread.c (working copy)
46 46

  
47 47
#include "eval_intern.h"
48 48
#include "gc.h"
49
#include "internal.h"
49 50
#include "ruby/io.h"
50 51

  
51 52
#ifndef USE_NATIVE_THREAD_PRIORITY
......
1359 1360
}
1360 1361

  
1361 1362
void
1363
rb_thread_execute_interrupts(VALUE th)
1364
{
1365
    rb_threadptr_execute_interrupts_rec((rb_thread_t *)th, 0);
1366
}
1367

  
1368
void
1362 1369
rb_gc_mark_threads(void)
1363 1370
{
1364 1371
    rb_bug("deprecated function rb_gc_mark_threads is called");
internal.h (working copy)
27 27

  
28 28
VALUE rb_big_uminus(VALUE x);
29 29

  
30
void rb_thread_execute_interrupts(VALUE th);
31
void *rb_thread_call_with_gvl(void *(*func)(void *), void *data1);
32

  
30 33
#if defined(__cplusplus)
31 34
#if 0
32 35
{ /* satisfy cc-mode */
test/ruby/test_io.rb (working copy)
78 78
    }
79 79
  end
80 80

  
81
  def trapping_usr1
82
    @usr1_rcvd  = 0
83
    trap(:USR1) { @usr1_rcvd += 1 }
84
    yield
85
    ensure
86
      trap(:USR1, "DEFAULT")
87
  end
88

  
81 89
  def test_pipe
82 90
    r, w = IO.pipe
83 91
    assert_instance_of(IO, r)
......
594 602
          result = t.value
595 603
          assert_equal(megacontent, result)
596 604
        }
605
        with_socketpair {|s1, s2|
606
          begin
607
            s1.nonblock = true
608
          rescue Errno::EBADF
609
            skip "nonblocking IO for pipe is not implemented"
610
          end
611
          trapping_usr1 do
612
            nr = 10
613
            pid = fork do
614
              s1.close
615
              IO.select([s2])
616
              Process.kill(:USR1, Process.ppid)
617
              s2.read
618
            end
619
            s2.close
620
            nr.times do
621
              assert_equal megacontent.bytesize, IO.copy_stream("megasrc", s1)
622
            end
623
            assert_equal(1, @usr1_rcvd)
624
            s1.close
625
            _, status = Process.waitpid2(pid)
626
            assert status.success?, status.inspect
627
          end
628
        }
597 629
      end
598 630
    }
599 631
  end