Project

General

Profile

Bug #3523 » test_em_fiber4f.rb

Program to reproduce c0000029 crash (depends on EventMachine extension) - spatulasnout (B Kelly), 07/15/2010 09:45 PM

 
#!/usr/bin/env ruby
#

########
######## c0000029 error:
######## ---------------
########
######## This file reproduces a c0000029 crash on
######## ruby 1.9.2dev (2010-07-14) [i386-mswin32_100]
######## (svn revision 28648)
########
######## $ svn co http://svn.ruby-lang.org/repos/ruby/branches/ruby_1_9_2
######## $ win32\configure.bat --prefix=m:/dev/ruby-build/v1_9_2-pure --program-suffix=19 --disable-install-doc
########
######## Note, the crash does NOT occur on trunk (1.9.3dev),
######## nor does the crash occur if cont.c is back-ported
######## from trunk to 1.9.2dev.
########
######## The one non-core dependency is the EventMachine
######## library. (NOTE: see below regarding EventMachine build.)
########
####################
########
######## There are two comment blocks below marked c0000029 (A) and
######## c0000029 (B).
########
######## With both (A) and (B) enabled, the crash occurs when the
######## program exits.
######## With (A) disabled, but (B) enabled, the crash does not occur.
######## With (A) enabled, but (B) disabled, the crash happens sooner.
######## (Both (A) and (B) disabled is not a valid combination.)
########
######## My OS version is Windows 7 - 64bit.
########
####################
########
######## Notes on building EventMachine:
########
######## The EventMachine version was pulled from git on 2010-07-15.
########
######## $ git clone git://github.com/eventmachine/eventmachine
########
######## NOTE: In order to build EventMachine on windows with
######## the Visual Studio compiler, I needed to add #undef's
######## for max and min in the file eventmachine/ext/ed.cpp:
########
######## diff --git a/ext/ed.cpp b/ext/ed.cpp
######## index 06c8f6c..cacb76b
######## --- a/ext/ed.cpp
######## +++ b/ext/ed.cpp
######## @@ -19,6 +19,10 @@ See the file COPYING for complete licensing information.
########
######## #include "project.h"
########
######## +#ifdef OS_WIN32
######## +#undef min
######## +#undef max
######## +#endif
########
########
######## /********************
########

require 'eventmachine'
require 'socket' # for unpack_sockaddr_in
require 'fiber'


# Top-level namespace. Declared up front so the compact
# `class ZZZZ::Logger` form below can resolve ZZZZ.
module ZZZZ; end

# Small leveled logger.
#
# Every emitted line has the shape
#   [<timestamp>] [<pid>:<fiber object_id>] <tag> <prefix><annotation><msg>
# so output produced on different fibers can be told apart.
#
# Lines go to +logio+ when one has been assigned (any object responding to
# +puts+), and to $stdout otherwise.
class ZZZZ::Logger
  LOGLEVEL_ERROR = 0
  LOGLEVEL_WARN  = 1
  LOGLEVEL_INFO  = 2
  LOGLEVEL_DEBUG = 3

  # loglevel_debug / loglevel_info: booleans gating #dbg and #info.
  # logio:  optional output sink; nil means "use $stdout".
  # prefix: string inserted in front of every message.
  attr_accessor :loglevel_debug, :loglevel_info, :logio, :prefix

  # Starts with info enabled, debug disabled, no sink and an empty prefix.
  def initialize
    @loglevel_info  = true
    @loglevel_debug = false
    @logio          = nil
    @prefix         = ""
  end

  # Sets both gates from one of the LOGLEVEL_* constants: info is enabled
  # from LOGLEVEL_INFO upwards, debug from LOGLEVEL_DEBUG upwards.
  def loglevel=(lev)
    @loglevel_info  = (lev >= LOGLEVEL_INFO)
    @loglevel_debug = (lev >= LOGLEVEL_DEBUG)
  end

  # Logs +msg+ with the [INFO] tag when info logging is enabled.
  # options[:annotation], if present, is inserted between prefix and message.
  def info(msg, options={})
    log("[INFO]", msg, options) if @loglevel_info
  end

  # Logs +msg+ with the [WARN] tag; warnings are never filtered.
  def warn(msg, options={})
    log("[WARN]", msg, options)
  end

  # Logs +msg+ with the [ERR!] tag; errors are never filtered.
  def error(msg, options={})
    log("[ERR!]", msg, options)
  end

  # Logs +msg+ with the [DBG_] tag when debug logging is enabled.
  def dbg(msg, options={})
    log("[DBG_]", msg, options) if @loglevel_debug
  end

  protected

  # Formats one line (timestamp, pid/fiber id, tag, prefix, optional
  # annotation, message) and hands it to #writelog.
  def log(loglevel_tag, msg, options={})
    tstamp = Time.now.strftime("%Y-%m-%d %H:%M:%S.%L %a")
    pid_str = "[%05d:%08d]" % [Process.pid, Fiber.current.object_id]
    annotation = options[:annotation] || ""
    line = "[#{tstamp}] #{pid_str} #{loglevel_tag} #{@prefix}#{annotation}#{msg}"
    writelog(line)
  end

  # Writes a finished line to the configured sink. Previously the logio
  # attribute was ignored and everything went to $stdout; $stdout remains
  # the destination whenever logio is nil.
  def writelog(msg)
    (@logio || $stdout).puts msg
  end
