Feature #21121
openRactor channels
Description
Motivation:¶
It would be nice be able to Ractor.yield
in a non-blocking way. Right now a Ractor.yield
blocks until another ractor calls r.take
on the yielding ractor.
This is bad in the following scenario:
main = Ractor.current
rs = 10.times.map do
Ractor.new(main) do |m|
ret = []
loop do
# do a bunch of work that takes a while, then:
begin
obj = m.take
rescue Ractor::ClosedError
end
if obj
ret << obj
else
break
end
end
ret
end
end
50.times do |i|
Ractor.yield(i) # this will block until some ractor calls take on us, but it could be a while if there is processing before the `take` call.
end
main.close_outgoing
# Ideally, we could do some work in main ractor that takes some time while the other ractors do their processing. But we're blocking right now
# during all the calls to `Ractor.yield`.
# Finally, get the results
while rs.any?
r, obj = Ractor.select(*rs)
$stderr.puts "Ractor #{r} got #{obj}"
rs.delete(r)
end
I'd like other ractors to be able to do work in a "fire and forget" kind of way. It isn't possible right now due to the limitations of Ractor.yield
and Ractor#take
.
What we need is some kind of yield
buffer so the yielder doesn't block. I propose that Ractor channels would allow this type of programming to work.
Example using channels:¶
chan = Ractor::Channel.new # We could specify buffer size like Go or it could be dynamically growable
rs = 10.times.map do
Ractor.new(chan) do |c|
ret = []
loop do
obj, _closed = c.receive # returns `[nil, true]` if `c` is closed and its buffer is empty. It blocks if the buffer is empty and `c` is not closed.
if obj
ret << obj
else
break
end
end
ret
end
end
50.times do |i|
chan.send(i) # non-blocking, fills a buffer and only wakes a receiver if there is one
end
chan.close
# Do some processing while ractors do their work.
# Then, collect the results
while rs.any?
r, obj = Ractor.select(*rs)
puts "Ractor #{r} got #{obj}"
rs.delete(r)
end
With an API similar to this, we could have "fire and forget" processing with ractors.
Updated by tenderlovemaking (Aaron Patterson) 15 days ago
I think adding a channel object would be very helpful. I'm looking through shared data structures in Rails, trying to understand what we need to change for Ractor safety. One popular data structure is a Concurrent::Map, and there are various instances used as caches. In order to implement a similar data structure with Ractors I end up writing my own channel:
class Ractor::Channel
def self.new
Ractor.new do
loop do
Ractor.yield Ractor.receive
end
end
end
end
class Map
def initialize
@r = Ractor.new {
cache = { "int" => "integer", "bool" => "boolean" }
loop do
channel, key = Ractor.receive
val = cache[key] ||= "not found #{key}"
# Send back to requester
channel.send val
end
}
freeze
end
def [] key
c = Ractor::Channel.new
@r.send([c, key])
c.take
end
end
map = Map.new
p map["int"]
I would really like it if channels were built-in.
Updated by luke-gru (Luke Gruber) 11 days ago
I made a PoC branch here: https://github.com/luke-gru/ruby/commits/ractor_channels but it's not totally ready yet.
Updated by shan (Shannon Skipper) 6 days ago
This seems really handy. I wonder if it would be worth considering having an optional self-closing block variant?
Ractor::Channel.new do |channel|
# Automatically ensure `channel.close_incoming` and `channel.close_outgoing` on block close.
end
An example with a block (not implementing the buffer
argument) might look something like:
class Ractor
module Channel
def self.new(&block)
channel = Ractor.new do
loop do
Ractor.yield Ractor.receive
end
end
if block_given?
message = block.call(channel)
channel.close_incoming
channel.close_outgoing
return message
end
channel
end
end
end
class Map
def initialize
@reactor = Ractor.new do
cache = {}
loop do
channel, payload = Ractor.receive
channel << cache.public_send(*payload)
end
end
freeze
end
def []=(key, value)
pipe __method__, key, value
end
%i[[] delete key? value?].each do |meth|
define_method(meth) { |key| pipe meth, key }
end
%i[clear empty? keys length size to_a to_h values].each do |meth|
define_method(meth) { pipe meth }
end
private
def pipe(*payload)
Ractor::Channel.new do |channel|
@reactor << [channel, payload]
channel.take
end
end
end