Feature #21930: Add Ractor#empty? method to check for pending messages without blocking

Added by synacker (Mikhail Milovidov) 6 days ago. Updated 2 days ago.

Status: Feedback
Assignee: -
Target version: -
[ruby-core:124895]

Description

Summary
In concurrent Ractor‑based architectures, there is a critical need to check whether a Ractor has pending messages without blocking. Currently, this is not possible with the standard API.

Motivation
The Ractor API provides a powerful mechanism for communication between Ractors running on separate OS threads. However, in high‑load systems that use cooperative multitasking, the current Ractor#receive method presents limitations:

  • It blocks the current thread until a message arrives.

  • It doesn’t offer a non‑blocking way to check the message queue.

  • This makes it difficult to integrate Ractors with cooperative scheduling frameworks (e.g., Async, Fiber‑based systems).

As a result, developers must either:

  • Accept thread blocking (hurting responsiveness).

  • Implement complex workarounds with timeouts or auxiliary queues.

Proposed solution
Add Ractor#empty? to the Ractor API. The method should:

  • Return true if there are no pending messages in the Ractor’s main queue.

  • Return false if there is at least one message available for processing.

  • Not block the calling thread under any circumstances.

  • Be safe to call from any Ractor (including the current one).
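Since the proposed method is meant to mirror Thread::Queue#empty? (an analogy raised later in this thread), the intended semantics can be sketched with Thread::Queue as a stand‑in; note that Ractor#empty? itself is not part of any released Ruby, so this is only an illustration of the contract:

```ruby
q = Thread::Queue.new

p q.empty?   # => true  (non-blocking: reports state without waiting)
q << :job_a
q << :job_b
p q.empty?   # => false (at least one message is available)

# The intended usage pattern: drain everything currently
# available without ever blocking the caller.
batch = []
batch << q.pop until q.empty?
p batch      # => [:job_a, :job_b]
```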

Demonstration code
Below is a proof‑of‑concept showing how Ractor#empty? enables cooperative multitasking with the Async gem:

require 'async'

class TimeCommand
    attr_reader :id

    def initialize(id)
        @id = id
    end

    def task
        1.upto(3) do |i|      
            sleep(1)
            puts "[cmd #{@id}] step #{i} @ #{Time.now}"
        end
    end
end

class Worker
    def initialize
        @ractor = Ractor.new do
            loop do
                Sync do |task|
                    in_queue = Async::Queue.new
                    queue_task = task.async do |subtask|
                        while command = in_queue.dequeue
                            subtask.async do |child_task|
                                command.task
                            end
                        end
                    end

                    task.async(transient: true) do |main_task|
                        loop do
                            commands = []
                            if queue_task.children? || !in_queue.empty?
                                main_task.yield
                                commands.append Ractor.receive while !Ractor.current.empty?
                            else
                                commands.append Ractor.receive
                            end
                            
                            unless commands.empty?
                                puts "Worker received batch of #{commands.size} commands."
                                commands.each { |command| in_queue.enqueue(command) }
                            end
                        end
                    end
                end
            end
            
        end
    end

    def send(command)
        @ractor.send(command, move: true)
    end

    def wait
        @ractor.join
    end
end

worker = Worker.new

1000.times do |i|
    100.times do |j|
        worker.send TimeCommand.new(i * 100 + j)   # unique id per command
    end
    sleep(1)
end

worker.wait

Key observations:
With Ractor#empty?, developers can:

  • Integrate Ractors with cooperative multitasking frameworks (e.g., Async) more naturally.

  • Avoid thread blocking when checking for incoming messages.

  • Batch process messages efficiently (collect all pending messages in one go).

  • Improve responsiveness in high‑concurrency scenarios by yielding control back to the scheduler when no work is available.

Benefits

  • Enables better integration with modern Ruby concurrency tools.

  • Reduces need for complex workarounds.

  • Improves performance in message‑driven architectures.

  • Maintains Ractor’s thread‑safety guarantees.

Updated by synacker (Mikhail Milovidov) 6 days ago Actions #1

  • Description updated (diff)

Updated by nobu (Nobuyoshi Nakada) 5 days ago Actions #3 [ruby-core:124897]

  • Status changed from Open to Feedback

This sounds like leading to a typical TOC/TOU problem.

As for your example, why does Worker ractor handle both of main_task and dispatch alone, instead of launching each ractors?

Updated by synacker (Mikhail Milovidov) 5 days ago · Edited Actions #4 [ruby-core:124898]

nobu (Nobuyoshi Nakada) wrote in #note-3:

This sounds like leading to a typical TOC/TOU problem.

I appreciate the concern about a potential TOC/TOU issue, but I believe it doesn’t apply in this specific case. Consider the following pattern:

messages = []
while !Ractor.current.empty?
  messages << Ractor.receive
end
process_batch(messages) if messages.any?

In this code:

  • The «check» (empty?) and the «use» (receive) are tightly coupled in a loop.
  • Even if a new message arrives after the empty? check but before the receive call, the loop will catch it on the next iteration.
  • The batch simply grows by one more message — no data is lost, and no invalid state is entered.
  • This pattern is by design: the goal is to collect all available messages at the moment of polling, not to make an atomic decision based on a single state snapshot.

Thus, Ractor#empty? doesn’t introduce a new race condition — it enables a safe and efficient polling mechanism that’s already common in concurrent systems (e.g., event loops).
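The loss‑free property of this check‑then‑drain loop can be demonstrated today with Thread::Queue, whose empty? has the same non‑blocking semantics; this is an illustrative sketch with a concurrent producer, not the proposed Ractor API itself:

```ruby
q = Thread::Queue.new
received = []

producer = Thread.new do
  100.times { |i| q << i }   # messages may arrive mid-drain
end

# Poll-and-drain: a message arriving between the empty? check and the
# pop is simply collected on a later iteration; nothing is dropped.
until received.size == 100
  received << q.pop until q.empty?
  Thread.pass   # yield control, as a cooperative scheduler would
end
producer.join

p received.sort == (0...100).to_a   # => true
```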

nobu (Nobuyoshi Nakada) wrote in #note-3:

As for your example, why does Worker ractor handle both of main_task and dispatch alone, instead of launching each ractors?

You asked why the Worker Ractor handles both main_task and dispatch logic instead of launching a separate Ractor per task. Here’s why creating one Ractor per task is impractical:

  1. Key drawbacks of one‑Ractor‑per‑task:
  • High overhead. Creating a Ractor is significantly more expensive than sending a message or scheduling a Fiber. For 10 000 tasks, spawning 10 000 Ractors would cause massive memory and scheduling overhead.
  • Resource exhaustion. The OS and Ruby VM have limits on concurrent threads/processes. Unbounded Ractor creation risks crashing the system or exhausting system resources.
  • Complex coordination. Managing 10 000+ Ractors (joining, error handling, monitoring, logging) becomes a complex task in itself, adding significant operational burden.
  2. Why the Worker Ractor acts as a managed pool:
    The current design uses a bounded number of Ractors (typically one per CPU core) and leverages Fibers for cooperative multitasking within each Ractor. Specifically:
  • It receives a stream of commands (TimeCommand objects) via the Ractor message queue.
  • It uses the Async gem to run their task method concurrently within the same Ractor, using Fibers to achieve lightweight concurrency.

This approach follows a well‑established architectural pattern for high‑load systems:

  • Create a fixed number of Ractors, typically matching the number of CPU cores (or a small multiple of it), to avoid OS/VM resource exhaustion.
  • Within each Ractor, use cooperative multitasking (Fibers, event loops, coroutines) to handle many concurrent operations efficiently.
  • Use Ractor#empty? as a scheduler hint — to batch work and yield control when idle — not as a security or state‑decision primitive.

This pattern balances high concurrency with bounded resource usage, making it suitable for production workloads.
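The fixed‑pool pattern described above can be sketched as follows, using Thread and Thread::Queue as stand‑ins for Ractors (the bounded pool shape and the drain‑until‑stop protocol are the same; only the communication primitive differs):

```ruby
require "etc"

POOL_SIZE = Etc.nprocessors       # bounded: one worker per CPU core
jobs    = Thread::Queue.new
results = Thread::Queue.new

pool = POOL_SIZE.times.map do
  Thread.new do
    # Each worker drains the shared job queue until it sees :stop.
    while (n = jobs.pop) != :stop
      results << n * n            # stand-in for real work
    end
  end
end

100.times { |i| jobs << i }
POOL_SIZE.times { jobs << :stop } # one :stop per worker
pool.each(&:join)

total = 0
total += results.pop until results.empty?
p total   # => 328350 (sum of squares 0..99)
```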

Thank you again for the thoughtful questions — they help clarify the design rationale. Let me know if you’d like me to elaborate on any point!

Updated by synacker (Mikhail Milovidov) 3 days ago Actions #5 [ruby-core:124907]

nobu (Nobuyoshi Nakada) wrote in #note-3:

As for your example, why does Worker ractor handle both of main_task and dispatch alone, instead of launching each ractors?

I compared two strategies for handling concurrent tasks:

  1. run1: Ractors + fibers
  2. run2: only ractors

Here’s the benchmark code that executes 10000 tasks using both strategies and measures real execution time:

require 'async'
require 'benchmark'

class TimeCommand
    attr_reader :id

    def initialize(id)
        @id = id
    end

    def task
        1.upto(3) do |i|      
            sleep(1)
            "[cmd #{@id}] step #{i} @ #{Time.now}"
        end
    end
end

class StopCommand
end

class Worker
    attr_reader :iterations
    
    def initialize(iterations: 1000)
        @iterations = iterations
    end

    def run1
        ractor = Ractor.new do
            loop do
                run = true
                Sync do |task|
                    in_queue = Async::Queue.new
                    queue_task = task.async do |subtask|
                        while command = in_queue.dequeue
                            # Stop before dispatching: StopCommand has no #task.
                            break if command.is_a?(StopCommand)
                            subtask.async do |child_task|
                                command.task
                            end
                        end
                    end

                    task.async(transient: true) do |main_task|
                        loop do
                            commands = []
                            if queue_task.children? || !in_queue.empty?
                                main_task.yield
                                commands.append Ractor.receive while !Ractor.current.empty?
                            else
                                commands.append Ractor.receive
                            end

                            unless commands.empty?
                                commands.each { |command| in_queue.enqueue(command) }
                            end

                            if commands.any? { |cmd| cmd.is_a?(StopCommand) }
                                run = false
                                in_queue.enqueue(StopCommand.new)
                                break
                            end
                            
                        end
                    end
                end
                break unless run
            end
        end

        @iterations.times do |i|
            ractor.send(TimeCommand.new(i + 1))
        end
        ractor.send(StopCommand.new)
        ractor.join
    end

    def run2
        ractor = Ractor.new do
            ractors = []
            loop do
                command = Ractor.receive
                if command.is_a?(StopCommand)
                    break
                else
                    ractors << Ractor.new(command) do |cmd|
                        cmd.task
                    end
                end
            end
            ractors.each(&:join)
        end

        @iterations.times do |i|
            ractor.send(TimeCommand.new(i + 1))
        end
        ractor.send(StopCommand.new)
        ractor.join
    end
end

worker = Worker.new(iterations: 10000)

p [:fibers, Benchmark.realtime { worker.run1 }.round(2)]
p [:ractors, Benchmark.realtime { worker.run2 }.round(2)]

Benchmark output:

[:fibers, 6.29]
[:ractors, 58.73]

The results show a nearly 10‑fold performance difference. This benchmark demonstrates why the Worker Ractor uses a pooled design with Fibers instead of spawning a Ractor per task. Combining Ractors with Fibers this way, however, requires the proposed Ractor#empty? method.

Updated by ufuk (Ufuk Kayserilioglu) 3 days ago · Edited Actions #6 [ruby-core:124908]

@synacker (Mikhail Milovidov) Which version of Ruby are you testing with? Can you please send your ruby -v output for the benchmark results?

Updated by synacker (Mikhail Milovidov) 3 days ago Actions #7 [ruby-core:124915]

ufuk (Ufuk Kayserilioglu) wrote in #note-6:

@synacker (Mikhail Milovidov) Which version of Ruby are you testing with? Can you please send your ruby -v output for the benchmark results?

This is a Ruby build from my PR (https://github.com/ruby/ruby/pull/16277) implementing this feature, since the run1 method uses the new Ractor#empty? method:

ruby 4.1.0dev (2026-03-02T21:05:21Z feature-21930 805cf8c2d2) +PRISM [x86_64-linux]

Updated by nobu (Nobuyoshi Nakada) 2 days ago Actions #8 [ruby-core:124926]

synacker (Mikhail Milovidov) wrote in #note-4:

In this code:

  • The «check» (empty?) and the «use» (receive) are tightly coupled in a loop.
  • Even if a new message arrives after the empty? check but before the receive call, the loop will catch it on the next iteration.
  • The batch simply grows by one more message — no data is lost, and no invalid state is entered.
  • This pattern is by design: the goal is to collect all available messages at the moment of polling, not to make an atomic decision based on a single state snapshot.

It would work for your code, but may not for general purposes.
And if main_task finished immediately and no command is coming, it will be a busy loop.
I guess what you want is non-blocking (and maybe bulk) read, right?

Updated by byroot (Jean Boussier) 2 days ago Actions #9 [ruby-core:124929]

I don't have a strong opinion on whether empty? is useful; that being said, it's present on Thread::Queue, and I support trying to mirror that API as much as possible.

But empty? alone isn't that helpful because of the TOC/TOU problem as mentioned, so it only makes sense if we also get a non-blocking pop/push like Thread::Queue has. I think @etienne (Étienne Barrié) and @jhawthorn (John Hawthorn) were looking into that recently?
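For reference, Thread::Queue already pairs the non‑blocking state check with a non‑blocking pop, so the combination described here looks like this today:

```ruby
q = Thread::Queue.new
q << :msg

p q.empty?      # => false  (non-blocking state check)
p q.pop(true)   # => :msg   (non_block = true: pop without waiting)

begin
  q.pop(true)   # non-blocking pop on an empty queue...
rescue ThreadError => e
  p e.class     # => ThreadError (...raises instead of blocking)
end
```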

Updated by synacker (Mikhail Milovidov) 2 days ago Actions #10 [ruby-core:124930]

nobu (Nobuyoshi Nakada) wrote in #note-8:

synacker (Mikhail Milovidov) wrote in #note-4:

In this code:

  • The «check» (empty?) and the «use» (receive) are tightly coupled in a loop.
  • Even if a new message arrives after the empty? check but before the receive call, the loop will catch it on the next iteration.
  • The batch simply grows by one more message — no data is lost, and no invalid state is entered.
  • This pattern is by design: the goal is to collect all available messages at the moment of polling, not to make an atomic decision based on a single state snapshot.

It would work for your code, but may not for general purposes.
And if main_task finished immediately and no command is coming, it will be a busy loop.
I guess what you want is non-blocking (and maybe bulk) read, right?

Thank you for the feedback. Ractor#empty? isn’t a niche fix; it’s a general‑purpose primitive for efficient schedulers and Ractor‑Fiber integration. The code doesn’t cause a busy loop, because Ractor.receive blocks when the queue is empty. This method enables non‑blocking batching, and it complements my other proposal (https://bugs.ruby-lang.org/issues/21869) to improve the Ractor API for use with cooperative multitasking.
