# frozen_string_literal: true
#
# Reproduction of the gvltools RESUMED-hook illegal-allocation crash on
# ruby_master (4.1.0dev) via the fiber scheduler
#
# Tunables (environment variables): REACTORS FIBERS PRODUCERS ALLOC MAXTIME HEARTBEAT

require "async"
require "gvltools"

# Tunables, each overridable through an environment variable of the same name.
# Defaults are written as strings because that is what ENV hands back anyway;
# Integer()/Float() raise on malformed input instead of silently yielding 0.
REACTORS  = Integer(ENV.fetch("REACTORS", "8"))   # pre-existing reactor threads (each its own scheduler)
FIBERS    = Integer(ENV.fetch("FIBERS", "32"))    # fibers per reactor, each parked on a queue pop
PRODUCERS = Integer(ENV.fetch("PRODUCERS", "16")) # threads pushing -> cross-thread #unblock storm
ALLOC     = Integer(ENV.fetch("ALLOC", "400"))    # iteration count handed to churn (scaled down by some callers)
MAXTIME   = Float(ENV.fetch("MAXTIME", "5"))      # seconds the main thread keeps the storm running
HEARTBEAT = ENV.fetch("HEARTBEAT", "repro.heartbeat") # path of the file the heartbeat thread rewrites

$ops   = 0     # pop+churn iterations completed by reactor fibers
$ready = 0     # reactors that have reported readiness (updated under the mutex)
stop   = false # shutdown flag shared with every thread/fiber block below

# Heavy, varied allocation -> GC pressure + fast slot reuse of any object freed
# while a reactor thread was mid-resume with a stale machine stack.
#
# Each of the n rounds allocates one String (8..39 characters), one 3-element
# Array and one single-pair Hash, and keeps them alive in a scratch array until
# the method returns.
#
# @param n [Integer] number of rounds; zero or negative does nothing
# @return [Integer] number of objects retained, i.e. 3 * n for positive n
def churn(n)
  garbage = []
  n.times do |k|
    garbage.push(
      (+"x") * ((k & 31) + 8), # length cycles through 8..39
      [k, k + 1, k + 2],
      { k => k.to_s }
    )
  end
  garbage.size
end

# One queue per reactor; fibers in reactor r block on queues[r].pop.
queues = (0...REACTORS).map { Thread::Queue.new }

# Guards $ready between the reactor threads and the main thread's readiness poll.
mutex = Thread::Mutex.new

# Monotonic clock reading in seconds (Float); used for the deadline and the heartbeat.
now = lambda { Process.clock_gettime(Process::CLOCK_MONOTONIC) }

# Heartbeat thread: until `stop` is set, overwrite the HEARTBEAT file roughly
# every 0.1s with the current ops/ready counters and a monotonic timestamp.
# NOTE(review): presumably read by an external watcher to tell a hang from
# progress -- nothing in this file reads it back.
heartbeat = Thread.new do
  loop do
    break if stop

    status = "ops=#{$ops} ready=#{$ready} t=#{now.call.round(3)}\n"
    File.write(HEARTBEAT, status)
    sleep 0.1
  end
end

# 1. PRE-EXISTING reactor threads (created BEFORE gvltools enable -> no STARTED
#    event). Each runs Async's default scheduler with FIBERS fibers parked on a
#    Thread::Queue#pop (-> Scheduler#block, reactor parks in the selector).
#
#    Readiness is reported only AFTER the spawn loop: Async runs a child task
#    right away until its first blocking call, so once the loop returns every
#    fiber of this reactor is already parked on q.pop. (Nothing pushes to the
#    queues until the producer threads are started further down.) Reporting
#    before the loop would let the main thread observe "ready" while no fiber
#    exists yet.
reactors = REACTORS.times.map do |r|
  Thread.new do
    q = queues[r]
    Async do |task|
      FIBERS.times do
        task.async do
          until stop
            q.pop                # blocks via the fiber scheduler
            churn(ALLOC)
            # NOTE(review): not synchronized across threads, so treat $ops as
            # an approximate progress counter.
            $ops += 1
          end
        end
      end
      # Count this reactor as ready, under the same mutex the main thread
      # takes when it polls $ready.
      mutex.synchronize { $ready += 1 }
    end
  end
end

# Hold the main thread until every reactor has reported in through $ready
# (read under the mutex), then pause another 0.1s -- presumably so the reactor
# threads have finished parking in their selectors before the hooks go on.
until mutex.synchronize { $ready == REACTORS }
  sleep 0.01
end
sleep 0.1

# 2. start_metrics_thread: enable gvltools while all reactor threads are parked.
#    Enabled in this order: LocalTimer, GlobalTimer, WaitingThreads.
[GVLTools::LocalTimer, GVLTools::GlobalTimer, GVLTools::WaitingThreads].each(&:enable)

# Metrics thread: spins until `stop` is set. Each pass reads the GVLTools global
# timer twice (discarding the difference), reads the waiting-thread count, and
# then allocates a quarter of the usual churn.
metrics = Thread.new do
  loop do
    break if stop

    started  = GVLTools::GlobalTimer.monotonic_time
    _elapsed = GVLTools::GlobalTimer.monotonic_time - started
    _waiting = GVLTools::WaitingThreads.count
    churn(ALLOC / 4)
  end
end

# 3. Producer threads (also pre-existing): push into the queues from OTHER
#    threads -> Scheduler#unblock storm -> reactor threads bounce the GVL.
#    Producer i starts at queue (i % REACTORS) and walks the queues round-robin,
#    doing a small churn after every push, until `stop` is set.
producers = (0...PRODUCERS).map do |index|
  Thread.new do
    target = index % REACTORS
    until stop
      queues[target] << 1
      target = target.succ % REACTORS
      churn(ALLOC / 16)
    end
  end
end

# Main thread joins the allocation storm until MAXTIME seconds have elapsed on
# the monotonic clock.
deadline = now.call + MAXTIME
churn(ALLOC) until now.call >= deadline

# Shutdown: raise the flag first, then feed every queue one item per fiber.
stop = true
queues.each do |queue|
  FIBERS.times { queue.push(0) } # let parked pops return so fibers can exit
end

# Join everything; a thread that died with a StandardError is ignored here so
# the remaining threads are still joined and the summary line is printed.
threads = [*reactors, *producers, metrics, heartbeat]
threads.each do |thread|
  thread.join
rescue StandardError
  nil
end
puts "survived ops=#{$ops} reactors=#{REACTORS} fibers=#{FIBERS}"