end


module ZZZZ
module PROTO

# Root of the RPC error hierarchy; a RuntimeError so a bare `rescue`
# still catches it.
class RPCException < RuntimeError
end

# Specialisation of RPCException. Nothing in this file raises it.
class RPCNotConnectedError < RPCException
end

# EventMachine connection carrying a stubbed-out RPC protocol for this
# reproduction. The only byte ever sent is the "X" handshake; frame sending,
# message appending and message dispatch are empty placeholders.
class RPC < EventMachine::Connection
  RPC_VERSION = 1

  class << self
    # The fiber remembered by em_fiber_init (nil until that has been called).
    attr_reader :em_fiber

    # Optional fiber support: call once from EventMachine's fiber, for
    # example at the beginning of EM.run { }.
    def em_fiber_init
      @em_fiber = Fiber.current
    end

    # Runs +block+ on a freshly created fiber. Control is handed over with
    # Fiber#transfer, and once the block finishes the new fiber transfers
    # back to the fiber that called run_in_fiber.
    # (A fiber-pool based variant existed in the original and was disabled.)
    def run_in_fiber(&block)
      origin = Fiber.current
      worker = Fiber.new do
        block.call
        origin.transfer
      end
      worker.transfer
    end
  end

  # Forwards all arguments to EventMachine::Connection and starts in the
  # handshake-receiving state.
  def initialize(*args)
    super
    @rpc_recv_state = :st_recv_handshake
  end

  # Records the peer's port and IP; falls back to 0 / "0.0.0.0" when the
  # peer address cannot be unpacked.
  def post_init
    super
    peer = begin
      Socket.unpack_sockaddr_in(get_peername)
    rescue StandardError
      [0, "0.0.0.0"]
    end
    @remote_port, @remote_ip = peer
  end

  # Registers the object notified about connection events and the logger.
  def rpc_owner_init(owner, logger)
    @rpc_owner = owner
    @logger = logger
  end

  # Sends the single handshake byte.
  def rpc_initiate_handshake
    send_data "X"
  end

  # EventMachine callback on disconnect: switches to the null state and
  # tells the owner, if one was registered.
  def unbind
    if @logger
      @logger.dbg("#{self.class.name} unbind")
    end
    @rpc_recv_state = :st_recv_null
    if @rpc_owner
      @rpc_owner.rpc_connection_unbind(self)
    end
  end

  # EventMachine callback for incoming bytes. The payload is ignored: any
  # data marks the connection as running and notifies the owner.
  def receive_data(dat)
    @rpc_recv_state = :st_recv_rpc_run
    @rpc_owner.rpc_connection_established(self)
  end

  # True once data has been received and the state is :st_recv_rpc_run.
  def rpc_connected?
    @rpc_recv_state == :st_recv_rpc_run
  end

  # Placeholder: frame transmission is not implemented here.
  def rpc_send_frame
  end

  # Reflects @rpc_pf_out_dirty, which nothing in this file ever sets.
  def rpc_have_frame_to_send?
    @rpc_pf_out_dirty
  end

  # Sends a frame only when one is pending.
  def rpc_flush
    return unless rpc_have_frame_to_send?
    rpc_send_frame
  end

  # Placeholder: message queuing is not implemented here.
  def rpc_append_msg(*msg)
  end

  private

  # no-op: disconnected state
  def st_recv_null
  end

  # Placeholder: message dispatch is not implemented here.
  def dispatch_rpc_messages(messages)
  end
