Bug #8125 » rinda.rb.8215.patch
lib/rinda/rinda.rb (working copy) | ||
---|---|---|
# TupleSpaceProxy allows a remote Tuplespace to appear as local.
|
||
class TupleSpaceProxy
|
||
##
|
||
# A Port ensures that a moved tuple arrives properly at its destination
|
||
# and does not get lost.
|
||
#
|
||
# See https://bugs.ruby-lang.org/issues/8125
|
||
class Port # :nodoc:
|
||
attr_reader :value
|
||
def self.deliver
|
||
port = new
|
||
begin
|
||
yield port
|
||
ensure
|
||
port.close
|
||
end
|
||
port.value
|
||
end
|
||
def initialize
|
||
@open = true
|
||
@value = nil
|
||
end
|
||
##
|
||
# Don't let the DRb thread push to it when remote sends tuple
|
||
def close
|
||
@open = false
|
||
end
|
||
##
|
||
# Stores +value+ and ensure it does not get marshaled multiple times.
|
||
def push value
|
||
raise 'port closed' unless @open
|
||
@value = value
|
||
nil # avoid Marshal
|
||
end
|
||
end
|
||
##
|
||
# Creates a new TupleSpaceProxy to wrap +ts+.
|
||
... | ... | |
# Takes +tuple+ from the proxied TupleSpace. See TupleSpace#take.
|
||
def take(tuple, sec=nil, &block)
|
||
port = []
|
||
@ts.move(DRbObject.new(port), tuple, sec, &block)
|
||
port[0]
|
||
Port.deliver do |port|
|
||
@ts.move(DRbObject.new(port), tuple, sec, &block)
|
||
end
|
||
end
|
||
##
|
test/rinda/test_rinda.rb (working copy) | ||
---|---|---|
@ts.take({'head' => 1, 'tail' => 2}, 0))
|
||
end
|
||
@server = DRb.primary_server || DRb.start_service
|
||
def test_take_bug_8215
|
||
service = DRb.start_service(nil, @ts_base)
|
||
uri = service.uri
|
||
c1 = fork do
|
||
DRb.start_service
|
||
ro = DRbObject.new_with_uri(uri)
|
||
ts = Rinda::TupleSpaceProxy.new(ro)
|
||
th = Thread.new do
|
||
p :waiting
|
||
ts.take([:test_take, nil])
|
||
end
|
||
Kernel.sleep 0.1
|
||
th.raise Interrupt # causes loss of the taken tuple
|
||
ts.write([:barrier, :continue])
|
||
Kernel.sleep
|
||
end
|
||
@ts_base.take([:barrier, :continue])
|
||
c2 = fork do
|
||
DRb.start_service
|
||
ro = DRbObject.new_with_uri(uri)
|
||
ts = Rinda::TupleSpaceProxy.new(ro)
|
||
ts.write([:test_take, 42])
|
||
end
|
||
status = Process.wait c2
|
||
assert_equal [[:test_take, 42]], @ts_base.read_all([:test_take, nil]),
|
||
'[bug:8215] tuple lost'
|
||
ensure
|
||
Process.kill "TERM", c2 if c2 && status.nil?
|
||
Process.kill "TERM", c1 if c1
|
||
service.stop_service
|
||
end
|
||
end
|
||
end
|