Feature #21930

closed

Add Ractor#empty? method to check for pending messages without blocking

Added by synacker (Mikhail Milovidov) 6 days ago. Updated 2 days ago.

Status: Feedback
Assignee: -
Target version: -
[ruby-core:124895]

Description

Summary
In concurrent Ractor‑based architectures, there is a critical need to check whether a Ractor has pending messages without blocking. Currently, this is not possible with the standard API.

Motivation
The Ractor API provides a powerful mechanism for communication between Ractors running in parallel. However, in high‑load systems that use cooperative multitasking, the current Ractor.receive method has limitations:

  • It blocks the current thread until a message arrives.

  • It doesn’t offer a non‑blocking way to check the message queue.

  • This makes it difficult to integrate Ractors with cooperative scheduling frameworks (e.g., Async, Fiber‑based systems).

As a result, developers must either:

  • Accept thread blocking (hurting responsiveness).

  • Implement complex workarounds with timeouts or auxiliary queues.

Proposed solution
Add Ractor#empty? to the Ractor API. The method should:

  • Return true if there are no pending messages in the Ractor’s main queue.

  • Return false if there is at least one message available for processing.

  • Not block the calling thread under any circumstances.

  • Be safe to call from any Ractor (including the current one).
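The contract above can be sketched with a stand-in object. Ractor#empty? does not exist yet, so the example below uses a hypothetical Mailbox class (with hypothetical send_message and receive methods) backed by Thread::Queue from Ruby's core library. It only illustrates the intended return values and the non-blocking behaviour; it is not how Ractor would implement the method.

```ruby
# Hypothetical stand-in for a Ractor's incoming queue, used only to
# illustrate the proposed contract. This is NOT the Ractor implementation.
class Mailbox
  def initialize
    @queue = Thread::Queue.new # thread-safe FIFO from Ruby's core library
  end

  # Deliver a message (plays the role of Ractor#send in this sketch).
  def send_message(message)
    @queue.push(message)
    self
  end

  # Blocking receive (plays the role of Ractor.receive in this sketch).
  def receive
    @queue.pop
  end

  # Proposed semantics: true when nothing is pending, false otherwise.
  # Thread::Queue#empty? returns immediately, so the caller never blocks.
  def empty?
    @queue.empty?
  end
end

mailbox = Mailbox.new
puts mailbox.empty?        # true: nothing has been sent yet
mailbox.send_message(:ping)
puts mailbox.empty?        # false: one message is pending
puts mailbox.receive       # ping (returns at once because a message is waiting)
puts mailbox.empty?        # true: the queue is drained again
```

As with any such check, the answer is a snapshot: another sender may enqueue a message right after empty? returns true. With a single consumer, a false result does guarantee that the next receive will not block.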

Demonstration code
Below is a proof‑of‑concept showing how Ractor#empty? enables cooperative multitasking with the Async gem:

require 'async'

class TimeCommand
    attr_reader :id

    def initialize(id)
        @id = id
    end

    def task
        1.upto(3) do |i|
            sleep(1)
            puts "[cmd #{@id}] step #{i} @ #{Time.now}"
        end
    end
end

class Worker
    def initialize
        @ractor = Ractor.new do
            loop do
                Sync do |task|
                    in_queue = Async::Queue.new
                    queue_task = task.async do |subtask|
                        while command = in_queue.dequeue
                            subtask.async do |child_task|
                                command.task
                            end
                        end
                    end

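                    # Transient task that pulls commands from the Ractor's incoming queue.
                    task.async(transient: true) do |main_task|
                        loop do
                            commands = []
                            if queue_task.children? || !in_queue.empty?
                                # Work is in flight: let other tasks run first, then take
                                # only the messages already pending, without blocking.
                                main_task.yield
                                commands.append Ractor.receive until Ractor.current.empty?
                            else
                                # Idle: nothing else needs this thread, so block for a message.
                                commands.append Ractor.receive
                            end

                            unless commands.empty?
                                puts "Worker received batch of #{commands.size} commands."
                                commands.each { |command| in_queue.enqueue(command) }
                            end
                        end
                    end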
                end
            end
            
        end
    end

    # Moves the command into the worker Ractor. Note that this shadows Object#send.
    def send(command)
        @ractor.send(command, move: true)
    end

    def wait
        @ractor.join
    end
end

worker = Worker.new

1000.times do |i|
    100.times do |j|
        worker.send TimeCommand.new(i * 100 + j)
    end
    sleep(1)
end

worker.wait

Key observations:
With Ractor#empty?, developers can:

  • Integrate Ractors with cooperative multitasking frameworks (e.g., Async) more naturally.

  • Avoid thread blocking when checking for incoming messages.

  • Batch process messages efficiently (collect all pending messages in one go).

  • Improve responsiveness in high‑concurrency scenarios by yielding control back to the scheduler instead of blocking when no messages are pending.
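The batching pattern from the demonstration can be isolated into a small helper. The sketch below is a rough illustration only: receive_batch and its busy: keyword are hypothetical names, and a Thread::Queue stands in for the Ractor's incoming queue, since Ractor#empty? is not available. It assumes a single consumer, so a false result from empty? means the following pop will not block.

```ruby
# Collect one batch of messages from a queue-like source.
# `source` is any object that responds to #pop (blocking) and #empty?
# (non-blocking). Thread::Queue stands in for a Ractor mailbox here.
# Assumes a single consumer, so pop cannot block after empty? returns false.
def receive_batch(source, busy:)
  batch = []
  if busy
    # Other work is in flight: take only what is already pending.
    batch << source.pop until source.empty?
  else
    # Idle: blocking for a single message is acceptable.
    batch << source.pop
  end
  batch
end

queue = Thread::Queue.new
3.times { |i| queue << "cmd-#{i}" }

p receive_batch(queue, busy: true)   # ["cmd-0", "cmd-1", "cmd-2"]
p receive_batch(queue, busy: true)   # []: nothing pending, returns at once

queue << "cmd-3"
p receive_batch(queue, busy: false)  # ["cmd-3"]
```

The busy branch never waits, which is what lets a cooperative scheduler keep running other tasks between polls. The idle branch mirrors the blocking receive in the demonstration code.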

Benefits

  • Enables better integration with modern Ruby concurrency tools.

  • Reduces need for complex workarounds.

  • Improves performance in message‑driven architectures.

  • Maintains Ractor’s thread‑safety guarantees.
