Index: lib/rinda/rinda.rb =================================================================== --- lib/rinda/rinda.rb (revision 39880) +++ lib/rinda/rinda.rb (working copy) @@ -206,6 +206,50 @@ module Rinda # TupleSpaceProxy allows a remote Tuplespace to appear as local. class TupleSpaceProxy + ## + # A Port ensures that a moved tuple arrives properly at its destination + # and does not get lost. + # + # See https://bugs.ruby-lang.org/issues/8125 + + class Port # :nodoc: + attr_reader :value + + def self.deliver + port = new + + begin + yield(port) + ensure + port.close + end + + port.value + end + + def initialize + @open = true + @value = nil + end + + ## + # Don't let the DRb thread push to it when remote sends tuple + + def close + @open = false + end + + ## + # Stores +value+ and ensure it does not get marshaled multiple times. + + def push value + raise 'port closed' unless @open + + @value = value + + nil # avoid Marshal + end + end ## # Creates a new TupleSpaceProxy to wrap +ts+. @@ -225,9 +269,9 @@ module Rinda # Takes +tuple+ from the proxied TupleSpace. See TupleSpace#take. def take(tuple, sec=nil, &block) - port = [] - @ts.move(DRbObject.new(port), tuple, sec, &block) - port[0] + Port.deliver do |port| + @ts.move(DRbObject.new(port), tuple, sec, &block) + end end ## Index: test/rinda/test_rinda.rb =================================================================== --- test/rinda/test_rinda.rb (revision 39880) +++ test/rinda/test_rinda.rb (working copy) @@ -477,7 +477,42 @@ class TupleSpaceProxyTest < Test::Unit:: @ts.take({'head' => 1, 'tail' => 2}, 0)) end - @server = DRb.primary_server || DRb.start_service + def test_take_bug_8215 + service = DRb.start_service(nil, @ts_base) + + uri = service.uri + + take = fork do + DRb.start_service + ro = DRbObject.new_with_uri(uri) + ts = Rinda::TupleSpaceProxy.new(ro) + th = Thread.new do + ts.take([:test_take, nil]) + end + Kernel.sleep(0.1) + th.raise(Interrupt) # causes loss of the taken tuple + ts.write([:barrier, :continue]) + Kernel.sleep + end + + @ts_base.take([:barrier, :continue]) + + write = fork do + DRb.start_service + ro = DRbObject.new_with_uri(uri) + ts = Rinda::TupleSpaceProxy.new(ro) + ts.write([:test_take, 42]) + end + + status = Process.wait(take) + + assert_equal([[:test_take, 42]], @ts_base.read_all([:test_take, nil]), + '[bug:8215] tuple lost') + ensure + Process.kill("TERM", take) if take && status.nil? + Process.kill("TERM", write) if write + service.stop_service + end end end