Feature #21930
closedAdd Ractor#empty? method to check for pending messages without blocking
Description
Summary
In concurrent Ractor‑based architectures, there’s a critical need to check whether a Ractor has pending messages without blocking. Currently, this is not possible with the standard API
Motivation
The Ractor API provides a powerful mechanism for communication between system OS threads. However, in high‑load systems that use cooperative multitasking, the current Ractor#receive method presents limitations:
-
It blocks the current thread until a message arrives.
-
It doesn’t offer a non‑blocking way to check the message queue.
-
This makes it difficult to integrate Ractors with cooperative scheduling frameworks (e.g., Async, Fiber‑based systems).
As a result, developers must either:
-
Accept thread blocking (hurting responsiveness).
-
Implement complex workarounds with timeouts or auxiliary queues.
Proposed solution
Add Ractor#empty? to the Ractor API. The method should:
-
Return true if there are no pending messages in the Ractor’s main queue.
-
Return false if there is at least one message available for processing.
-
Not block the calling thread under any circumstances.
-
Be safe to call from any Ractor (including the current one).
Demonstration code
Below is a proof‑of‑concept showing how Ractor#empty? enables cooperative multitasking with the Async gem:
require 'async'
class TimeCommand
attr_reader :id
def initialize(id)
@id = id
end
def task
1.upto(3) do |i|
sleep(1)
puts "[cmd #{@id}] step #{i} @ #{Time.now}"
end
end
end
class Worker
def initialize
@ractor = Ractor.new do
loop do
Sync do |task|
in_queue = Async::Queue.new
queue_task = task.async do |subtask|
while command = in_queue.dequeue
subtask.async do |child_task|
command.task
end
end
end
task.async(transient: true) do |main_task|
loop do
commands = []
if queue_task.children? || !in_queue.empty?
main_task.yield
commands.append Ractor.receive while !Ractor.current.empty?
else
commands.append Ractor.receive
end
unless commands.empty?
puts "Worker received batch of #{commands.size} commands."
commands.each { |command| in_queue.enqueue(command) }
end
end
end
end
end
end
end
def send(command)
@ractor.send(command, move: true)
end
def wait
@ractor.join
end
end
worker = Worker.new
1000.times do |i|
100.times do |j|
worker.send TimeCommand.new(i * 10 + j)
end
sleep(1)
end
worker.wait
Key observations:
With Ractor#empty?, developers can:
-
Integrate Ractors with cooperative multitasking frameworks (e.g., Async) more naturally.
-
Avoid thread blocking when checking for incoming messages.
-
Batch process messages efficiently (collect all pending messages in one go).
-
Improve responsiveness in high‑concurrency scenarios by yielding control back to the scheduler when no work is available.
Benefits
-
Enables better integration with modern Ruby concurrency tools.
-
Reduces need for complex workarounds.
-
Improves performance in message‑driven architectures.
-
Maintains Ractor’s thread‑safety guarantees.