end


# Mixin for RPC connections that routes messages to handler procs registered
# per "reply port" number.
#
# The including object must provide rpc_owner_init, rpc_initiate_handshake
# and rpc_flush (the RPC class does). Every connection that has completed its
# handshake is tracked in a registry shared by all includers, so that
# rpc_dispatch_complete can flush all of them.
module RPCMessageDispatcher
  MSGPORT_ROOT = 0

  # Prepares dispatching: binds +root_handler+ to MSGPORT_ROOT, registers
  # this object as its own RPC owner and starts the handshake.
  # Resets any previously installed dispatch-complete hook.
  def init_message_dispatch(logger, &root_handler)
    logger.dbg("#{self.class.name} - init_message_dispatch: root_handler=#{root_handler.inspect}")
    @@id2rpc ||= {}
    @msg_portmap = {}
    @dispatch_complete_callback = nil
    rpc_bind_reply_port(MSGPORT_ROOT, root_handler)
    rpc_owner_init(self, logger)
    rpc_initiate_handshake
  end

  # Installs the block invoked at the start of rpc_dispatch_complete.
  # RPCClient#connect_async calls this when it is given a block; the method
  # was missing, so that call raised NoMethodError. Must be called after
  # init_message_dispatch, which clears the hook.
  def install_dispatch_complete_hook(&hook)
    @dispatch_complete_callback = hook
  end

  # Associates +reply_proc+ with +reply_port_num+.
  def rpc_bind_reply_port(reply_port_num, reply_proc)
    @msg_portmap[reply_port_num] = reply_proc
  end

  # Removes (and returns) the handler bound to +reply_port_num+.
  def rpc_unbind_reply_port(reply_port_num)
    @msg_portmap.delete reply_port_num
  end

  protected

  # called by RPCClientConn when RPC protocol handshake complete
  def rpc_connection_established(rpc)
    @logger.dbg("#{self.class.name} - rpc_connection_established")
    @@id2rpc[self.object_id] = self
    rpc_dispatch(MSGPORT_ROOT, MSGPORT_ROOT, :rpc_begin)
    rpc_flush
  end

  # called by RPCClientConn when RPC connection terminates
  def rpc_connection_unbind(rpc)
    @logger.dbg("#{self.class.name} - rpc_connection_unbind")
    rpc_dispatch(MSGPORT_ROOT, MSGPORT_ROOT, :rpc_unbind)
    @@id2rpc.delete self.object_id
  end

  # Invokes the handler bound to +to_local_port+ with a stand-in reply port
  # (the literal true), the message name and its arguments. The call is made
  # on a new fiber via RPC.run_in_fiber.
  def rpc_dispatch(from_remote_port, to_local_port, msgname, *args)
    @logger.dbg("#{self.class.name} - rpc_dispatch: from=#{from_remote_port.inspect} to=#{to_local_port.inspect} #{msgname} #{args.inspect}")
    ########
    ######## c0000029: (A)
    ######## -------------
    ######## Commenting out the run_in_fiber wrapper here makes the
    ######## crash NOT happen, as long as (B) is enabled below.
    ########
    RPC.run_in_fiber do
      handler = @msg_portmap[to_local_port]
      fake_replyport = true
      @logger.dbg("#{self.class.name} - rpc_dispatch: calling handler: #{handler}")
      handler.call(fake_replyport, msgname, *args)
    end
  end

  # Runs the dispatch-complete hook, if one is installed, then flushes every
  # registered connection.
  def rpc_dispatch_complete
    @dispatch_complete_callback.call if @dispatch_complete_callback
    # Flush any rpc's we know of that have pending frames
    @@id2rpc.each_value {|r| r.rpc_flush}
  end
