Feature #17378
closedRactor#receive with filtering like other actor langauge
Description
With discussion with @marcandre (Marc-Andre Lafortune), we found good extension for Ractor.receive with block.
Ractor.receive do |msg|
  if msg is match to condition?
    true
  else
    false
  end
end
This block iterates incoming queue's values and the value is passed in msg.
If the passed value is matched you want to process, the block should return true and the value will be removed from the queue.
Otherwise (returning falsy value), the value remains in the queue, and other Ractor.receive accesses it again.
If there is not more values in queue, the interpreter sleeps until new values are received.
r = Ractor.new do |;r|
  loop do
    r, msg = Ractor.receive
    r << msg
    r << 1
    r << 2
    r << 3
  end
end
r.send([Ractor.current, :ping])
# incoming queue: [:ping, 1, 2, 3] (3 is at the last)
Ractor.receive #=> :ping # without a block
# incoming queue: [1, 2, 3]
p Ractor.receive{|msg| msg == 2}
#=> 2 
# incoming queue: [1, 3]
begin
  # exception in the block doesn't touch the queue
  p Ractor.receive{|msg| raise "err"}
rescue => e
  p e.message #=> "err"
end
# incoming queue: [1, 3]
p Ractor.receive #=> 1
# incoming queue: [3]
p Ractor.receive #=> 3
With multiple server, we can make it:
morning_server = Ractor.new do
  loop do
    task = Proc.new{|obj| "Good morning #{obj}"}
    r, req = Ractor.receive
    res = task.(req)
    r.send([Ractor.current, res])
  end
end
evening_server = Ractor.new do
  loop do
    task = Proc.new{|obj| "Good evening #{obj}"}
    r, req = Ractor.receive
    res = task.(req)
    r.send([Ractor.current, res])
  end
end
morning_server << [Ractor.current, 'ko1']
morning_server << [Ractor.current, 'ko2']
morning_server << [Ractor.current, 'ko3']
evening_server << [Ractor.current, 'ko1']
evening_server << [Ractor.current, 'ko2']
evening_server << [Ractor.current, 'ko3']
def receive r
  Ractor.receive{|(from, msg)|
    r == from
  }[1]
end
p receive(morning_server) #=> "Good morning ko1"
p receive(evening_server) #=> "Good evening ko1"
p receive(morning_server) #=> "Good morning ko2"
p receive(evening_server) #=> "Good evening ko2"
p receive(morning_server) #=> "Good morning ko3"
p receive(evening_server) #=> "Good evening ko3"
  
        
          
          Updated by ko1 (Koichi Sasada) almost 5 years ago
          
          
        
        
      
      Implementation is: https://github.com/ruby/ruby/pull/3862
        
          
          Updated by ko1 (Koichi Sasada) almost 5 years ago
          
          
        
        
      
      - Description updated (diff)
 
        
          
          Updated by Eregon (Benoit Daloze) almost 5 years ago
          
          
        
        
      
      - Description updated (diff)
 
        
          
          Updated by Eregon (Benoit Daloze) almost 5 years ago
          
          
        
        
      
      - Related to Feature #17365: Can not respond (well) to a Ractor added
 
        
          
          Updated by matz (Yukihiro Matsumoto) almost 5 years ago
          
          
        
        
      
      The receive method with a block suggest the block takes an incoming message as a block parameter. So I agree with the proposal if the name is receive_if.
Matz.
        
          
          Updated by marcandre (Marc-Andre Lafortune) almost 5 years ago
          
          
        
        
      
      When discussing with @ko1 (Koichi Sasada) and @Eregon (Benoit Daloze), I had an idea for a better API.
I'd like to be able to call Ractor.receive this way:
def example
  loop do
    Ractor.receive do |message|
      case message
      in [:cmd1, arg] then task1(arg)
      in [:cmd2] then return task2
      end
    end
  end
end
It does not matter what task1 returns; the value of the block is not used to determine filtering.
task1 and task2 can also call Ractor.receive, for example:
def task1(arg)
  other_ractor.send(:do_something, arg)
  Ractor.receive do |message|
    case message
    in [:do_something_response, :success, response] then process(response)
    in [:do_something_response, :failure] then raise 'Oh oh'
    end
  end
end
The filtering is implicit in these examples, like in Elixir/Erlang.
If there is no match, the case in raises a NoMatchingPatternError. This is a way to signal that message was not consumed. Ractor.receive rescues the exception and saves the message.
For example, if some :cmd2 is sent just when task1 is doing it's Ractor.receive, it will not match the case, and the message is not consumed. Just after that, the response from other_ractor arrives and task1 finishes, the :cmd2 will be received in example.
This has the benefits:
- 
task1andtask2can return anything, includingnilorfalse(otherwise can be error prone) - you can 
return/breakout ofreceive - implicit handling of exception case
 - very close to Elixir/Erlang semantics
 - also would make 
case/inmore popular! 
There would be an additional way to reject a message, with a second parameter with meta information (client ractor and unique request ID):
Ractor.receive do |message, req|
  cmd, arg = message
  case cmd
  when :cmd1 then task1(arg)
  when :cmd2 then return task2
  else
    req.reject # => same effect as `raise NoMatchingPatternError`
  end
end
The req object would have methods returning the ractor that sent the request, as well as a unique id. This makes responding correctly very easy. Another thread could be communicating with the same ractor and responses could not be mixed:
def task1(arg)
  req = other_ractor.send(:do_something, arg)
  Ractor.receive do |message|
    case message
    in [^req, :success, response] then process(response)
    in [^req, :failure] then raise 'Oh oh'
    end
  end
