Feature #21121: Ractor channels

Added by luke-gru (Luke Gruber) about 1 year ago. Updated 11 months ago.

Status: Closed
Assignee: -
Target version: -
[ruby-core:120906]

Description

Motivation:

It would be nice to be able to call Ractor.yield without blocking. Currently, Ractor.yield blocks until another ractor calls r.take on the yielding ractor.
This is a problem in the following scenario:

main = Ractor.current

rs = 10.times.map do
  Ractor.new(main) do |m|
    ret = []
    loop do
      # do a bunch of work that takes a while, then:
      begin
        obj = m.take
      rescue Ractor::ClosedError
      end
      if obj
        ret << obj
      else
        break
      end
    end
    ret
  end
end

50.times do |i|
  Ractor.yield(i) # this will block until some ractor calls take on us, but it could be a while if there is processing before the `take` call.
end
main.close_outgoing

# Ideally, the main ractor could do some time-consuming work while the other ractors do their processing. But it is blocked
# during all the calls to `Ractor.yield`.

# Finally, get the results
while rs.any?
  r, obj = Ractor.select(*rs)
  $stderr.puts "Ractor #{r} got #{obj}"
  rs.delete(r)
end

I'd like to be able to hand work to other ractors in a "fire and forget" way. That isn't possible right now because Ractor.yield blocks until a matching Ractor#take.
What we need is some kind of yield buffer so the yielder doesn't block. I propose Ractor channels as a way to enable this style of programming.
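The difference between a blocking hand-off and a buffer can be seen without ractors at all. The sketch below uses Ruby's thread-only `Thread::Queue` and `Thread::SizedQueue` as stand-ins for an unbounded and a bounded buffer. These classes cannot be shared between ractors, so this only models the blocking behavior; it is not the proposed `Ractor::Channel`.

```ruby
# Unbounded buffer: every send returns immediately, even with no consumer yet.
unbounded = Thread::Queue.new
50.times { |i| unbounded.push(i) }
unbounded_size = unbounded.size # all 50 items are waiting in the buffer

# Bounded buffer (capacity 2): once full, a sender would block until a
# receiver makes room. A non-blocking push raises ThreadError instead.
bounded = Thread::SizedQueue.new(2)
bounded.push(0)
bounded.push(1)
bounded_full =
  begin
    bounded.push(2, true) # non_block = true
    false
  rescue ThreadError
    true
  end

# A receiver frees one slot, so the next non-blocking send succeeds.
bounded.pop
bounded.push(2, true)
```

A fixed capacity (as in Go) gives back-pressure; an unbounded buffer never blocks the sender but can grow without limit.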

Example using channels:

chan = Ractor::Channel.new # We could specify buffer size like Go or it could be dynamically growable

rs = 10.times.map do
  Ractor.new(chan) do |c|
    ret = []
    loop do
      obj, _closed = c.receive # returns `[nil, true]` if `c` is closed and its buffer is empty. It blocks if the buffer is empty and `c` is not closed.
      if obj
        ret << obj
      else
        break
      end
    end
    ret
  end
end

50.times do |i|
  chan.send(i) # non-blocking, fills a buffer and only wakes a receiver if there is one
end
chan.close

# Do some processing while ractors do their work.

# Then, collect the results
while rs.any?
  r, obj = Ractor.select(*rs)
  puts "Ractor #{r} got #{obj}"
  rs.delete(r)
end

With an API similar to this, we could have "fire and forget" processing with ractors.
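A minimal sketch of the proposed semantics, modeled with threads because `Thread::Queue` cannot be shared between ractors. `ThreadChannel` is a hypothetical name; `send` never blocks, and `receive` returns `[obj, false]` for a value or `[nil, true]` once the channel is closed and drained.

```ruby
# Hypothetical thread-based model of the proposed channel API (not Ractor::Channel).
class ThreadChannel
  def initialize
    @queue = Thread::Queue.new # unbounded, so send never blocks
  end

  # Non-blocking: appends to the buffer and wakes one waiting receiver, if any.
  def send(obj)
    @queue.push([obj]) # wrap so a sent nil is not mistaken for "closed"
  end

  # Blocks while the buffer is empty and the channel is open.
  def receive
    boxed = @queue.pop # nil only when closed and empty
    boxed ? [boxed.first, false] : [nil, true]
  end

  def close
    @queue.close
  end
end

chan = ThreadChannel.new

workers = 10.times.map do
  Thread.new do
    ret = []
    loop do
      obj, closed = chan.receive
      break if closed
      ret << obj
    end
    ret
  end
end

50.times { |i| chan.send(i) } # returns immediately; no receiver has to be ready
chan.close

results = workers.map(&:value) # join each worker and collect its items
```

Checking the `closed` flag rather than the truthiness of `obj` means `nil` and `false` can still be sent as ordinary values.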


Related issues: 1 (0 open, 1 closed)

Related to Ruby - Feature #21262: Proposal: `Ractor::Port` (Closed; assignee: ko1 (Koichi Sasada))

Updated by tenderlovemaking (Aaron Patterson) about 1 year ago #1 [ruby-core:120910]

I think adding a channel object would be very helpful. I'm looking through shared data structures in Rails, trying to understand what we need to change for Ractor safety. One popular data structure is Concurrent::Map, and various instances of it are used as caches. To implement a similar data structure with Ractors, I ended up writing my own channel:

class Ractor::Channel
  def self.new
    Ractor.new do
      loop do
        Ractor.yield Ractor.receive
      end
    end
  end
end

class Map
  def initialize
    @r = Ractor.new {
      cache = { "int" => "integer", "bool" => "boolean" }
      loop do
        channel, key = Ractor.receive

        val = cache[key] ||= "not found #{key}"

        # Send back to requester
        channel.send val
      end
    }

    freeze
  end

  def [] key
    c = Ractor::Channel.new
    @r.send([c, key])
    c.take
  end
end

map = Map.new
p map["int"]

I would really like it if channels were built-in.
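The Map above is a request/reply pattern: one owner holds the cache, and each request carries its own reply channel. A thread-based sketch of the same shape, with the hypothetical name `ThreadMap` and `Thread::Queue` standing in for both the request channel and the per-request reply channel (so it runs without ractors and is not Ractor-safe code):

```ruby
# Hypothetical thread-based model of the Map pattern: one owner thread holds
# the cache, and each request carries its own one-shot reply queue.
class ThreadMap
  def initialize
    @requests = Thread::Queue.new
    @owner = Thread.new do
      cache = { "int" => "integer", "bool" => "boolean" } # only this thread touches it
      while (request = @requests.pop) # nil once @requests is closed and drained
        reply, key = request
        reply.push(cache[key] ||= "not found #{key}")
      end
    end
  end

  def [](key)
    reply = Thread::Queue.new # fresh reply channel per request
    @requests.push([reply, key])
    reply.pop # block until the owner answers
  end

  def close
    @requests.close
    @owner.join
  end
end

map = ThreadMap.new
int_name = map["int"]
missing = map["float"]
map.close
```

A built-in channel would replace the forwarding ractor that `Ractor::Channel.new` spawns above for every single lookup.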

Updated by luke-gru (Luke Gruber) about 1 year ago #2 [ruby-core:120935]

I made a PoC branch here: https://github.com/luke-gru/ruby/commits/ractor_channels but it's not totally ready yet.

Updated by shan (Shannon Skipper) about 1 year ago #3 [ruby-core:121073]

This seems really handy. I wonder if it would be worth considering an optional self-closing block variant:

Ractor::Channel.new do |channel|
  # Automatically ensure `channel.close_incoming` and `channel.close_outgoing` on block close.
end

An example with a block (not implementing the buffer argument) might look something like:

class Ractor
  module Channel
    def self.new(&block)
      channel = Ractor.new do
        loop do
          Ractor.yield Ractor.receive
        end
      end

      if block_given?
        begin
          return block.call(channel)
        ensure
          # Close the channel even if the block raises.
          channel.close_incoming
          channel.close_outgoing
        end
      end

      channel
    end
  end
end

class Map
  def initialize
    @reactor = Ractor.new do
      cache = {}

      loop do
        channel, payload = Ractor.receive
        channel << cache.public_send(*payload)
      end
    end

    freeze
  end

  def []=(key, value)
    pipe __method__, key, value
  end

  %i[[] delete key? value?].each do |meth|
    define_method(meth) { |key| pipe meth, key }
  end

  %i[clear empty? keys length size to_a to_h values].each do |meth|
    define_method(meth) { pipe meth }
  end

  private

  def pipe(*payload)
    Ractor::Channel.new do |channel|
      @reactor << [channel, payload]
      channel.take
    end
  end
end
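The self-closing block form hinges on closing in an `ensure` clause, so the channel is released even when the block raises. A minimal sketch of just that part, with a hypothetical `with_channel` helper and `Thread::Queue` standing in for a channel (it does not touch the Ractor API):

```ruby
# Hypothetical helper: yields a fresh channel (a Thread::Queue stand-in) and
# closes it in `ensure`, so it is closed even if the block raises.
def with_channel
  channel = Thread::Queue.new
  yield channel # the block's value becomes the method's return value
ensure
  channel&.close
end

value = with_channel do |ch|
  ch.push(42)
  ch.pop
end

leaked = nil
begin
  with_channel do |ch|
    leaked = ch
    raise ArgumentError, "boom"
  end
rescue ArgumentError
  # the channel was still closed on the way out
end
```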

Updated by luke-gru (Luke Gruber) 12 months ago #4 [ruby-core:121303]

Ruby's internals use this same trick to make require calls work in non-main ractors. The code is here: https://github.com/ruby/ruby/blob/master/ractor.c#L4009. It starts a new thread on the main ractor, calls require there, and then sends a message over that pseudo-channel to tell the calling ractor that it's done. I don't know if that was where you got the idea or great minds just think alike :)

Obviously it's a bit of a hack and less performant than proper built-in channels would be, but it gets the job done for now.

Updated by ko1 (Koichi Sasada) 11 months ago #5

Updated by matz (Yukihiro Matsumoto) 11 months ago #6 [ruby-core:121656]

  • Status changed from Open to Closed

Superseded by #21266
