Project

General

Profile

Actions

Feature #21121

open

Ractor channels

Added by luke-gru (Luke Gruber) 15 days ago. Updated 6 days ago.

Status:
Open
Assignee:
-
Target version:
-
[ruby-core:120906]

Description

Motivation:

It would be nice be able to Ractor.yield in a non-blocking way. Right now a Ractor.yield blocks until another ractor calls r.take on the yielding ractor.
This is bad in the following scenario:

main = Ractor.current

rs = 10.times.map do
  Ractor.new(main) do |m|
    ret = []
    loop do
      # do a bunch of work that takes a while, then:
      begin
        obj = m.take
      rescue Ractor::ClosedError
      end
      if obj
        ret << obj
      else
        break
      end
    end
    ret
  end
end

50.times do |i|
  Ractor.yield(i) # this will block until some ractor calls take on us, but it could be a while if there is processing before the `take` call.
end
main.close_outgoing

# Ideally, we could do some work in main ractor that takes some time while the other ractors do their processing. But we're blocking right now
# during all the calls to `Ractor.yield`.

# Finally, get the results
while rs.any?
  r, obj = Ractor.select(*rs)
  $stderr.puts "Ractor #{r} got #{obj}"
  rs.delete(r)
end

I'd like other ractors to be able to do work in a "fire and forget" kind of way. It isn't possible right now due to the limitations of Ractor.yield and Ractor#take.
What we need is some kind of yield buffer so the yielder doesn't block. I propose that Ractor channels would allow this type of programming to work.

Example using channels:

chan = Ractor::Channel.new # We could specify buffer size like Go or it could be dynamically growable

rs = 10.times.map do
  Ractor.new(chan) do |c|
    ret = []
    loop do
      obj, _closed = c.receive # returns `[nil, true]` if `c` is closed and its buffer is empty. It blocks if the buffer is empty and `c` is not closed.
      if obj
        ret << obj
      else
        break
      end
    end
    ret
  end
end

50.times do |i|
  chan.send(i) # non-blocking, fills a buffer and only wakes a receiver if there is one
end
chan.close

# Do some processing while ractors do their work.

# Then, collect the results
while rs.any?
  r, obj = Ractor.select(*rs)
  puts "Ractor #{r} got #{obj}"
  rs.delete(r)
end

With an API similar to this, we could have "fire and forget" processing with ractors.

Updated by tenderlovemaking (Aaron Patterson) 15 days ago

I think adding a channel object would be very helpful. I'm looking through shared data structures in Rails, trying to understand what we need to change for Ractor safety. One popular data structure is a Concurrent::Map, and there are various instances used as caches. In order to implement a similar data structure with Ractors I end up writing my own channel:

class Ractor::Channel
  def self.new
    Ractor.new do
      loop do
        Ractor.yield Ractor.receive
      end
    end
  end
end

class Map
  def initialize
    @r = Ractor.new {
      cache = { "int" => "integer", "bool" => "boolean" }
      loop do
        channel, key = Ractor.receive

        val = cache[key] ||= "not found #{key}"

        # Send back to requester
        channel.send val
      end
    }

    freeze
  end

  def [] key
    c = Ractor::Channel.new
    @r.send([c, key])
    c.take
  end
end

map = Map.new
p map["int"]

I would really like it if channels were built-in.

Updated by luke-gru (Luke Gruber) 11 days ago

I made a PoC branch here: https://github.com/luke-gru/ruby/commits/ractor_channels but it's not totally ready yet.

Updated by shan (Shannon Skipper) 6 days ago

This seems really handy. I wonder if it would be worth considering having an optional self-closing block variant?

Ractor::Channel.new do |channel|
  # Automatically ensure `channel.close_incoming` and `channel.close_outgoing` on block close.
end

An example with a block (not implementing the buffer argument) might look something like:

class Ractor
  module Channel
    def self.new(&block)
      channel = Ractor.new do
        loop do
          Ractor.yield Ractor.receive
        end
      end

      if block_given?
        message = block.call(channel)

        channel.close_incoming
        channel.close_outgoing

        return message
      end

      channel
    end
  end
end

class Map
  def initialize
    @reactor = Ractor.new do
      cache = {}

      loop do
        channel, payload = Ractor.receive
        channel << cache.public_send(*payload)
      end
    end

    freeze
  end

  def []=(key, value)
    pipe __method__, key, value
  end

  %i[[] delete key? value?].each do |meth|
    define_method(meth) { |key| pipe meth, key }
  end

  %i[clear empty? keys length size to_a to_h values].each do |meth|
    define_method(meth) { pipe meth }
  end

  private

  def pipe(*payload)
    Ractor::Channel.new do |channel|
      @reactor << [channel, payload]
      channel.take
    end
  end
end
Actions

Also available in: Atom PDF

Like1
Like0Like0Like0