zlib.release_gvl.2.patch

Move entire run loop outside GVL - Eric Hodel, 06/21/2012 03:56 PM

Download (5.3 KB)

View differences:

ext/zlib/zlib.c (working copy)
72 72

  
73 73
struct zstream;
74 74
struct zstream_funcs;
75
struct zstream_run_args;
75 76
static void zstream_init(struct zstream*, const struct zstream_funcs*);
76 77
static void zstream_expand_buffer(struct zstream*);
77 78
static void zstream_expand_buffer_into(struct zstream*, unsigned long);
......
564 565
    inflateReset, inflateEnd, inflate,
565 566
};
566 567

  
568
struct zstream_run_args {
569
    struct zstream * z;
570
    int flush;
571
    int interrupt;
572
};
567 573

  
568 574
static voidpf
569 575
zlib_mem_alloc(voidpf opaque, uInt items, uInt size)
......
655 661
    }
656 662
}
657 663

  
664
static int
665
zstream_expand_buffer_without_gvl(struct zstream *z)
666
{
667
    char * new_str;
668
    long inc;
669

  
670
    if (RSTRING_LEN(z->buf) - z->buf_filled >= ZSTREAM_AVAIL_OUT_STEP_MAX) {
671
	/* to keep other threads from freezing */
672
	z->stream.avail_out = ZSTREAM_AVAIL_OUT_STEP_MAX;
673
    }
674
    else {
675
	inc = z->buf_filled / 2;
676
	if (inc < ZSTREAM_AVAIL_OUT_STEP_MIN) {
677
	    inc = ZSTREAM_AVAIL_OUT_STEP_MIN;
678
	}
679

  
680
	new_str = realloc(RSTRING(z->buf)->as.heap.ptr, z->buf_filled + inc);
681

  
682
	if (!new_str)
683
	    return 0;
684

  
685
	RSTRING(z->buf)->as.heap.ptr = new_str;
686
	RSTRING(z->buf)->as.heap.len =
687
	    RSTRING(z->buf)->as.heap.aux.capa = z->buf_filled + inc;
688

  
689
	z->stream.avail_out = (inc < ZSTREAM_AVAIL_OUT_STEP_MAX) ?
690
	    (int)inc : ZSTREAM_AVAIL_OUT_STEP_MAX;
691
    }
692
    z->stream.next_out = (Bytef*)RSTRING_PTR(z->buf) + z->buf_filled;
693

  
694
    return 1;
695
}
696

  
658 697
static void
659 698
zstream_append_buffer(struct zstream *z, const Bytef *src, long len)
660 699
{
......
871 910
    return Qnil;
872 911
}
873 912

  
913
static VALUE
914
zstream_run_func(void *ptr) {
915
    struct zstream_run_args *args = (struct zstream_run_args *)ptr;
916
    int err, flush = args->flush;
917
    struct zstream *z = args->z;
918
    uInt n;
919

  
920
    while (!args->interrupt) {
921
	n = z->stream.avail_out;
922
	err = z->func->run(&z->stream, flush);
923
	z->buf_filled += n - z->stream.avail_out;
924

  
925
	if (err == Z_STREAM_END) {
926
	    z->flags &= ~ZSTREAM_FLAG_IN_STREAM;
927
	    z->flags |= ZSTREAM_FLAG_FINISHED;
928
	    break;
929
	}
930

  
931
	if (err != Z_OK)
932
	    break;
933

  
934
	if (z->stream.avail_out > 0) {
935
	    z->flags |= ZSTREAM_FLAG_IN_STREAM;
936
	    break;
937
	}
938

  
939
	if (!zstream_expand_buffer_without_gvl(z)) {
940
	    err = Z_MEM_ERROR; /* realloc failed */
941
	    break;
942
	}
943
    }
944

  
945
    return (VALUE)err;
946
}
947

  
948
/*
949
 * There is no safe way to interrupt z->run->func().
950
 */