end


# Server-side connection class: an RPC connection with message dispatching
# mixed in. MockWindowServer#start_server hands this class to
# EventMachine::start_server.
class RPCServerConn < RPC
include RPCMessageDispatcher
end

# Client-side connection class: an RPC connection with message dispatching
# mixed in. RPCClient#connect_async hands this class to
# EventMachine::connect.
class RPCClientConn < RPC
include RPCMessageDispatcher
end
end # PROTO
end # ZZZZ



module ZZZZ
module PROTO

# Client end of the RPC link. Opens an RPCClientConn through EventMachine,
# handles the root-port messages (:rpc_begin / :rpc_unbind) itself and
# forwards anything else to an optional root_msg_handler.
class RPCClient
  # Whatever was passed as reply port with :rpc_begin; nil while disconnected.
  attr_reader :root
  # rpc_begin_hook:   called with the root once :rpc_begin arrives.
  # rpc_unbind_hook:  called without arguments once :rpc_unbind arrives.
  # root_msg_handler: receives (replyport, msg, *args) for all other messages.
  attr_accessor :rpc_begin_hook, :rpc_unbind_hook, :root_msg_handler

  def initialize(logger)
    @logger = logger
    @rpc_conn = @root = nil
    @rpc_begin_hook = @rpc_unbind_hook = @root_msg_handler = nil
  end

  # Starts connecting to rpc_host:rpc_port. Inside the block EventMachine
  # yields the connection to, message dispatch is initialised and, when a
  # block was given to this method, it is installed as dispatch-complete hook.
  def connect_async(rpc_host, rpc_port, &dispatch_complete_hook)
    @logger.dbg "#{self.class.name} connect..."
    EventMachine.connect(rpc_host, rpc_port, ZZZZ::PROTO::RPCClientConn) do |conn|
      # Best effort only: an unavailable peer address is logged as 0.0.0.0:0.
      peer = begin
        Socket.unpack_sockaddr_in(conn.get_peername)
      rescue StandardError
        [0, "0.0.0.0"]
      end
      remote_port, remote_ip = peer
      @logger.dbg("#{self.class.name} - rpc conn established - #{remote_ip}:#{remote_port}")

      @rpc_conn = conn
      @rpc_conn.init_message_dispatch(@logger, &method(:handle_rpc_root_msg))
      @rpc_conn.install_dispatch_complete_hook(&dispatch_complete_hook) if dispatch_complete_hook

      @logger.dbg("#{self.class.name} - rpc conn block end")
    end
  end

  # Forgets the root and, when a connection exists, flushes it and asks
  # EventMachine to close it after pending writes.
  def disconnect_async
    @root = nil
    return unless @rpc_conn
    @rpc_conn.rpc_flush
    @rpc_conn.close_connection_after_writing
  end

  # True while a root has been received and not yet dropped.
  def rpc_ready?
    @root ? true : false
  end

  protected

  # Handler bound to the connection's root port.
  # Raises RuntimeError for an unrecognised message when no
  # root_msg_handler is set.
  def handle_rpc_root_msg(replyport, msg, *args)
    @logger.dbg("#{self.class.name} - handle_rpc_root_msg: #{msg} #{args.inspect}")
    case msg
    when :rpc_begin
      rpc_rpc_begin(replyport)
    when :rpc_unbind
      rpc_rpc_unbind
    else
      raise("unknown message: #{msg}") unless @root_msg_handler
      @root_msg_handler.call(replyport, msg, *args)
    end
  end

  # called by RPCMessageDispatcher when ready for first message
  def rpc_rpc_begin(remote_root)
    @root = remote_root
    @rpc_begin_hook.call(@root) if @rpc_begin_hook
  end

  # called by RPCMessageDispatcher when connection terminates
  def rpc_rpc_unbind
    @logger.dbg("#{self.class.name} - rpc_unbind")
    @root = nil
    @rpc_conn = nil
    @rpc_unbind_hook.call if @rpc_unbind_hook
  end
