Feature #14736

Thread selector for flexible cooperative fiber based concurrency

Added by ioquatix (Samuel Williams) 12 months ago. Updated 10 months ago.

Target version:


Ruby concurrency can be greatly enhanced by concurrent, deterministic IO.

Fibers have been shown many times to be a great abstraction for this purpose. The retain normal code flow and don't require any kind of Thread synchronisation. They are enjoyable to write code with because you don't have to concern yourself with thread synchronisation or other complicated issues.

The basic idea is that if some operation would block, it yields the Fiber, and other Fibers within the thread can continue to execute.

There are a number of ways to implement this. Here is a proof of concept to amend the existing rb_io_wait_readable/rb_io_wait_writable.

This design minimally affects the Ruby implementation and allows flexibility for selector implementation. With a small amount of work, we can support EventMachine (65 million downloads), NIO4r (21 million downloads). It would be trivial to back port.

This PR isn't complete but I am seeking feedback. If it's a good idea, I will do my best to see it through to completion, including support for EventMachine and NIO4r.


port_scanner_threadlet.rb (925 Bytes) port_scanner_threadlet.rb normalperson (Eric Wong), 06/13/2018 01:03 AM


Updated by ioquatix (Samuel Williams) 12 months ago

As part of this, I've started working on

Updated by ioquatix (Samuel Williams) 11 months ago

I've been playing around with port scanners. Implemented in Go (goroutines), Python (asyncio) and Ruby (async).

I wrote up the results here:

It was just an attempt to gauge the performance of the different implementations. It's by no means an authoritative comparison.

What I found interesting was that Ruby (async) was faster than Python (async) by about 2x. Go was faster again by about 2x. However, Go can use multiple CPU cores, and so because it utilised ~5 hardware threads, it was in effect about 10x faster.

I found that quite fascinating.

I don't believe we can easily adopt a model like goroutines in Ruby. However, it might be possible to adapt some of the good ideas from it.

Updated by normalperson (Eric Wong) 11 months ago wrote:

I've been playing around with port scanners. Implemented in Go
(goroutines), Python (asyncio) and Ruby (async).

I wrote up the results here:

Attached is the implementation for Threadlet/auto-fiber/wachamacallit
rebased against ruby trunk r63641:

On a busy Linux VM, Threadlet was close to your Go implementation
in speed (timing results were unstable, however) and Ruby async
was around 3x slower behind (even with timing instabilities).

I kept on getting errors with the Python3 version
("Event loop is closed") so I never let it finish

