Feature #21262

open

Proposal: `Ractor::Port`

Added by ko1 (Koichi Sasada) 4 days ago. Updated 2 days ago.

Status:
Assigned
Target version:
[ruby-core:121621]

Description

Proposal: Ractor::Port

In concurrent Ruby applications using Ractors, safely and efficiently communicating results between Ractors is a common challenge. We propose Ractor::Port as a lightweight, safe, and ergonomic abstraction to simplify communication patterns, especially in request-response models.

# usage example

port = Ractor::Port.new

Ractor.new port do |port|
  port << 42
  port << 43
end

# Success: wait for sending
port.receive #=> 42

Ractor.new port do |port|
  port.receive # Error: only the creator Ractor can receive from this port.
end

port.receive #=> 43

This is a similar concept to "Channel", but it is tightly coupled with the creator Ractor and no other Ractors can receive anything from that port.

In that sense, it is conceptually closer to a socket file descriptor (e.g., a destination and port number pair in TCP/IP).

We can implement Port with Ractor.receive_if like this:

class Ractor::Port
  def initialize
    @r = Ractor.current
    @tag = genid()
  end

  def send obj
    @r << [@tag, obj]
  end

  def receive
    raise unless @r == Ractor.current

    Ractor.receive_if do |(tag, result)|
      if tag == @tag
        return result
      end
    end
  end
end
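
To make the "pair of a destination and a tag" idea concrete without depending on any Ractor API, here is a minimal sketch that models it in plain Ruby. It is only a model: ModelMailbox and ModelPort are hypothetical names, a Mutex-protected array stands in for a Ractor's incoming queue, and the creator-only check on receive is left out.

```ruby
# Plain-Ruby model of the idea only; this is not the proposed Ractor::Port.
class ModelMailbox
  def initialize
    @messages = []
    @lock = Mutex.new
    @arrived = ConditionVariable.new
  end

  # Append a message and wake up any waiting receiver.
  def push(msg)
    @lock.synchronize do
      @messages << msg
      @arrived.broadcast
    end
    self
  end

  # Remove and return the first message accepted by the block, waiting
  # until one arrives (this plays the role of Ractor.receive_if).
  def receive_if
    @lock.synchronize do
      loop do
        index = @messages.index { |msg| yield msg }
        return @messages.delete_at(index) if index
        @arrived.wait(@lock)
      end
    end
  end
end

# A port is just a destination mailbox plus a unique tag.
class ModelPort
  def initialize(mailbox)
    @mailbox = mailbox
    @tag = Object.new # unique per port, so tags cannot collide
  end

  def <<(obj)
    @mailbox.push([@tag, obj])
    self
  end

  def receive
    _tag, obj = @mailbox.receive_if { |tag, _obj| tag.equal?(@tag) }
    obj
  end
end

mailbox = ModelMailbox.new
port_a = ModelPort.new(mailbox)
port_b = ModelPort.new(mailbox)
port_b << :for_b
port_a << :for_a
p port_a.receive # :for_a, even though :for_b arrived earlier
p port_b.receive # :for_b
```

Both ports share one mailbox, yet each receive only sees messages carrying its own tag. That is the property the rest of this proposal relies on.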

With Ractor::Port, we can deprecate Ractor.receive_if, Ractor.yield, and Ractor#take. Ports act as clear, self-contained endpoints for message passing, which makes these older primitives redundant. Furthermore, Port-based communication is significantly easier to implement and reason about—especially when considering synchronization challenges around Ractor.select and rendezvous semantics.

Background: Limitations of current communication patterns

Let's discuss how to make server-like service ractors.

No response server

We can make server-like Ractors like this:

# EX1

def fib(n) = n > 1 ? fib(n-2) + fib(n-1) : 1

# A Ractor that calculates fib(n)
fib_srv = Ractor.new do
  while true
    param = Ractor.receive
    result = fib(param)
  end
end

fib_srv << 10

In this case, the main Ractor requests fib_srv to calculate fib(10).
However, currently, there is no way to retrieve the result.

Return value to the sender ractor

There are several possible approaches.

First, we can send the sender Ractor along with the parameter, and ask the server to send the result back to the sender.

# EX2

fib_srv = Ractor.new do
  while true
    param, sender = Ractor.receive
    result = fib(param)
    sender << result
  end
end

fib_srv << [10, Ractor.current]

do_some_work()

Ractor.receive #=> fib(10)

This approach works well in simple cases.

However, with EX2, handling multiple concurrent responses becomes difficult. The results are pushed into the same mailbox, and since Ractor.receive retrieves messages without discriminating the source, it's unclear which server returned which result.

# EX3

def fact(n) = n > 1 ? fact(n-1) * n : 1

fib_srv = Ractor.new do
  while true
    param, sender = Ractor.receive
    result = fib(param)
    sender << result
  end
end

fact_srv = Ractor.new do
  while true
    param, sender = Ractor.receive
    result = fact(param)
    sender << result
  end
end


fib_srv << [10, Ractor.current]
fib_srv << [20, Ractor.current]
fact_srv << [10, Ractor.current]
fact_srv << [20, Ractor.current]

do_some_work()

Ractor.receive
#=> fib(10) or fact(10), which?
#   If the servers use more Ractors (to calculate results in parallel),
#   fib(20) or fact(20) may be returned first.

Because Ractor.receive retrieves all messages indiscriminately, developers must add their own tagging logic to distinguish results. While tagging (as shown in EX4) helps, it introduces additional complexity and brittleness.
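
The ambiguity can be reproduced with a small runnable sketch that uses only Ractor#send and Ractor.receive. The helper name collect_untagged_replies and the one-shot servers are assumptions made for illustration; they are not part of the proposal.

```ruby
Warning[:experimental] = false # silence the "Ractor is experimental" warning

def fib(n) = n > 1 ? fib(n - 2) + fib(n - 1) : 1
def fact(n) = n > 1 ? fact(n - 1) * n : 1

# Ask two one-shot servers and collect whatever lands in the caller's mailbox.
def collect_untagged_replies
  fib_srv = Ractor.new do
    param, sender = Ractor.receive
    sender << fib(param)
  end
  fact_srv = Ractor.new do
    param, sender = Ractor.receive
    sender << fact(param)
  end

  fib_srv << [10, Ractor.current]
  fact_srv << [5, Ractor.current]

  # Two bare integers arrive; nothing says which server produced which one,
  # and the arrival order is not guaranteed.
  [Ractor.receive, Ractor.receive]
end

p collect_untagged_replies
```

The two replies are correct as a set, but the caller cannot tell from the values alone which request each one answers.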

Responses with request ID

The following code returns each result together with a request ID (a pair of the server name and the parameter).

# EX4

fib_srv = Ractor.new do
  while true
    param, sender = Ractor.receive
    result = fib(param)
    sender << [[:fib, param], result]
  end
end

fact_srv = Ractor.new do
  while true
    param, sender = Ractor.receive
    result = fact(param)
    sender << [[:fact, param], result]
  end
end

fib_srv << [10, Ractor.current]
fib_srv << [20, Ractor.current]
fact_srv << [10, Ractor.current]
fact_srv << [20, Ractor.current]

do_some_work()

Ractor.receive_if do |id, result|
  case id
  in [:fib, n]
    p "fib(#{n}) = #{result}"
  in [:fact, n]
    p "fact(#{n}) = #{result}"
  end
end

# or if you want to use specific results, like:

p fib20:  Ractor.receive_if{|id, result| id => [:fib, 20];  result}
p fact10: Ractor.receive_if{|id, result| id => [:fact, 10]; result}
p fact20: Ractor.receive_if{|id, result| id => [:fact, 20]; result}
p fib10:  Ractor.receive_if{|id, result| id => [:fib, 10];  result}

This approach closely resembles pattern matching in Erlang or Elixir, where responses are tagged and matched structurally.

However, this solution still has an issue: if do_some_work() uses Ractor.receive, it may accidentally consume any message. In other words, Ractor.receive can only be safely used when you're certain that no other code is using it.

(Another minor issue is that different servers can return the same identifier, e.g. [:fact, num] returned by a NewsPaper server, which is confusing.)

Using channels

To solve this issue, we can give each request its own channel, with each channel backed by a separate Ractor.

Channels can be implemented using Ractors, as illustrated below.

# EX5

# Servers are exactly the same as in EX3

fib_srv = Ractor.new do
  while true
    param, sender = Ractor.receive
    result = fib(param)
    sender << result
  end
end

fact_srv = Ractor.new do
  while true
    param, sender = Ractor.receive
    result = fact(param)
    sender << result
  end
end

# Create a new channel using a Ractor
def new_channel
  Ractor.new do
    while true
      Ractor.yield Ractor.receive
    end
  end
end


fib_srv << [10, fib10_ch = new_channel]
fib_srv << [20, fib20_ch = new_channel]
fact_srv << [10, fact10_ch = new_channel]
fact_srv << [20, fact20_ch = new_channel]

do_some_work()

p fib20: fib20_ch.take   # wait for fib(20)
p fact10: fact10_ch.take # wait for fact(10)
p fib10: fib10_ch.take   # wait for fib(10)
p fact20: fact20_ch.take # wait for fact(20)

# or 
chs = [fib10_ch, fib20_ch, fact10_ch, fact20_ch]

while !chs.empty?
  ch, result = Ractor.select(*chs) # wait for multiple channels
  p ch, result
  chs.delete ch
end

The channel approach solves the issue of EX4. The above implementation introduces some overhead for creating channel Ractors, but we could introduce a special implementation to reduce this overhead.

However, in the Actor model, the communication pattern is to send a message to a specific actor. In contrast, channels are used to send messages through a shared conduit, without caring which receiver (if any) handles the message. Also, channels can have some overhead, as discussed below.

Summary of background

Currently, when implementing request-response patterns with Ractors, developers face challenges in tracking results, managing identifiers, and avoiding message conflicts. Existing primitives like receive_if, take, or channels implemented with Ractors are either error-prone or inefficient.

Proposal

Introduce Ractor::Port as an alternative to channels. It is a natural extension of the Actor model. In fact, it is a thin wrapper around the current send/receive model, as illustrated at the top of this proposal.

With Ractor::Port, we can rewrite the above examples.

# EX6

# Exactly the same servers as in EX3

fib_srv = Ractor.new do
  while true
    param, sender = Ractor.receive
    result = fib(param)
    sender << result
  end
end

fact_srv = Ractor.new do
  while true
    param, sender = Ractor.receive
    result = fact(param)
    sender << result
  end
end

fib_srv << [10, fib10_port = Ractor::Port.new]
fib_srv << [20, fib20_port = Ractor::Port.new]
fact_srv << [10, fact10_port = Ractor::Port.new]
fact_srv << [20, fact20_port = Ractor::Port.new]

do_some_work()

p fib10_port.receive #=> fib(10)
p fib20_port.receive #=> fib(20)
p fact10_port.receive #=> fact(10)
p fact20_port.receive #=> fact(20)

# or

ports = [fib10_port, fib20_port, fact10_port, fact20_port]

while !ports.empty?
  port, result = Ractor.select(*ports)
  case port
  when fib10_port
    p fib10: result
  ...
  else
    raise "This should not happen (BUG)."
  end

  ports.delete(port)
end

Ractor::Port resolves key pain points in message passing between Ractors:

  • It guarantees that incoming messages are only delivered to the intended Ractor, preventing tag collisions.
  • It enables message routing without relying on global receive blocks (Ractor.receive), which are prone to unintended consumption.
  • It replaces more complex primitives like .receive_if, .yield, and #take with a simpler, composable abstraction.
  • It maps cleanly to the Actor model semantics Ruby intends to support with Ractors.

While the pattern looks similar to using channels, the semantics and guarantees are different in meaningful ways.

The advantages of using Ports include:

  • Safer than channels in practice
    • When using a Port, if #send succeeds, it means the destination Ractor is still alive (i.e., it's running).
    • In contrast, with a channel, there's no guarantee that any Ractor is still available to receive from it.
    • Of course, even with a port, there's no guarantee that the destination Ractor will actually process the message — it might ignore it.
    • But at least you don't need to worry about the Ractor having already terminated unexpectedly.
    • In other words, using a port eliminates one major failure case, making the communication model more predictable.
    • This is one of the reasons why Ruby went with the "Actor" model (hence the name Ractor), instead of the "CSP" model.
  • Faster than channels in both creation and message transmission
    • When creating a channel, we need to prepare a container data structure. A port, by contrast, is lightweight data (a pair of a Ractor and a newly created ID).
    • Transmission over a channel requires two copies: one into the channel and one from the channel to the receiving Ractor. With a port, the message is copied only once, from the source Ractor to the destination Ractor. This difference becomes more significant with Ractor-local garbage collection and isolated object spaces.
  • Easy to implement. We only need to implement Port#receive to synchronize with other Ractors.
    • #send/.receive is easy to implement because we only need to lock the receiving Ractor.
    • .yield/#take is not easy to implement because it is rendezvous-style synchronization, which requires locking both Ractors involved (the yielding one and the taking one).
    • .select is DIFFICULT to support under the current spec; CI is still not stable.
    • A simpler spec reduces bugs and may lead to a faster implementation.

Disadvantages:

  1. It is not a well-known concept, especially for users of Go.
  2. We need an additional abstraction for patterns such as producer(s)-consumer(s) concurrent applications.

For (2), here is some example code. We can write a 1-producer, multiple-consumer pattern with a channel.

# channel version of 1 producer & consumers

ch = new_channel

RN = 10 # make 10 consumers

consumers = RN.times.map do
  Ractor.new ch do |ch|
    while param = ch.take
      task(param)
    end
  end
end

tasks.each do |task|
  ch << task
end

With Port, we need to introduce a load balancing mechanism:

# Port version of 1 producer & consumers

control_port = Ractor::Port.new
consumers = RN.times.map do
  Ractor.new control_port do |control_port|
    while true
      control_port << [:ready, Ractor.current] # register - ready
      param = Ractor.receive # it assumes task doesn't use Ractor.receive
      task(param)
    end
  end
end

tasks.each do |task|
  control_port.receive => [:ready, consumer]
  
  # send a task to a ready consumer
  consumer << task
end

Of course we can make a library for that (like OTP on Erlang).
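
As a rough, runnable approximation of the ready-signal pattern above: Ractor::Port is only proposed here, so this sketch lets the producer's own mailbox play the role of control_port and uses just Ractor#send and Ractor.receive. The name run_pool, the :stop message, and squaring as the "work" are assumptions made for illustration.

```ruby
Warning[:experimental] = false # silence the "Ractor is experimental" warning

# Distribute integer tasks over worker Ractors and return the sorted results.
def run_pool(tasks, worker_count)
  producer = Ractor.current
  worker_count.times do
    Ractor.new(producer) do |owner|
      loop do
        owner << [:ready, Ractor.current] # register as ready
        task = Ractor.receive
        break if task == :stop
        owner << [:result, task * task]   # stand-in for real work
      end
    end
  end

  pending = tasks.dup
  results = []
  stopped = 0
  until stopped == worker_count
    kind, payload = Ractor.receive
    case kind
    when :result
      results << payload
    when :ready
      if pending.empty?
        payload << :stop            # no work left: let this worker finish
        stopped += 1
      else
        payload << pending.shift    # hand the next task to the ready worker
      end
    end
  end
  results.sort
end

p run_pool([1, 2, 3, 4, 5], 3)
```

Each worker sends its result before its next :ready message, so by the time every worker has been told to stop, all results have already been received. Note that this version shares one mailbox for control and results, which is exactly the kind of mixing a dedicated port would avoid.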

Default port of Ractors

Each Ractor has a default port, and Ractor#send is equivalent to sending to the destination Ractor's default port (ractor.default_port.send). Likewise, Ractor.receive is equivalent to Ractor.current.default_port.receive.

For simple cases, we can keep using Ractor#send and Ractor.receive.

Deprecation of Ractor#take and Ractor.yield

With the Port concept, we can focus solely on send and receive—that is, direct manipulation of a Ractor’s mailbox. Ports provide a clean and functional alternative to Ractor#take and Ractor.yield, making them unnecessary in most use cases.

Moreover, Ports are significantly easier to implement, as they require only locking the receiving Ractor, while yield/take involve complex rendezvous-style synchronization. By removing these primitives, we can simplify the specification and reduce implementation complexity—especially around features like Ractor.select, which are notoriously hard to get right.

Ractor.select with ports

We need to wait on multiple ports simultaneously, so Ractor.select() should accept ports. Currently Ractor.select() also supports receiving and yielding values, but if we remove the #take functionality, Ractor.select only needs to support ports.

Wait for termination

Ractor#take was originally designed to get the termination result (like Thread#value). For this purpose, we can introduce Ractor#join or Ractor#value, as with Threads, or we can keep the name Ractor#take.

We can write Ractor#join as the following pseudo-code:

class Ractor
  def join # wait for the termination
    monitor port = Port.new
    port.receive
  ensure
    monitor nil # unregister / it should be discussed
  end
  
  # when this ractor terminates, send a message to the registered port
  def monitor port
    @monitor_port = port
  end
  
  private def atexit
    @monitor_port << termination_message
  end
end

# there are some questions.
# * can we register multiple ports?
# * should we support `#join` and `#value` like threads?
#   or should we support only `#join` to return the value?
# * or keep this name as `#take`?


Ractor.new do
  42
end.join #=> 42 (or true?)

It is very similar to monitor in Erlang or Elixir.
We can also build an Erlang-like supervisor this way:

sv_port = Ractor::Port.new

rs = N.times.map do
  Ractor.new do
    do_something()
  end.monitor sv_port
end

while termination_notice = sv_port.receive
  p termination_notice
end

# With Ractor#take, we can write similar code if there is no Ractor.yield

rs = N.times.map do
  Ractor.new do
    do_something()
  end
end

while r, msg = Ractor.select(*rs)
  p [r, msg]
end
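
The supervisor idea can be approximated with only Ractor#send and Ractor.receive, as in the sketch below. The workers report their own termination as their last action, which stands in for the proposed Ractor#monitor; the name supervise and the message layout [:terminated, id, value] are assumptions made for illustration.

```ruby
Warning[:experimental] = false # silence the "Ractor is experimental" warning

# Start `count` workers and wait for one termination notice from each.
def supervise(count)
  supervisor = Ractor.current
  count.times do |id|
    Ractor.new(supervisor, id) do |sup, worker_id|
      value = worker_id * 10                 # stand-in for do_something()
      sup << [:terminated, worker_id, value] # last action of the worker
    end
  end

  notices = Array.new(count) { Ractor.receive }
  notices.sort_by { |_kind, worker_id, _value| worker_id }
end

p supervise(3)
```

Unlike a real monitor, this version sends no notice if a worker raises before reaching its final line. Closing that gap is what a runtime-provided Ractor#monitor would be for.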

Discussion

send with tag (symbols)

If we force users to send a tagged message every time, we can achieve the same effect as the Port concept, because a Port can be thought of as a combination of a tag and a destination Ractor.

r = Ractor.new do
  loop do
    tag, msg = Ractor.receive # return 2 values
    case tag
    when :TAG
      p [tag, msg]
    else
      # ignore
    end
  end
end

r.send :TAG, 42
r.send :TAGE, 84 # typo in the tag: this message is silently ignored

However, it has two issues:

  • If we make a typo in a tag name, the message is silently ignored.
  • The tag name may conflict with unrelated code (libraries).

Ractor.yield and Ractor#take with channel ractor

If we want to keep .yield and #take, we can emulate them with a channel Ractor.

class Ractor
  def initialize
    @yield_ractor = Ractor.new do
      takers = []
      while tag, msg = Ractor.receive
        case tag
        when :register
          takers << msg
        when :unregister
          takers.delete msg
        when :yield
          takers.pop << msg
        end
      end
    end
  end

  def self.yield obj
    @yield_ractor << [:yield, obj]
  end

  def take
    @yield_ractor << [:register, port = Ractor::Port.new]
    port.receive
  ensure
    @yield_ractor << [:unregister, port]
  end
end

Opening and closing the port

This proposal doesn't cover opening and closing ports, but we can discuss it. To introduce this feature, we need to track which ports (tags) are open.

Implementation

The native implementation is not finished yet, but we can implement it using the Ractor.receive_if mechanism, so we estimate that only a few weeks of work are needed to complete it.

Summary

This proposal introduces the following features and deprecations.

  • Ractor::Port
    • Port#send(msg) – sends a message to the creator of the port.
    • Port#receive – receives a message from the port.
    • A port is a lightweight data structure (a pair of a Ractor and a tag).
  • Ractor#join or Ractor#value – to wait for Ractor termination (like Thread#join)
  • Ractor#monitor – to observe when another Ractor terminates
  • Deprecations:
    • Ractor#take
    • Ractor.yield
    • Ractor.receive_if

Thank you for reading this long proposal. If you have any use cases that cannot be addressed with Ractor::Port, I'd love to hear them.

P.S. Thanks to mame for reviewing this proposal and suggesting that I use ChatGPT to improve the writing.


Related issues 1 (0 open, 1 closed)

Related to Ruby - Feature #21121: Ractor channels (Closed)
Actions #1

Updated by ko1 (Koichi Sasada) 4 days ago

Actions #2

Updated by ko1 (Koichi Sasada) 3 days ago

  • Description updated (diff)
Actions #3

Updated by ko1 (Koichi Sasada) 3 days ago

  • Description updated (diff)

Updated by tenderlovemaking (Aaron Patterson) 3 days ago

I like this idea, and I think we should do it, but I think we still need a channel or a queue for the producer / consumer model.

Here's an example of producer / consumer model with threads (1 producer, N consumers):

# Consumers
consumers = N.times.map {
  Thread.new {
    while work = queue.pop
      do_work(work)
    end
  }
}

# Producer thread runs in parallel with consumers
Thread.new do
  loop do
    queue << find_work
  end
end

consumers.each(&:join)

In the above example, the producer is allowed to run in parallel with the consumer threads. As long as the producer keeps the queue filled, no consumer threads will starve for work. I'm not sure how we accomplish the same thing with Ports.
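
For readers who want to run the thread-based model, here is a terminating variant as a sketch. It assumes a finite list of work items, doubles each item as a stand-in for do_work, and closes the queue so that consumers exit; the name run_thread_pool is hypothetical.

```ruby
# Terminating variant of the 1-producer, N-consumer thread model.
def run_thread_pool(work_items, consumer_count)
  queue = Queue.new
  results = Queue.new

  consumers = consumer_count.times.map do
    Thread.new do
      # Queue#pop returns nil once the queue is closed and empty.
      while (work = queue.pop)
        results << work * 2 # stand-in for do_work(work)
      end
    end
  end

  # The producer fills the queue without waiting for any consumer.
  producer = Thread.new do
    work_items.each { |item| queue << item }
    queue.close
  end

  producer.join
  consumers.each(&:join)
  Array.new(results.size) { results.pop }.sort
end

p run_thread_pool([1, 2, 3, 4], 2)
```

The producer never blocks on consumer readiness here, which is the property the question below is about.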

If I rewrite the above example with ports, I think it looks as follows:

# Port version of 1 producer & consumers

control_port = Ractor::Port.new
consumers = RN.times.map do
  Ractor.new control_port do |control_port|
    while true
      control_port << [:ready, Ractor.current] # register - ready
      param = Ractor.receive # it assumes task doesn't use Ractor.receive
      task(param)
    end
  end
end

loop do
  control_port.receive => [:ready, consumer]
  
  # Consumer starves until `find_work` completes
  consumer << find_work
end

The problem with the above code is that the consumer Ractor will starve until find_work returns. Additionally, we're not able to "find work" in parallel with the consumer threads.

I think we could rewrite like this:

# Port version of 1 producer & consumers

control_port = Ractor::Port.new
consumers = RN.times.map do
  Ractor.new control_port do |control_port|
    while true
      control_port << [:ready, Ractor.current] # register - ready
      param = Ractor.receive # it assumes task doesn't use Ractor.receive
      task(param)
    end
  end
end

queue = Queue.new
Thread.new {
  loop do
    queue << find_work
  end
}

# block this thread until there is work in the queue
while work = queue.pop

  # block until there is a Ractor that can take the work
  control_port.receive => [:ready, consumer]

  # Give the work to the Ractor
  consumer << work
end

But the above code seems more complex than if Ractors support a Queue or Channel.

Updated by ko1 (Koichi Sasada) 3 days ago

The problem with the above code is that the consumer Ractor will starve until find_work returns.

I think it is the same in your thread+queue example. Am I missing something?

Additionally, we're not able to "find work" in parallel with the consumer threads.

Why don't you use ractor to run "find work" if you want to run in parallel with the main Ractor?

Maybe it is easy to discuss with good examples.

Updated by tenderlovemaking (Aaron Patterson) 3 days ago

ko1 (Koichi Sasada) wrote in #note-5:

The problem with the above code is that the consumer Ractor will starve until find_work returns.

I think it is the same in your thread+queue example. Am I missing something?

Is it? In my thread+queue example, there is nothing blocking the producer thread from adding work to the work queue:

Thread.new do
  loop do
    queue << find_work
  end
end

With the Ractor example, we don't find_work until a Ractor is ready to take work:

loop do
  # Blocked here until consumer is ready, but we could be executing `find_work` while waiting for consumer to be ready
  control_port.receive => [:ready, consumer]
  
  # Consumer starves until `find_work` completes
  consumer << find_work
end

Additionally, we're not able to "find work" in parallel with the consumer threads.

Why don't you use ractor to run "find work" if you want to run in parallel with the main Ractor?

How would the Ractor running "find work" communicate the work to the consumers?

Maybe it is easy to discuss with good examples.

Sorry, I thought my example is clear. I'll try to make a running program to demonstrate.

Updated by ko1 (Koichi Sasada) 2 days ago · Edited

Now I understand: find_work takes time, and with this code we can't run find_work while no consumer is ready.

We need to introduce an additional Ractor as a channel. In other words, it should be a load-balancer or a worker-pool manager.
Elixir has gen_stage for this purpose. I think we can also introduce something similar.

We can emulate channel/queue:

class Ractor
  class Channel
    def initialize
      @r = Ractor.new do
        deq_ports = []
        enq_objs = []

        while cmd, obj = Ractor.receive
          case cmd
          when :enq
            enq_objs << obj
            submit
          when :deq
            deq_ports << obj
            submit
          else
            raise 'unknown'
          end
        end
      end
    end

    private def submit
      while !deq_ports.empty? && !enq_objs.empty?
        begin
          port = deq_ports.shift
          obj = enq_objs.shift
          port << obj
        rescue Ractor::ClosedPortError
          # just ignore for closed port
          enq_objs.unshift obj
        end
      end
    end

    def enq obj
      @r << [:enq, obj]
    end

    def deq
      deq_port do |port|
        port.receive
      end
    end

    def deq_port
      @r << [:deq, port = Ractor::Port.new]
      yield port
    ensure
      port.close
    end
  end
end

ch = Ractor::Channel.new

Ractor.new ch do |ch|
  ch.deq #=> 42
end

ch.enq 42

We need to create many channels to receive specific results, which is easily achievable with "Port".
However, in load-balancing or worker-pool scenarios, only a few channels are needed in an application, so the creation overhead is negligible. Therefore, using Ractors for this purpose is acceptable.
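
The broker idea can be tried out with only Ractor#send and Ractor.receive, as in this sketch. Because Ractor::Port is only proposed, a dequeue request carries the requesting Ractor itself and the item is delivered to that Ractor's ordinary mailbox. The names new_broker and broker_round_trip and the :stop command are assumptions made for illustration.

```ruby
Warning[:experimental] = false # silence the "Ractor is experimental" warning

# A broker Ractor that pairs enqueued items with waiting receivers.
def new_broker
  Ractor.new do
    waiting = [] # Ractors that asked for an item
    items = []   # items not yet handed out
    loop do
      cmd, obj = Ractor.receive
      case cmd
      when :enq then items << obj
      when :deq then waiting << obj
      when :stop then break
      end
      # Hand out items while both a receiver and an item are available.
      until waiting.empty? || items.empty?
        waiting.shift << items.shift
      end
    end
  end
end

# Enqueue all values, then dequeue them one by one from the same Ractor.
def broker_round_trip(values)
  broker = new_broker
  values.each { |value| broker << [:enq, value] }
  received = values.map do
    broker << [:deq, Ractor.current]
    Ractor.receive
  end
  broker << [:stop, nil]
  received
end

p broker_round_trip([1, 2, 3])
```

Replies arrive in the requester's shared mailbox here, so any other code calling Ractor.receive could consume them. A dedicated port per dequeue request, as in the pseudo-code above, is what removes that hazard.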

BTW, while writing this code, I realized that closing a port seems like good functionality to have.

Updated by ko1 (Koichi Sasada) 2 days ago · Edited

The important point is that we have limited "blocking" synchronization (introduced by cross-Ractor communication) to receive, and we can wait for multiple such events using Ractor.select.
If we introduce a Channel as another primitive, it adds another blocking mechanism and makes Ractor.select more complex again.

Updated by tenderlovemaking (Aaron Patterson) 2 days ago

ko1 (Koichi Sasada) wrote in #note-8:

The important point is that we have limited "blocking" synchronization (introduced by cross-Ractor communication) to receive, and we can wait for multiple such events using Ractor.select.
If we introduce a Channel as another primitive, it adds another blocking mechanism and makes Ractor.select more complex again.

Great, yes. It makes sense. Thank you for explaining!
