Feature #14736

Thread selector for flexible cooperative fiber based concurrency

Added by ioquatix (Samuel Williams) about 2 months ago. Updated 11 days ago.

Ruby concurrency can be greatly enhanced by concurrent, deterministic IO.

Fibers have been shown many times to be a great abstraction for this purpose. They retain normal code flow and don't require any kind of thread synchronisation, which makes them enjoyable to write code with: you don't have to concern yourself with locking or other complicated issues.

The basic idea is that if some operation would block, it yields the Fiber, and other Fibers within the thread can continue to execute.
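To make the idea concrete, here is a minimal sketch using only plain Ruby (no patch required). A fiber tries a non-blocking read on a pipe; when the read would block, it yields control back to its caller instead of stalling the thread. The pipe, the `log` array and the return values are just illustration, not part of the proposal.

```ruby
# Minimal sketch: a Fiber that yields instead of blocking on a read.
reader, writer = IO.pipe
log = []

fiber = Fiber.new do
  loop do
    result = reader.read_nonblock(1024, exception: false)
    case result
    when :wait_readable
      log << :would_block
      Fiber.yield(:waiting) # hand control back instead of blocking the thread
    when nil
      break                 # EOF: the writer end was closed
    else
      log << result
    end
  end
  :done
end

first = fiber.resume  # the pipe is empty, so the fiber yields
writer.write("hello")
writer.close
second = fiber.resume # data is available now; the fiber reads it and finishes
```

Between the two `resume` calls the thread is free to run other fibers, which is the whole point.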

There are a number of ways to implement this. Here is a proof of concept to amend the existing rb_io_wait_readable/rb_io_wait_writable.
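As a rough Ruby-level illustration of what such a hook could delegate to, the sketch below defines a selector object with `wait_readable`/`wait_writable` methods that park the current fiber and resume it once `IO.select` reports the IO as ready. The class name `SketchSelector` and its methods are hypothetical and are not taken from the patch; the real change lives in C inside rb_io_wait_readable/rb_io_wait_writable.

```ruby
# Hypothetical selector, for illustration only (not the API from the patch).
class SketchSelector
  def initialize
    @readable = {} # IO => Fiber waiting for it to become readable
    @writable = {} # IO => Fiber waiting for it to become writable
  end

  # Roughly what a patched rb_io_wait_readable might call.
  def wait_readable(io)
    @readable[io] = Fiber.current
    Fiber.yield
  end

  # Roughly what a patched rb_io_wait_writable might call.
  def wait_writable(io)
    @writable[io] = Fiber.current
    Fiber.yield
  end

  # Resume waiting fibers as their IOs become ready.
  def run
    until @readable.empty? && @writable.empty?
      ready_r, ready_w = IO.select(@readable.keys, @writable.keys)
      ready_r.each { |io| @readable.delete(io).resume }
      ready_w.each { |io| @writable.delete(io).resume }
    end
  end
end

selector = SketchSelector.new
results = []
pipes = Array.new(3) { IO.pipe }

pipes.each_with_index do |(reader, _writer), index|
  fiber = Fiber.new do
    loop do
      chunk = reader.read_nonblock(1024, exception: false)
      if chunk == :wait_readable
        selector.wait_readable(reader) # park this fiber until data arrives
      else
        results << [index, chunk]
        break
      end
    end
  end
  fiber.resume # runs until the first read would block
end

pipes.each_with_index do |(_reader, writer), index|
  writer.write("msg#{index}")
  writer.close
end

selector.run
```

Swapping `IO.select` for epoll, kqueue, EventMachine or NIO4r would only change the body of `run`, which is the flexibility the design is after.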

This design minimally affects the Ruby implementation and leaves the choice of selector implementation open. With a small amount of work, we can support EventMachine (65 million downloads) and NIO4r (21 million downloads). It would also be trivial to backport.

This PR isn't complete but I am seeking feedback. If it's a good idea, I will do my best to see it through to completion, including support for EventMachine and NIO4r.

port_scanner_threadlet.rb (925 Bytes), added by normalperson (Eric Wong), 06/13/2018 01:03 AM


#2 [ruby-core:87477] Updated by ioquatix (Samuel Williams) 12 days ago

I've been playing around with port scanners. Implemented in Go (goroutines), Python (asyncio) and Ruby (async).

I wrote up the results here:

It was just an attempt to gauge the performance of the different implementations. It's by no means an authoritative comparison.

What I found interesting was that Ruby (async) was faster than Python (asyncio) by about 2x. Go was faster again by about 2x. However, Go can use multiple CPU cores, and so because it utilised ~5 hardware threads, it was in effect about 10x faster.

I found that quite fascinating.

I don't believe we can easily adopt a model like goroutines in Ruby. However, it might be possible to adapt some of the good ideas from it.

#3 [ruby-core:87483] Updated by normalperson (Eric Wong) 11 days ago

ioquatix (Samuel Williams) wrote:

> I've been playing around with port scanners. Implemented in Go
> (goroutines), Python (asyncio) and Ruby (async).
>
> I wrote up the results here:

Attached is the implementation for Threadlet/auto-fiber/wachamacallit
rebased against ruby trunk r63641:

On a busy Linux VM, Threadlet was close to your Go implementation
in speed (timing results were unstable, however) and Ruby async
was around 3x slower (even allowing for the timing instabilities).

I kept on getting errors with the Python3 version
("Event loop is closed"), so I never let it finish.

I needed to deal with EPIPE because the system I tested on had RDS (16385)
enabled in the kernel which was triggering EPIPE (I don't know Go or Python):

 diff --git a/examples/port_scanner/port_scanner.go b/examples/port_scanner/port_scanner.go
 index 45f2d1c..ad0f049 100755
 --- a/examples/port_scanner/port_scanner.go
 +++ b/examples/port_scanner/port_scanner.go
 @@ -55,7 +55,7 @@ func checkPortOpen(ip string, port int, timeout time.Duration) {
        } else if strings.Contains(err.Error(), "refused") {
            // fmt.Println(port, "closed", err.Error())
        } else {
 -          panic(err)
 +          fmt.Println(port, "err", err.Error())
 diff --git a/examples/port_scanner/ b/examples/port_scanner/
 index 372f0b3..ca9d41a 100755
 --- a/examples/port_scanner/
 +++ b/examples/port_scanner/
 @@ -22,6 +22,8 @@ class PortScanner:
                  # print("{} closed".format(port))
              except asyncio.TimeoutError:
                  print("{} timeout".format(port))
 +            except SystemError:
 +                print("{} error".format(port))

 def start(self, timeout=1.0):
 diff --git a/examples/port_scanner/port_scanner.rb b/examples/port_scanner/port_scanner.rb
 index 0e4160e..3ac0109 100755
 --- a/examples/port_scanner/port_scanner.rb
 +++ b/examples/port_scanner/port_scanner.rb
 @@ -25,6 +25,8 @@ class PortScanner
      # puts "#{port} closed"
    rescue Async::TimeoutError
      puts "#{port} timeout"
 +  rescue SystemCallError => e
 +    puts "#{port} #{e.message}"

 async def start(timeout = 1.0)