I needed to deal with EPIPE because the system I tested on had RDS (16385)
enabled in the kernel which was triggering EPIPE (I don't know Go or Python):

 diff --git a/examples/port_scanner/port_scanner.go b/examples/port_scanner/port_scanner.go
 index 45f2d1c..ad0f049 100755
 --- a/examples/port_scanner/port_scanner.go
 +++ b/examples/port_scanner/port_scanner.go
 @@ -55,7 +55,7 @@ func checkPortOpen(ip string, port int, timeout time.Duration) {
        } else if strings.Contains(err.Error(), "refused") {
            // fmt.Println(port, "closed", err.Error())
        } else {
 -          panic(err)
 +          fmt.Println(port, "err", err.Error())
 diff --git a/examples/port_scanner/ b/examples/port_scanner/
 index 372f0b3..ca9d41a 100755
 --- a/examples/port_scanner/
 +++ b/examples/port_scanner/
 @@ -22,6 +22,8 @@ class PortScanner:
  # print("{} closed".format(port))
 except asyncio.TimeoutError:
 print("{} timeout".format(port))
 +            except SystemError:
 +                print("{} error".format(port))

 def start(self, timeout=1.0):
 diff --git a/examples/port_scanner/port_scanner.rb b/examples/port_scanner/port_scanner.rb
 index 0e4160e..3ac0109 100755
 --- a/examples/port_scanner/port_scanner.rb
 +++ b/examples/port_scanner/port_scanner.rb
 @@ -25,6 +25,8 @@ class PortScanner
  # puts "#{port} closed"
 rescue Async::TimeoutError
 puts "#{port} timeout"
 +  rescue SystemCallError => e
 +    puts "#{port} #{e.message}"

 async def start(timeout = 1.0)

Updated by ioquatix (Samuel Williams) 10 months ago

normalperson (Eric Wong) that's awesome, great effort! I really appreciate you taking these PRs seriously and the effort you are putting into it.

I'd love to have a systematic comparison. Honestly, the port scanner isn't a great benchmark because port scanning is either limited by system resources (scanning localhost) or heavily network bound (scanning remote system).

Considering that async is pure ruby on a single thread, 3x slower is actually pretty good. If we merge this PR (Thread.selector), I can probably improve performance quite a bit.

Your implementation of Threadlet could be easily implemented in terms of the PR suggested here. But this PR allows flexibility for other implementations (like NIO4r). I still think that flexibility is pretty important, especially considering it will allow JRuby to implement it without too many changes.

The benefit of stackful coroutines is they make blocking completely transparent. If you look at the go code, it's a lot more complex, requires synchronisation, etc. The feedback I had about async was it was super easy to use. My priority is good API followed by performance.

Honestly, I really liked the performance of the go code, and the design is really great, but I don't think it's good for general purpose computing. Most people will find the required locking/synchronisation too complex.

Updated by ioquatix (Samuel Williams) 10 months ago

Some of the benefits of this PR are:

  • Makes it possible to implement different models for concurrency.
  • Easy to implement by other Ruby implementations.
  • Baseline interface for implementing scheduling, on which other schedulers can be constructed.
  • Minimal changes to MRI code (less bugs/less maintenance).
  • Doesn't introduce any new class, only one new public attr.

Updated by funny_falcon (Yura Sokolov) 10 months ago

Just remark: make test example to use Fiber.transfer.
If patch will be accepted, example will be copied, and usage of Fiber.yield is not composable with Enumerator.

Updated by ioquatix (Samuel Williams) 10 months ago

Are you saying that calling Fiber.yield is not valid within an enumerator?

Updated by funny_falcon (Yura Sokolov) 10 months ago

Yes. While usually Enumerator doesn't call to Fiber.yield, it is called if Enumerator is used as external iterator:

> def aga; yield 1; Fiber.yield 4; yield 8; end
> to_enum(:aga).to_a
Traceback (most recent call last):
        6: from /usr/bin/irb:11:in `<main>'
        5: from (irb):76
        4: from (irb):76:in `to_a'
        3: from (irb):76:in `each'
        2: from (irb):68:in `aga'
        1: from (irb):68:in `yield'
FiberError (can't yield from root fiber)
> e = to_enum(:aga)
=> #<Enumerator: main:aga>
=> 1
=> 4
=> 8
> [:a,:b,:c]
=> [[:a, 1], [:b, 4], [:c, 8]]

Updated by ioquatix (Samuel Williams) 10 months ago

For the first case, you naturally can't call Fiber.yield in that context... but this works:

#!/usr/bin/env ruby

require 'fiber'

fiber = do
    def aga; yield 1; Fiber.yield 4; yield 8; end
    puts to_enum(:aga).to_a

value = fiber.resume
puts "Got #{value}"
while fiber.alive?

# Prints:
# Got 4
# 1
# 8

This also works:

#!/usr/bin/env ruby

require 'fiber'

fiber = do
    def aga; yield 1; Fiber.yield 4; yield 8; end
    e = to_enum(:aga)


value = fiber.resume
puts "Got #{value.inspect}"
while fiber.alive?

# 1
# 4
# 8
# Got nil

The semantics of Fiber.yield from within the enumerator block depend on how it's being used. That's not something I was aware of. I see that it fails if you try to do this:

#!/usr/bin/env ruby

$LOAD_PATH << File.expand_path("../../lib", __dir__)

require 'async'
require 'async/io/stream'
require 'async/io/host_endpoint'
require 'async/io/protocol/line'

class Lines < Async::IO::Protocol::Line
    def each
        return to_enum unless block_given?

        while line = read_line
            yield line

input =$stdin)
) do
    # This works:
    # input.each do |line|
    #   puts "... #{line}"
    # end

    # This doesn't:
    enumerator = input.each
    while line =
        puts "... #{line}"

I can imagine, if you don't know the underlying task is asynchronous, that this might cause some frustration, fortunately the error is reasonably clear in this instance.

The problem is the nested fiber is not created in an Async::Task. I believe that with this PR in place, it would be possible to avoid this, because ALL Fiber created on the thread with a selector is asynchronous, so it shouldn't be a problem. I am interested in working further on this PR, and this is an interesting test case. Perhaps I will see if it can work or not.

Updated by funny_falcon (Yura Sokolov) 10 months ago

I've shown to_enum(:aga).to_a to present the place where I wasn't right.

But if you look at your own second example, you will see that it doesn't do what it should do
(if Fiber.yield is replaced with yield point of your scheduler, for example, with task.sleep(0.01) or,
because yield point should not affect`.

And you've already shown this in third example.

Scheduler should use Fiber.transfer, because it is really scheduling primitive in its nature,
and because it is almost unknown and almost nowhere used.

Updated by ioquatix (Samuel Williams) 10 months ago

Yes, I agree with what you say, and I agree with your conclusion, I was just giving an example where it failed with async which highlights the issue :)

Updated by ioquatix (Samuel Williams) 10 months ago

I have updated the PR to use transfer in the scheduler, and I've added an example showing that it is composable with Enumerator.

Also available in: Atom PDF