Bug #4844 » pool.rb

emakris (Ernie Makrkis), 06/07/2011 11:19 PM

 
# -*- encoding: utf-8 -*-
require 'resque'
require 'resque/pool/version'
require 'resque/pool/logging'
require 'resque/pool/pooled_worker'
require 'fcntl'
require 'time' # Time.parse is used in stop_worker
require 'yaml'

module Resque
  class Pool
    include Logging
    attr_reader :config
    attr_reader :workers

    # CONSTANTS {{{
    SIG_QUEUE_MAX_SIZE = 5
    DEFAULT_WORKER_INTERVAL = 5
    QUEUE_SIGS = [ :QUIT, :INT, :TERM, :USR1, :USR2, :CONT, :HUP, :WINCH, ]
    CHUNK_SIZE = (16 * 1024)
    # }}}

    def initialize(config)
      init_config(config)
      @workers = {}
      procline "(initialized)"
    end

    # Config: after_prefork {{{

    # The `after_prefork` hook will be run in workers if you are using the
    # preforking master worker to save memory. Use this hook to reload
    # database connections and so forth to ensure that they're not shared
    # among workers.
    #
    # Call with a block to set the hook.
    # Call with no arguments to return the hook.
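    #
    # Illustrative example (the ActiveRecord call is an assumption; substitute
    # whatever shared resources your workers need to re-open after the fork):
    #
    #   Resque::Pool.after_prefork do
    #     ActiveRecord::Base.establish_connection
    #   end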
    def self.after_prefork(&block)
      block ? (@after_prefork = block) : @after_prefork
    end

    # Set the after_prefork proc.
    def self.after_prefork=(after_prefork)
      @after_prefork = after_prefork
    end

    def call_after_prefork!
      self.class.after_prefork && self.class.after_prefork.call
    end

    # }}}
    # Config: class methods to start up the pool using the default config {{{

    @config_files = ["resque-pool.yml", "config/resque-pool.yml"]
    class << self; attr_accessor :config_files; end
    def self.choose_config_file
      if ENV["RESQUE_POOL_CONFIG"]
        ENV["RESQUE_POOL_CONFIG"]
      else
        @config_files.detect { |f| File.exist?(f) }
      end
    end

    def self.run
      if GC.respond_to?(:copy_on_write_friendly=)
        GC.copy_on_write_friendly = true
      end
      Resque::Pool.new(choose_config_file).start.join
    end
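
    # Illustrative sketch of launching the pool from a rake task -- the task
    # name and :environment prerequisite are assumptions, not part of this
    # file (the resque-pool gem also ships its own executable):
    #
    #   # Rakefile
    #   task "resque:pool" => :environment do
    #     Resque::Pool.run
    #   end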

    # }}}
    # Config: load config and config file {{{

    def init_config(config)
      unless config
        raise ArgumentError,
          "No configuration found. Please set up config/resque-pool.yml"
      end
      if config.kind_of? String
        @config_file = config.to_s
      else
        @config = config.dup
      end
      load_config
    end

    def load_config
      @config_file and @config = YAML.load_file(@config_file)
      environment and @config[environment] and config.merge!(@config[environment])
      config.delete_if {|key, value| value.is_a? Hash }
    end

    def environment
      if defined? Rails
        Rails.env
      else
        ENV['RACK_ENV'] || ENV['RAILS_ENV'] || ENV['RESQUE_ENV']
      end
    end
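
    # Illustrative config/resque-pool.yml: top-level keys are comma-separated
    # queue lists mapped to worker counts, optionally nested under an
    # environment name; load_config merges the current environment's hash in
    # and then discards the nested hashes.
    #
    #   foo: 1
    #   foo,bar: 2
    #   production:
    #     "foo,bar,baz": 4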

    # }}}

    # Sig handlers and self pipe management {{{

    def self_pipe; @self_pipe ||= [] end
    def sig_queue; @sig_queue ||= [] end
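
    # Signal handling uses the self-pipe trick: trap handlers only push the
    # signal onto sig_queue and write a byte to this pipe, which wakes the
    # master out of the IO.select in master_sleep so #join can process the
    # signal outside of handler context.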

    def init_self_pipe!
      self_pipe.each { |io| io.close rescue nil }
      self_pipe.replace(IO.pipe)
      self_pipe.each { |io| io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }
    end

    def init_sig_handlers!
      QUEUE_SIGS.each { |sig| trap_deferred(sig) }
      trap(:CHLD) { |_| awaken_master }
    end

    def awaken_master
      begin
        self_pipe.last.write_nonblock('.') # wake up master process from select
      rescue Errno::EAGAIN, Errno::EINTR
        # pipe is full, master should wake up anyway
        retry
      end
    end

    # defer a signal for later processing in #join (master process)
    def trap_deferred(signal)
      trap(signal) do |sig_nr|
        if sig_queue.size < SIG_QUEUE_MAX_SIZE
          sig_queue << signal
          awaken_master
        else
          log "ignoring SIG#{signal}, queue=#{sig_queue.inspect}"
        end
      end
    end

    def reset_sig_handlers!
      QUEUE_SIGS.each {|sig| trap(sig, "DEFAULT") }
    end

    def handle_sig_queue!
      case signal = sig_queue.shift
      when :USR1, :USR2, :CONT
        log "#{signal}: sending to all workers"
        signal_all_workers(signal)
      when :HUP
        log "HUP: reload config file"
        load_config
        maintain_worker_count
      when :WINCH
        log "WINCH: gracefully stopping all workers"
        @config = {}
        maintain_worker_count
      when :QUIT
        log "QUIT: graceful shutdown, waiting for children"
        signal_all_workers(:QUIT)
        reap_all_workers(0) # will block until all workers have shut down
        :break
      when :INT
        log "INT: immediate shutdown (graceful worker shutdown)"
        signal_all_workers(:QUIT)
        :break
      when :TERM
        log "TERM: immediate shutdown (and immediate worker shutdown)"
        signal_all_workers(:TERM)
        :break
      end
    end

    # }}}
    # start, join, and master sleep {{{

    def start
      procline("(starting)")
      init_self_pipe!
      init_sig_handlers!
      maintain_worker_count
      procline("(started)")
      log "**** started master at PID: #{Process.pid}"
      log "**** Pool contains PIDs: #{all_pids.inspect}"
      self
    end

    def join
      loop do
        reap_all_workers
        break if handle_sig_queue! == :break
        if sig_queue.empty?
          master_sleep
          monitor_memory_usage
          maintain_worker_count
        end
        procline("managing #{all_pids.inspect}")
      end
      procline("(shutting down)")
      #stop # gracefully shutdown all workers on our way out
      log "**** master complete"
      #unlink_pid_safe(pid) if pid
    end

    def master_sleep
      begin
        ready = IO.select([self_pipe.first], nil, nil, 1) or return
        ready.first && ready.first.first or return
        loop { self_pipe.first.read_nonblock(CHUNK_SIZE) }
      rescue Errno::EAGAIN, Errno::EINTR
      end
    end

    # }}}
    # worker process management {{{

    def reap_all_workers(waitpid_flags=Process::WNOHANG)
      begin
        loop do
          wpid, status = Process.waitpid2(-1, waitpid_flags)
          wpid or break
          worker = delete_worker(wpid)
          # TODO: close any file descriptors connected to worker, if any
          log "** reaped #{status.inspect}, worker=#{worker.queues.join(",")}"
        end
      rescue Errno::EINTR
        retry
      rescue Errno::ECHILD
      end
    end

    def delete_worker(pid)
      worker = nil
      workers.detect do |queues, pid_to_worker|
        worker = pid_to_worker.delete(pid)
      end
      worker
    end

    def all_pids
      workers.map {|q,workers| workers.keys }.flatten
    end

    def signal_all_workers(signal)
      all_pids.each do |pid|
        Process.kill signal, pid
      end
    end

    def memory_usage(pid)
      smaps_filename = "/proc/#{pid}/smaps"
      # Grab actual memory usage (summed Private_Dirty) from /proc, in MB
      begin
        mem_usage = `
          if [ -f #{smaps_filename} ];
          then
            grep Private_Dirty #{smaps_filename} | awk '{s+=$2} END {printf("%d", s/1000)}'
          else echo "0"
          fi
        `.to_i
      rescue Errno::EINTR
        retry
      end
    end

    def process_exists?(pid)
      begin
        ps_line = `ps -p #{pid} --no-header`
      rescue Errno::EINTR
        retry
      end
      !ps_line.nil? && ps_line.strip != ''
    end

    def hard_kill_workers
      @term_workers ||= []
      # drop workers that have already terminated
      @term_workers.delete_if {|pid| !process_exists?(pid)}
      # send the stragglers a SIGKILL (-9)
      @term_workers.each {|pid| `kill -9 #{pid}`}
    end

    def add_killed_worker(pid)
      @term_workers ||= []
      @term_workers << pid if pid
    end
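
    # Once a minute, check each worker's memory usage (including the child it
    # forked for the current job). Above 200MB the worker gets a graceful
    # QUIT; above 250MB it gets a TERM and is remembered so hard_kill_workers
    # can SIGKILL it if it still hasn't exited by the next check.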

    def monitor_memory_usage
      # only check every minute
      if @last_mem_check.nil? || @last_mem_check < Time.now - 60
        hard_kill_workers

        all_pids.each do |pid|
          total_usage = memory_usage(pid)
          child_pid = find_child_pid(pid)
          total_usage += memory_usage(child_pid) if child_pid
          if total_usage > 250
            log "Terminating worker #{pid} for using #{total_usage}MB memory"
            stop_worker(pid)
          elsif total_usage > 200
            log "Gracefully shutting down worker #{pid} for using #{total_usage}MB memory"
            stop_worker(pid, :QUIT)
          end
        end

        @last_mem_check = Time.now
      end
    end

    def hostname
      begin
        @hostname ||= `hostname`.strip
      rescue Errno::EINTR
        retry
      end
    end
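
    # Look up the Resque worker record for this pid on this host so the job it
    # was processing can be logged, then send it the given signal. TERM'd pids
    # (and their job children) are remembered so hard_kill_workers can follow
    # up with SIGKILL if they refuse to exit.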

    def stop_worker(pid, signal=:TERM)
      begin
        worker = Resque.working.find do |w|
          host, worker_pid, queues = w.id.split(':')
          w if worker_pid.to_i == pid.to_i && host == hostname
        end
        if worker
          encoded_job = worker.job
          verb = signal == :QUIT ? 'Graceful' : 'Forcing'
          total_time = Time.now - Time.parse(encoded_job['run_at']) rescue 0
          log "#{verb} shutdown while processing: #{encoded_job} -- ran for #{'%.2f' % total_time}s"
        end

        Process.kill signal, pid
        if signal == :TERM
          add_killed_worker(pid)
          add_killed_worker(find_child_pid(pid))
        end
      rescue Errno::EINTR
        retry
      end
    end
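
    # A busy resque worker forks a child to run each job; that child's pid is
    # needed to include its memory in the usage totals and to make sure it is
    # cleaned up along with its parent.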

    def find_child_pid(parent_pid)
      begin
        p = `ps --ppid #{parent_pid} -o pid --no-header`.to_i
        p == 0 ? nil : p
      rescue Errno::EINTR
        retry
      end
    end
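
    # Count resque worker processes on this host that this pool does not
    # manage (checked at most once a minute). maintain_worker_count subtracts
    # an equal share of these from each queue's target so the machine is not
    # oversubscribed.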

    def orphaned_worker_count
      if @last_orphaned_check.nil? || @last_orphaned_check < Time.now - 60
        if @orphaned_pids.nil?
          # first pass: find resque processes that are neither parents of other
          # resque processes nor pids managed by this pool
          begin
            pids_with_parents = `ps -Af | grep resque | grep -v grep | grep -v resque-web | grep -v master | awk '{printf("%d %d\\n", $2, $3)}'`.split("\n")
          rescue Errno::EINTR
            retry
          end
          pids = pids_with_parents.collect {|x| x.split[0].to_i}
          parents = pids_with_parents.collect {|x| x.split[1].to_i}
          pids.delete_if {|x| parents.include?(x)}
          pids.delete_if {|x| all_pids.include?(x)}
          @orphaned_pids = pids
        elsif @orphaned_pids.size > 0
          # later passes: forget orphans that have since exited
          @orphaned_pids.delete_if do |pid|
            begin
              ps_out = `ps --no-heading p #{pid}`
              ps_out.nil? || ps_out.strip == ''
            rescue Errno::EINTR
              retry
            end
          end
        end
        @last_orphaned_check = Time.now
        log "Current orphaned pids: #{@orphaned_pids}" if @orphaned_pids.size > 0
      end
      @orphaned_pids.size
    end

    # }}}
    # ???: maintain_worker_count, all_known_queues {{{
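    # For each queue grouping, compare the configured worker count against the
    # live workers (less this pool's share of orphaned resque processes) and
    # spawn or QUIT workers to close the gap. For example: a config entry
    # "foo,bar: 3" with two live "foo,bar" workers and no orphans gives a
    # delta of 1, so one worker is spawned.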

    def maintain_worker_count
      orphaned_offset = orphaned_worker_count / all_known_queues.size
      all_known_queues.each do |queues|
        delta = worker_delta_for(queues) - orphaned_offset
        spawn_missing_workers_for(queues, delta) if delta > 0
        quit_excess_workers_for(queues, delta) if delta < 0
      end
    end

    def all_known_queues
      config.keys | workers.keys
    end

    # }}}
    # methods that operate on a single grouping of queues {{{
    # perhaps this means a class is waiting to be extracted

    def spawn_missing_workers_for(queues, delta)
      delta.times { spawn_worker!(queues) } if delta > 0
    end

    def quit_excess_workers_for(queues, delta)
      if delta < 0
        queue_pids = pids_for(queues)
        if queue_pids.size >= delta.abs
          queue_pids[0...delta.abs].each {|pid| Process.kill("QUIT", pid)}
        else
          queue_pids.each {|pid| Process.kill("QUIT", pid)}
        end
      end
    end

    def worker_delta_for(queues)
      config.fetch(queues, 0) - workers.fetch(queues, []).size
    end

    def pids_for(queues)
      workers[queues].keys
    end

    def spawn_worker!(queues)
      worker = create_worker(queues)
      pid = fork do
        log "*** Starting worker #{worker}"
        call_after_prefork!
        reset_sig_handlers!
        #self_pipe.each {|io| io.close }
        begin
          worker.work(ENV['INTERVAL'] || DEFAULT_WORKER_INTERVAL) # interval, will block
        rescue Errno::EINTR
          log "Caught interrupted system call Errno::EINTR. Retrying."
          retry
        end
      end
      workers[queues] ||= {}
      workers[queues][pid] = worker
    end

    def create_worker(queues)
      queues = queues.to_s.split(',')
      worker = PooledWorker.new(*queues)
      worker.verbose = ENV['LOGGING'] || ENV['VERBOSE']
      worker.very_verbose = ENV['VVERBOSE']
      worker
    end

    # }}}

  end
end