# frozen_string_literal: true
#
# Reproduction of the gvltools RESUMED-hook illegal-allocation crash on
# ruby_master (4.1.0dev) via the fiber scheduler.
#
# Tunables (environment variables): REACTORS FIBERS PRODUCERS ALLOC MAXTIME HEARTBEAT

require "async"
require "gvltools"

# Tunables; each one can be overridden through the environment variable of the
# same name.
REACTORS = Integer(ENV.fetch("REACTORS", 8)) # pre-existing reactor threads (each its own scheduler)
FIBERS = Integer(ENV.fetch("FIBERS", 32)) # fibers per reactor, each parked on a queue pop
PRODUCERS = Integer(ENV.fetch("PRODUCERS", 16)) # threads pushing -> cross-thread #unblock storm
ALLOC = Integer(ENV.fetch("ALLOC", 400)) # churn iterations per popped item (metrics uses ALLOC / 4, producers ALLOC / 16)
MAXTIME = Float(ENV.fetch("MAXTIME", 5)) # seconds the main thread keeps churning before it requests shutdown
HEARTBEAT = ENV.fetch("HEARTBEAT", "repro.heartbeat") # path of the file the heartbeat thread keeps rewriting

$ops = 0 # items processed by reactor fibers; incremented without a lock
$ready = 0 # reactors whose Async block has started; incremented under `mutex`
stop = false # shutdown flag, shared with every thread block through closure capture

# Heavy, varied allocation -> GC pressure + fast slot reuse of any object freed
# while a reactor thread was mid-resume with a stale machine stack.
#
# Every iteration appends three fresh objects to a local array: a String whose
# length cycles through 8..39, a 3-element Array and a single-pair Hash (whose
# value is a new Integer#to_s String). Everything stays referenced until the
# method returns.
#
# @param n [Integer] number of iterations; the loop body never runs when n <= 0
# @return [Integer] number of objects collected, i.e. 3 * n (0 when n <= 0)
def churn(n)
  a = []
  i = 0
  # NOTE(review): the manual `while` loop is kept on purpose so this rewrite
  # does not alter the allocation profile of the reproduction.
  while i < n
    a << (+"x" * ((i & 31) + 8)) # (i & 31) + 8 => string length in 8..39
    a << [i, i + 1, i + 2]
    a << { i => i.to_s }
    i += 1
  end
  a.length
end

# One queue per reactor; fibers in reactor r block on queues[r].pop.
queues = REACTORS.times.map { Thread::Queue.new }
mutex = Mutex.new # guards updates and reads of $ready

# Monotonic clock reading; used for the run deadline and the heartbeat stamp.
now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }

# Heartbeat thread: every 0.1 s overwrite the HEARTBEAT file with the current
# counters and a monotonic timestamp, until `stop` is set. It is started before
# gvltools is enabled further down the script.
heartbeat = Thread.new do
  until stop
    File.write(HEARTBEAT, "ops=#{$ops} ready=#{$ready} t=#{now.call.round(3)}\n")
    sleep 0.1
  end
end

# 1. PRE-EXISTING reactor threads (created BEFORE gvltools enable -> no STARTED
# event). Each runs Async's default scheduler with FIBERS fibers parked on a
# Thread::Queue#pop (-> Scheduler#block, reactor parks in the selector).
reactors = REACTORS.times.map do |r|
  Thread.new do
    q = queues[r]
    Async do |task|
      # Counted as soon as the Async block starts, i.e. before the fibers below
      # have been spawned.
      mutex.synchronize { $ready += 1 }
      FIBERS.times do
        task.async do
          until stop
            q.pop # blocks via the fiber scheduler
            churn(ALLOC)
            $ops += 1 # not synchronized across reactor threads; best-effort counter
          end
        end
      end
    end
  end
end

# Wait until every reactor has reported in. $ready only shows that each Async
# block has started; the extra fixed sleep is presumably the grace period for
# the fibers to be set up and parked on the selector.
sleep 0.01 until mutex.synchronize { $ready } == REACTORS
sleep 0.1

# 2. start_metrics_thread: enable gvltools while all reactor threads are parked.
GVLTools::LocalTimer.enable
GVLTools::GlobalTimer.enable
GVLTools::WaitingThreads.enable

# Metrics thread: until `stop` is set, keep reading the gvltools counters
# (results are discarded) and allocate a quarter of a consumer's churn per pass.
metrics = Thread.new do
  until stop
    before = GVLTools::GlobalTimer.monotonic_time
    _ = GVLTools::GlobalTimer.monotonic_time - before
    _ = GVLTools::WaitingThreads.count
    churn(ALLOC / 4)
  end
end

# 3. Producer threads (also pre-existing): push into the queues from OTHER
# threads -> Scheduler#unblock storm -> reactor threads bounce the GVL.
# NOTE(review): for the reactors "pre-existing" means created before gvltools
# is enabled, yet these threads are spawned after the GVLTools `enable` calls
# earlier in this script -- confirm which ordering the repro needs.
producers = PRODUCERS.times.map do |i|
  Thread.new do
    # Start on queue i % REACTORS, then walk the queues round-robin.
    r = i % REACTORS
    until stop
      queues[r].push(1)
      r = (r + 1) % REACTORS
      churn(ALLOC / 16)
    end
  end
end

# Keep the main thread allocating until MAXTIME seconds have elapsed.
deadline = now.call + MAXTIME
churn(ALLOC) while now.call < deadline

# Shutdown: raise the flag, then hand every fiber one final item.
stop = true
REACTORS.times { |r| FIBERS.times { queues[r].push(0) } } # let parked pops return so fibers can exit
threads = reactors + producers + [metrics, heartbeat]
threads.each do |t|
  t.join
rescue StandardError
  nil # best effort: a thread that died with an error must not block the summary line
end
puts "survived ops=#{$ops} reactors=#{REACTORS} fibers=#{FIBERS}"