end
Matz, what do you think of this API?
Note: there are remaining questions on how to handle receive/receive_if with block and multi-thread; hopefully we can keep this issue focused on the API assuming we have a such a method.
        
          
          Updated by ko1 (Koichi Sasada) almost 5 years ago
          
          
        
        
      
      - Status changed from Open to Closed
 
Applied in changeset git|a9a7f4d8b8ec30abc7a47ce700edc7209ae12279.
Ractor#receive_if to receive only matched messages
Instead of Ractor.receive, Ractor.receive_if can provide a pattern
by a block and you can choose the receiving message.
[Feature #17378]
        
          
          Updated by ko1 (Koichi Sasada) almost 5 years ago
          
          
        
        
      
      I have several concern about Ractor#receive proposed at #note-6.
- 
NoMatchingPatternErrorcan be raised in tasks unexpectedly (maybe by bug) and it can cause something strange behavior (like falsy onreceive_if). - 2nd parameter 
reqis not clear and I believe people forget to call it correctly. - Not sure how to re-insert (
req.reject), for example when other threads consumes the incoming queue. 
There is no time to discuss enough for ruby 3.0.
Maybe we need a way to write the following code:
Ractor.receive do |msg|
  case msg
  when pat1
    [Ractor.receive_commit]
    task1(msg)
  when pat2
    [Ractor.receive_commit]
    task2(msg)
  when
end
[Ractor.receive_commit] is a pseudo operation that remove the message from incoming queue.
The above code means "if matched to the pattern, remove the msg from the incoming queue and do the corresponding task, otherwise the msg should be remained in the incoming queue and try to check next message".
To make it with Ractor.receive_if, the following code is maybe similar:
Ractor.receive_if do
  case msg
  when pat1
    break -> { task1(msg) }
  when pat2
    break -> { task2(msg) }
  end
end.call
But nobody want to write such code.
I hope macro will translate something to it.
#note-6's approach with similar notation is:
Ractor.receive do |msg|
  [Ractor.receive_commit]
  
  case msg
  in pat1
    task1(msg)
  in pat2
    task2(msg)
  else
    [Ractor.receive_reject] # or call `req.reject` explicitly
  end
end
[Ractor.receive_reject] is re-insert the message to the incoming queue.
        
          
          Updated by marcandre (Marc-Andre Lafortune) almost 5 years ago
          
          
        
        
      
      ko1 (Koichi Sasada) wrote in #note-8:
I have several concern about
Ractor#receiveproposed at #note-6.
NoMatchingPatternErrorcan be raised in tasks unexpectedly (maybe by bug) and it can cause something strange behavior (like falsy onreceive_if).
I agree that is an issue. It would be much better if we had NoMatchingPatternError#depth #17406. We could intercept only depth 0.
- 2nd parameter
 reqis not clear and I believe people forget to call it correctly.
I think this API would be less used, but people can shoot themselves in the foot.
- Not sure how to re-insert (
 req.reject), for example when other threads consumes the incoming queue.
I don't understand this issue. Seems same as returning false from receive_if
There is no time to discuss enough for ruby 3.0.
😢
        
          
          Updated by Eregon (Benoit Daloze) almost 5 years ago
          
          
        
        
      
      I think a good way to fix this would be to have some variant of pattern matching, that automatically wraps each in body in a lambda.
That would allow matching patterns, executing some code, and then executing the body.
This was actually already proposed by @pitr.ch in https://bugs.ruby-lang.org/issues/14912#change-78398
Such a case expression would also return an object, which could be useful for many other cases too, and seems a much cleaner design than #17406.
Removing eagerly seems like it will inherently be problematic if there are multiple Thread calling Ractor.receive {}.
Re-adding a message in the Queue seems quite hacky, and it might mess up the message ordering, which would be a serious bug.
        
          
          Updated by marcandre (Marc-Andre Lafortune) almost 5 years ago
          
          
        
        
      
      Eregon (Benoit Daloze) wrote in #note-10:
Removing eagerly seems like it will inherently be problematic if there are multiple Thread calling
Ractor.receive {}.
It is imperative that a message is not processed twice though. You can change "removing" to "reserving" if that helps. Effect is same.
Re-adding a message in the Queue seems quite hacky, and it might mess up the message ordering, which would be a serious bug.
Not a bug. Ordering can not be made deterministic with multi-thread & filtering:
Message B is sent for thread B.
Independently, message A-1 and then A-2 are sent to be consumed by thread A.
Thread A calls receive_if, B is reserved and yielded.
Thread B calls receive_if, A-1 is reserved and yielded.
Thread C calls receive_if, A-2 is reserved and yielded.
Thread A, C rejects message => message A-2 is yielded and accepted by A.
I believe this is unavoidable and not a real issue. If order is important, it will be up to the programmers to design their protocol accordingly, for example by passing a not_before_id parameter (that can be used in filtering).
That being said, replace "Re-adding" by "putting back in its original place" if it helps. Order would be more often maintained, but can not always be done.
        
          
          Updated by ko1 (Koichi Sasada) almost 5 years ago
          
          
        
        
      
      marcandre (Marc-Andre Lafortune) wrote in #note-9:
- 2nd parameter
 reqis not clear and I believe people forget to call it correctly.I think this API would be less used, but people can shoot themselves in the foot.
I think most of case it will be called if people using send/receive in various case.
To determine that we need to observe the usages.
- Not sure how to re-insert (
 req.reject), for example when other threads consumes the incoming queue.I don't understand this issue. Seems same as returning
falsefromreceive_if
No problem because receive_if doesn't remove a message.