end

end # PROTO
end # ZZZZ


require 'rbconfig'
require 'test/unit'


module TestEMFiber2

# Lets code running on a non-EventMachine fiber block until a condition
# becomes true, by parking the fiber and handing control back to the
# EventMachine fiber (ZZZZ::PROTO::RPC.em_fiber). Parked fibers are resumed
# whenever service_sync_waiters is called.
class SyncWaiter
  def initialize
    @waiters = []
  end

  # Returns once +testproc+ yields a truthy value.
  # Must not be called on the EventMachine fiber; raises RuntimeError if it is
  # (unless the condition already holds, in which case it returns at once).
  def waitfor(&testproc)
    return if testproc.call # quick test up front
    cf = Fiber.current
    emf = ZZZZ::PROTO::RPC.em_fiber
    raise("SyncWaiter: waitfor called on EventMachine thread") if cf == emf
    begin
      @waiters << cf
      # Do-while on purpose: hand control to the EventMachine fiber first,
      # re-test the condition each time this fiber is resumed.
      begin
        emf.transfer
      end until testproc.call
    ensure
      @waiters.delete cf
    end
  end

  # Typically called from EventMachine fiber.
  # Resumes every parked fiber once. Iterates over a snapshot: a resumed
  # waiter whose condition now holds removes itself from @waiters, and
  # iterating the live array would then skip the waiter after it.
  def service_sync_waiters
    @waiters.dup.each {|wf| wf.transfer}
  end
end


# Stand-in for the window server the client talks to.
#
# NOTE: a listening server is started, but only one client connection is
# ever kept; a further connection arriving while one is held is logged and
# closed right away.
class MockWindowServer
  def initialize(logger)
    @logger = logger
    @rpc_conn = nil
    @client = nil
  end

  # Starts listening on extern_interface_ip.
  # A port of zero means choose an ephemeral port.
  # The port number actually bound is the return value.
  def start_server(extern_interface_ip, port=0)
    signature = EventMachine.start_server(extern_interface_ip, port, ZZZZ::PROTO::RPCServerConn) do |client_rpc_conn|
      remote_port, remote_ip = Socket.unpack_sockaddr_in(client_rpc_conn.get_peername)

      if @rpc_conn
        refuse_client(client_rpc_conn, remote_ip, remote_port)
      else
        adopt_client(client_rpc_conn, remote_ip, remote_port)
      end
    end
    port, iface = Socket.unpack_sockaddr_in(EventMachine.get_sockname(signature))
    @logger.info("#{self.class.name} server listening on interface #{iface} tcp port #{port} (proto ver #{ZZZZ::PROTO::RPC::RPC_VERSION})")
    port
  end

  private

  # A client is already connected: log the newcomer and close it.
  def refuse_client(client_rpc_conn, remote_ip, remote_port)
    @logger.error("#{self.class.name} - already have client connection, dropping connection from #{remote_ip}:#{remote_port}")
    client_rpc_conn.close_connection
  end

  # First client: keep the connection and route its root-port messages to
  # handle_root_msg.
  def adopt_client(client_rpc_conn, remote_ip, remote_port)
    @logger.dbg("#{self.class.name} - client tcp conn established - #{remote_ip}:#{remote_port}")

    @rpc_conn = client_rpc_conn
    @rpc_conn.init_message_dispatch(@logger) do |replyport, msg, *args|
      handle_root_msg(replyport, msg, *args)
    end

    @logger.dbg("#{self.class.name} - client tcp conn block end")
  end

  # Root-port handler. A nil message name is reported as an error with the
  # arguments as error data; unrecognised names raise RuntimeError.
  def handle_root_msg(replyport, msg, *args)
    @logger.dbg("#{self.class.name} - handle_root_msg: #{msg} #{args.inspect}")
    case msg
    when nil
      handle_root_error(args)
    when :rpc_begin
      rpc_begin(replyport)
    when :rpc_unbind
      rpc_unbind
    else
      raise("unknown message: #{msg}")
    end
  end

  # Logs error data received on the root port.
  def handle_root_error(errdata)
    @logger.error("#{self.class.name} - received error at root: #{errdata.inspect}")
  end

  # called by RPCMessageDispatcher when ready for first message
  def rpc_begin(remote_root)
    @client = remote_root
    @logger.dbg("#{self.class.name} - rpc_begin: nothing to do, awaiting msg from client...")
  end

  # called by RPCMessageDispatcher when connection terminates
  def rpc_unbind
    @logger.dbg("#{self.class.name} - rpc_unbind")
    @client = nil
    @rpc_conn = nil
  end
