Project

General

Profile

Actions

Feature #21869

open

Add receive_all Method to Ractor API for Message Batching

Feature #21869: Add receive_all Method to Ractor API for Message Batching

Added by synacker (Mikhail Milovidov) 15 days ago. Updated 12 days ago.

Status:
Open
Assignee:
-
Target version:
-
[ruby-core:124711]

Description

Summary
The Ractor API provides an excellent mechanism for inter‑thread communication, but it currently lacks a built‑in message batching technique. I propose adding a receive_all method to enable batch processing of messages, which can significantly improve performance in high‑load scenarios.

Motivation
In distributed queued systems, processing messages one‑by‑one (as with the current receive method) can introduce unnecessary overhead. Batch processing allows:

Reduced context‑switching overhead.

More efficient I/O operations (e.g., fewer file writes).

Better throughput in high‑concurrency environments.

Proposed Solution
Add a receive_all method to the Ractor API that:

Returns all available messages in the Ractor’s mailbox at once (as an array).

Demonstration Code
Below is a benchmark comparing individual receive vs. batch receive_all:

require 'benchmark'
class RactorsTest

  def initialize(count)
    @count = count
    @ractor1 = Ractor.new(count, 'output1.txt') do |count, filename|
      File.open(filename, 'w') do |file|
        while count.positive?
          message = receive
          file.write("Ractor 1 received message: #{message}\n")
          file.flush
          count -= 1
        end
      end
    end
    
    @ractor2 = Ractor.new(count, 'output2.txt') do |count, filename|
      File.open(filename, 'w') do |file|
        while count.positive?
          messages = receive_all
          messages.each do |message|
            file.write("Ractor 2 received message: #{message}\n")
          end
          count -= messages.length
          file.flush
        end
      end
    end
  end

  def run1
    @count.times do |i|
      @ractor1.send("Message #{i + 1}")
    end
    @ractor1.join
  end

  def run2
    @count.times do |i|
      @ractor2.send("Message #{i + 1}")
    end
    @ractor2.join
  end
end

records = 1_000_000

test = RactorsTest.new(records)

p [:once, Benchmark.realtime { test.run1 }.round(2)]
p [:all, Benchmark.realtime { test.run2 }.round(2)]

Benchmark Results
On my system, receive_all shows ~4x improvement over individual receive:

Key Observations:
Ractor1 (using receive): Processes each message individually, resulting in frequent I/O calls.

Ractor2 (using receive_all): Processes all queued messages at once, minimizing I/O overhead

Updated by ko1 (Koichi Sasada) 13 days ago · Edited Actions #2 [ruby-core:124752]

Does it block when the queue is empty or returns []? (I think the example expects blocking)

Updated by Eregon (Benoit Daloze) 13 days ago Actions #3 [ruby-core:124757]

synacker (Mikhail Milovidov) wrote:

More efficient I/O operations (e.g., fewer file writes).

Is it? In your example you call file.write for each message in both cases.

But you also call file.flush after each file.write in ractor1 and only only once per batch in ractor2.
Could you benchmark without the file.flushs? I suspect the difference is much smaller then.

I understand the idea that batching helps in this case where you want to explicitly flush, but that's a pretty specific example, e.g. it's uncommon to even call IO#flush at all in Ruby.
One could also flush after N messages/bytes in ractor1.

Updated by synacker (Mikhail Milovidov) 12 days ago · Edited Actions #4 [ruby-core:124759]

ko1 (Koichi Sasada) wrote in #note-2:

Does it block when the queue is empty or returns []? (I think the example expects blocking)

Yes, it blocks if the queue empty. The method receive_all accepts a limit parameter:

  1. limit > 0: collects up to limit messages (may return fewer if fewer are queued). Blocks if the queue is empty.
  2. limit == 0: returns an empty array immediately (no blocking)
  3. limit < 0 or nil (default): returns all messages from the queue or blocks if the queue is empty

Eregon (Benoit Daloze) wrote in #note-3:

I understand the idea that batching helps in this case where you want to explicitly flush, but that's a pretty specific example, e.g. it's uncommon to even call IO#flush at all in Ruby.

The example demonstrates how you can collect messages during long I/O operations and batch them together to reduce the number of subsequent I/O calls. The file.flush call simulates a long I/O operation - it could equally represent a database call or something like that

Updated by synacker (Mikhail Milovidov) 12 days ago Actions #5 [ruby-core:124760]

Eregon (Benoit Daloze) wrote in #note-3:

Is it? In your example you call file.write for each message in both cases.

But you also call file.flush after each file.write in ractor1 and only only once per batch in ractor2.

This is also a realistic scenario. To guarantee messages are saved to file, you'd normally need to call flush after each message - but that's inefficient when processing single messages.

Actions

Also available in: PDF Atom