|
require "singleton"
|
|
require "socket"
|
|
|
|
ACCEPT_EVENT = 0x01
|
|
RECEIVE_EVENT = 0x02
|
|
WRITE_EVENT = 0x04
|
|
# Y_EVENT = 0x08
|
|
# Z_EVENT = 0x10
|
|
# Etc.
|
|
|
|
#
|
|
# halfsync-half-async-pattern
|
|
# based on POSA2 book reactor & active object pattern
|
|
# http://www.dre.vanderbilt.edu/~schmidt/PDF/HS-HA.pdf
|
|
# http://www.dre.vanderbilt.edu/~schmidt/PDF/Active-Objects.pdf
|
|
# http://www.dre.vanderbilt.edu/~schmidt/PDF/Reactor.pdf
|
|
# http://www.dre.vanderbilt.edu/~schmidt/PDF/Acc-Con.pdf
|
|
|
|
|
|
class Message
|
|
attr_reader :io_handle, :data
|
|
def initialize(io_handle, data)
|
|
@io_handle = io_handle
|
|
@data = data
|
|
end
|
|
end
|
|
|
|
class EchoServerTask
|
|
def initialize(high_water_mark)
|
|
@synchronized_queue = SynchronizedQueue.new(high_water_mark)
|
|
end
|
|
|
|
def get_synchronized_queue()
|
|
return @synchronized_queue
|
|
end
|
|
def activate(n)
|
|
for i in 1..n
|
|
|
|
Thread.start(i,@synchronized_queue){ |i, synchronized_queue|
|
|
Thread.current["name"] = "EchoServerTask #{i}";self.svc_run(synchronized_queue,i)}
|
|
end
|
|
end
|
|
|
|
def svc_run(synchronized_queue, i)
|
|
puts("Thread #{i} started")
|
|
n = 0
|
|
loop do
|
|
msg_packet = synchronized_queue.get()
|
|
puts("Task received #{msg_packet.inspect} to #{msg_packet.io_handle} as #{msg_packet.class}")
|
|
msg_packet.io_handle.sendmsg(Thread.current.object_id.to_s+":"+"#{n}"+":")
|
|
msg_packet.io_handle.sendmsg(msg_packet.data)
|
|
#msg_packet.io_handle.send(Thread.current.object_id.to_s+":"+"#{n}"+":",0)
|
|
#msg_packet.io_handle.send(msg_packet.data,0)
|
|
n = n+1
|
|
end
|
|
end
|
|
end
|
|
|
|
|
|
# Synchronized Queue
|
|
class SynchronizedQueue
|
|
def initialize(high_water_mark)
|
|
@high_water_mark = high_water_mark
|
|
@list = []
|
|
@mutex = Mutex.new
|
|
@not_empty = ConditionVariable.new()
|
|
@not_full = ConditionVariable.new()
|
|
end
|
|
|
|
def insert(method_request,timeout=nil) #blocking
|
|
@mutex.synchronize {
|
|
while(full_i()==true)
|
|
@not_full.wait(@mutex, timeout)
|
|
#@not_full.wait(@mutex)
|
|
end
|
|
insert_i(method_request)
|
|
}
|
|
end
|
|
|
|
def get(timeout=nil)
|
|
@mutex.synchronize {
|
|
while(empty_i()==true)
|
|
@not_empty.wait(@mutex, timeout)
|
|
end
|
|
return get_i()
|
|
}
|
|
end
|
|
|
|
|
|
private
|
|
def insert_i(method_request) #blocking
|
|
@list << method_request
|
|
@not_empty.signal if @list.length() > 0
|
|
end
|
|
|
|
def get_i()
|
|
element = @list.shift()
|
|
@not_full.signal if @list.length() < @high_water_mark
|
|
return element
|
|
end
|
|
|
|
def empty_i()
|
|
return @list.size()==0
|
|
end
|
|
|
|
def full_i()
|
|
return !(@list.size()< @high_water_mark)
|
|
end
|
|
end
|
|
|
|
# Reactor pattern start
|
|
class EventHandler
|
|
|
|
def initialize()
|
|
@io_handle = nil
|
|
end
|
|
|
|
def handle_accept_event()
|
|
nil # hook method
|
|
end
|
|
|
|
def handle_receive_event()
|
|
nil # hook method
|
|
end
|
|
|
|
def handle_write_event()
|
|
nil # hook method
|
|
end
|
|
|
|
def handle_close_event()
|
|
nil # virtual, hook method
|
|
end
|
|
|
|
def get_handle()
|
|
return @io_handle
|
|
end
|
|
end
|
|
|
|
class EchoAcceptor < EventHandler
|
|
def initialize(port, echo_server_task)
|
|
super()
|
|
@port = port
|
|
@echo_server_task = echo_server_task
|
|
@io_handle = TCPServer.new(port)
|
|
end
|
|
|
|
def handle_accept_event() # called by callback
|
|
connection_handle = @io_handle.accept()
|
|
echo_event_handler_obj = EchoServerHandler.new(connection_handle, @echo_server_task)
|
|
EchoReactor.instance.register_handler(echo_event_handler_obj, RECEIVE_EVENT)
|
|
end
|
|
|
|
def handle_close_event(connection_handle)
|
|
connection_handle.close()
|
|
EchoReactor.instance.remove_handler(connection_handle, ACCEPT_EVENT)
|
|
end
|
|
end
|
|
|
|
class EchoServerHandler < EventHandler
|
|
def initialize(io_handle, echo_server_task)
|
|
@io_handle = io_handle
|
|
@echo_server_task = echo_server_task
|
|
@data = ""
|
|
end
|
|
|
|
def handle_input()
|
|
begin
|
|
mesg, sender_sockaddr, rflags, *controls = @io_handle.recv(100000)
|
|
STDOUT.puts("Received message", mesg.inspect)
|
|
rescue Errno::ECONNRESET,Errno::ECONNABORTED,Errno::ETIMEDOUT=> e
|
|
EchoReactor.instance.remove_handler(@io_handle, RECEIVE_EVENT)
|
|
puts("Connetion abborted")
|
|
return
|
|
end
|
|
if mesg == "" #connection close from peer side
|
|
EchoReactor.instance.remove_handler(@io_handle, RECEIVE_EVENT)
|
|
puts("Connetion closed by peer side")
|
|
return
|
|
elsif mesg == nil
|
|
# lets asume the connection had been disconnected from peer side
|
|
EchoReactor.instance.remove_handler(@io_handle, RECEIVE_EVENT)
|
|
puts("Connetion disconnected by by peer side")
|
|
return
|
|
else
|
|
@data = @data + mesg
|
|
cr = @data.rindex("\r")
|
|
nl = @data.rindex("\n")
|
|
crnl = @data.rindex("\r\n")
|
|
|
|
if cr == nil
|
|
cr = 0
|
|
else
|
|
cr = cr+1
|
|
end
|
|
if nl == nil
|
|
nl = 0
|
|
else
|
|
nl = nl+1
|
|
end
|
|
if crnl == nil
|
|
crnl = 0
|
|
else
|
|
crnl = crnl+2
|
|
end
|
|
n = [cr,nl,crnl].max()
|
|
message = @data[0,n].to_s # determine the message until \r, \n or \r\n
|
|
@data = @data[n,@data.length()].to_s
|
|
if message != ""
|
|
# pack the message
|
|
msg_packet = Message.new(@io_handle,message)
|
|
# put the message into the shared queue
|
|
self.put(msg_packet)
|
|
end
|
|
end
|
|
end
|
|
|
|
def put(message)
|
|
# uses the proxy to put the message into the queue()
|
|
@echo_server_task.get_synchronized_queue.insert(message)
|
|
end
|
|
|
|
def handle_close_event(handle)
|
|
handle.close() # close connection
|
|
end
|
|
end
|
|
|
|
# Echo reactor supports write handles, however this is not used here
|
|
class EchoReactor
|
|
include Singleton
|
|
def initialize()
|
|
@mutex = Mutex.new()
|
|
@accept_callback_hash = Hash.new()
|
|
@receive_callback_hash = Hash.new()
|
|
@write_callback_hash = Hash.new()
|
|
@read_pipe, @write_pipe = IO.pipe #created a pipe for handling updates to the callback hashes
|
|
puts("Generated pipe: read pipe #{@read_pipe}, write pipe #{@write_pipe}")
|
|
end
|
|
|
|
def handle_events()
|
|
loop do
|
|
accept_io_object_list = nil
|
|
receive_io_object_list = nil
|
|
write_io_object_list = nil
|
|
@mutex.synchronize {
|
|
# Determine all regsitered io_opjects for select
|
|
accept_io_object_list = @accept_callback_hash.keys
|
|
receive_io_object_list = @receive_callback_hash.keys
|
|
write_io_object_list = @write_callback_hash.keys
|
|
}
|
|
read_io_object_list, write_io_object_list = SynchronousEventDemultiplexer.instance.select(accept_io_object_list + receive_io_object_list << @read_pipe, write_io_object_list)
|
|
read_io_object_list.each do |read_io_object|
|
|
puts("read_io_object.inspect: #{read_io_object.inspect}")
|
|
puts("#{read_io_object}, #{@accept_callback_hash[read_io_object]},#{@receive_callback_hash[read_io_object]}")
|
|
@accept_callback_hash[read_io_object].handle_accept_event() if @accept_callback_hash[read_io_object] != nil
|
|
@receive_callback_hash[read_io_object].handle_input() if @receive_callback_hash[read_io_object] != nil
|
|
puts("@read_pipe: #{@read_pipe.inspect}")
|
|
puts("read_io_object: #{read_io_object.inspect}")
|
|
if @read_pipe == read_io_object # just get rid of the IO object received and loop as events have been registered or removed
|
|
received = read_io_object.read(1)
|
|
puts("reading io: #{read_io_object}, #{received}")
|
|
end
|
|
end
|
|
write_io_object_list.each do |write_io_object|
|
|
puts("write_io_object.inspect: #{write_io_object.inspect}")
|
|
puts("#{write_io_object}, #{@write_callback_hash[write_io_object]}")
|
|
@write_callback_hash[write_io_object].handle_write_event() if @write_callback_hash[write_io_object] != nil
|
|
puts("write_io_object: #{write_io_object.inspect}")
|
|
end
|
|
end
|
|
end
|
|
|
|
def register_handler(handler_obj, event_type)
|
|
io_object = handler_obj.get_handle()
|
|
@mutex.synchronize {
|
|
@accept_callback_hash[io_object]=handler_obj if (event_type & ACCEPT_EVENT != 0)
|
|
@receive_callback_hash[io_object]=handler_obj if (event_type & RECEIVE_EVENT != 0)
|
|
@write_callback_hash[io_object]=handler_obj if (event_type & WRITE_EVENT != 0)
|
|
@write_pipe.write("a")
|
|
puts("accept callbacks #{@accept_callback_hash.keys}")
|
|
puts("receive callbacks #{@receive_callback_hash.keys}")
|
|
puts("write callbacks #{@write_callback_hash.keys}")
|
|
}
|
|
end
|
|
|
|
def remove_handler(handle, event_type)
|
|
io_object = handle
|
|
@mutex.synchronize {
|
|
puts("deleting Handle: #{handle}")
|
|
@accept_callback_hash.delete(io_object) if (event_type & ACCEPT_EVENT != 0)
|
|
@receive_callback_hash.delete(io_object) if (event_type & RECEIVE_EVENT != 0)
|
|
@write_callback_hash.delete(io_object) if (event_type & WRITE_EVENT != 0)
|
|
@write_pipe.write("d")
|
|
puts("accept callbacks #{@accept_callback_hash.keys}")
|
|
puts("receive callbacks #{@receive_callback_hash.keys}")
|
|
puts("write callbacks #{@write_callback_hash.keys}")
|
|
}
|
|
end
|
|
end
|
|
|
|
# Wrapper fascade pattern:
|
|
class SynchronousEventDemultiplexer
|
|
include Singleton
|
|
def select(read_input_io_object_list, write_input_io_object_list)
|
|
|
|
read_result_array, write_result_array, error_result_array= IO.select(read_input_io_object_list, write_input_io_object_list)
|
|
|
|
puts("read_result_array.inspect: #{read_result_array.inspect}")
|
|
return read_result_array, write_result_array
|
|
end
|
|
end
|
|
# Reactor pattern end
|
|
|
|
debug =false
|
|
if debug
|
|
Thread.current["name"] = "main"
|
|
Thread.new{
|
|
# for debugging puposes
|
|
loop do
|
|
Thread.current["name"] = "ThreadList"
|
|
Thread.list.each do |thread|
|
|
puts("Threads: #{thread.inspect} : #{thread[:name]} : #{thread.status()}")
|
|
end
|
|
end
|
|
}
|
|
end
|
|
# create newline echo on Port 10001
|
|
puts("Starting")
|
|
|
|
echo_server_task = EchoServerTask.new(20) #create synchronized queue with 20 elements
|
|
echo_server_task.activate(5)# create thread pool of 5 Tasks
|
|
|
|
echo_acceptor_10001_obj = EchoAcceptor.new(10001, echo_server_task)
|
|
EchoReactor.instance.register_handler(echo_acceptor_10001_obj, ACCEPT_EVENT)
|
|
|
|
|
|
# start events
|
|
EchoReactor.instance.handle_events()
|