end

# Stand-in application: connects an RPCClient to the window server and
# blocks (through a SyncWaiter) until the server's root has been received.
class MockApp
  def initialize(logger)
    @logger = logger
    @waiter = SyncWaiter.new
    @vis = nil
    @vis_client = ZZZZ::PROTO::RPCClient.new(@logger)

    # Remember the remote root when the RPC link comes up ...
    on_begin = lambda { |root| @vis = root }
    # ... and drop it, stopping EventMachine, when the link goes away.
    on_unbind = lambda { @vis = nil; EventMachine.stop }
    @vis_client.rpc_begin_hook = on_begin
    @vis_client.rpc_unbind_hook = on_unbind
  end

  # Blocks the calling fiber until +testproc+ is truthy (see SyncWaiter).
  def waitfor(&testproc)
    @waiter.waitfor(&testproc)
  end

  # Starts the periodic timer, begins connecting to vis_host:vis_port and
  # waits until the remote root is known.
  def startup(vis_host, vis_port)
    start_app_timer
    @vis_client.connect_async(vis_host, vis_port)
    waitfor { @vis }
    @logger.dbg("#{self.class.name} - startup: got vis=#{@vis.inspect}")
  end

  protected

  # Schedules app_timer_callback every 0.1 seconds.
  def start_app_timer
    EM.add_periodic_timer(0.1) do
      app_timer_callback
    end
  end

  # Timer tick: resumes all parked waiters, bracketed by trace output on
  # $stderr that shows which fiber the callback runs on.
  def app_timer_callback
    $stderr.puts "[fib=#{Fiber.current.object_id}] app_timer_callback {"
    @waiter.service_sync_waiters
    $stderr.puts "[fib=#{Fiber.current.object_id}] app_timer_callback }"
  end
end


# Test case driving the crash reproduction: inside one EventMachine reactor
# it starts a MockWindowServer, then has a MockApp connect to it from a
# separate fiber.
class TestEMFiber2 < Test::Unit::TestCase
# Loopback address used both for listening and for connecting.
VIS_HOST = "127.0.0.1"

# Runs the whole scenario; there are no assertions — the point is whether
# the process survives (see the c0000029 notes at the top of the file).
def test_em_fiber
# Base logger with debug and info output both switched on.
@logger = ZZZZ::Logger.new
@logger.prefix = "FibTest: "
@logger.loglevel_debug = true
@logger.loglevel_info = true

# Per-component copies: same level settings, different line prefix.
vis_logger = @logger.dup
vis_logger.prefix = "MockVis: "

app_logger = @logger.dup
app_logger.prefix = "MockApp: "

EventMachine.run {
# Record the reactor's fiber so SyncWaiter can transfer back to it.
ZZZZ::PROTO::RPC.em_fiber_init
@vis_server = MockWindowServer.new(vis_logger)
# Port 0 asks for an ephemeral port; the chosen port is returned.
vis_port = @vis_server.start_server(VIS_HOST, 0)
@app = MockApp.new(app_logger)
########
######## c0000029: (B)
######## -------------
######## Commenting out the run_in_fiber wrapper here makes the
######## crash happen sooner, as long as (A) is enabled above.
########
ZZZZ::PROTO::RPC.run_in_fiber do
warn "huzzah"
# Blocks this fiber (not the reactor fiber) until the RPC root is known.
@app.startup(VIS_HOST, vis_port)
EventMachine.stop
end
}
end
end # TestEMFiber2

end # module TestEMFiber2


    (1-1/1)