951
static void
952
zstream_unblock_func(void *ptr) {
953
    struct zstream_run_args *args = (struct zstream_run_args *)ptr;
954

  
955
    args->interrupt = 1;
956
}
957

  
874 958
static void
875 959
zstream_run(struct zstream *z, Bytef *src, long len, int flush)
876 960
{
877
    uInt n;
961
    struct zstream_run_args args;
878 962
    int err;
879 963
    volatile VALUE guard = Qnil;
880 964

  
965
    args.z = z;
966
    args.flush = flush;
967
    args.interrupt = 0;
968

  
881 969
    if (NIL_P(z->input) && len == 0) {
882 970
	z->stream.next_in = (Bytef*)"";
883 971
	z->stream.avail_in = 0;
......
896 984
	zstream_expand_buffer(z);
897 985
    }
898 986

  
899
    for (;;) {
900
	/* VC allocates err and guard to same address.  accessing err and guard
901
	   in same scope prevents it. */
902
	RB_GC_GUARD(guard);
903
	n = z->stream.avail_out;
904
	err = z->func->run(&z->stream, flush);
905
	z->buf_filled += n - z->stream.avail_out;
906
	rb_thread_schedule();
987
loop:
988
    err = (int)rb_thread_blocking_region(
989
	    zstream_run_func, (void *)&args,
990
	    zstream_unblock_func, (void *)&args);
991

  
992
    if (flush != Z_FINISH && err == Z_BUF_ERROR
993
	    && z->stream.avail_out > 0) {
994
	z->flags |= ZSTREAM_FLAG_IN_STREAM;
995
    }
907 996

  
908
	if (err == Z_STREAM_END) {
909
	    z->flags &= ~ZSTREAM_FLAG_IN_STREAM;
910
	    z->flags |= ZSTREAM_FLAG_FINISHED;
911
	    break;
997
    zstream_reset_input(z);
998

  
999
    if (err != Z_OK && err != Z_STREAM_END) {
1000
	if (z->stream.avail_in > 0) {
1001
	    zstream_append_input(z, z->stream.next_in, z->stream.avail_in);
912 1002
	}
913
	if (err != Z_OK) {
914
	    if (flush != Z_FINISH && err == Z_BUF_ERROR
915
		&& z->stream.avail_out > 0) {
916
		z->flags |= ZSTREAM_FLAG_IN_STREAM;
917
		break;
1003
	if (err == Z_NEED_DICT) {
1004
	    VALUE self = (VALUE)z->stream.opaque;
1005
	    VALUE dicts = rb_ivar_get(self, id_dictionaries);
1006
	    VALUE dict = rb_hash_aref(dicts, rb_uint2inum(z->stream.adler));
1007
	    if (!NIL_P(dict)) {
1008
		rb_inflate_set_dictionary(self, dict);
1009
		goto loop;
918 1010
	    }
919
	    zstream_reset_input(z);
920
	    if (z->stream.avail_in > 0) {
921
		zstream_append_input(z, z->stream.next_in, z->stream.avail_in);
922
	    }
923
	    if (err == Z_NEED_DICT) {
924
		VALUE self = (VALUE)z->stream.opaque;
925
		VALUE dicts = rb_ivar_get(self, id_dictionaries);
926
		VALUE dict = rb_hash_aref(dicts, rb_uint2inum(z->stream.adler));
927
		if (!NIL_P(dict)) {
928
		    rb_inflate_set_dictionary(self, dict);
929
		    continue;
930
		}
931
	    }
932
	    raise_zlib_error(err, z->stream.msg);
933
	}
934
	if (z->stream.avail_out > 0) {
935
	    z->flags |= ZSTREAM_FLAG_IN_STREAM;
936
	    break;
937 1011
	}
938
	zstream_expand_buffer(z);
1012
	raise_zlib_error(err, z->stream.msg);
939 1013
    }
940 1014

  
941
    zstream_reset_input(z);
942 1015
    if (z->stream.avail_in > 0) {
943 1016
	zstream_append_input(z, z->stream.next_in, z->stream.avail_in);
944 1017
        guard = Qnil; /* prevent tail call to make guard effective */