Feature #8073 » rinda.multicast.2.patch
| lib/rinda/ring.rb (working copy) | ||
|---|---|---|
| 
     require 'drb/drb' 
   | 
||
| 
     require 'rinda/rinda' 
   | 
||
| 
     require 'thread' 
   | 
||
| 
     require 'ipaddr' 
   | 
||
| 
     module Rinda 
   | 
||
| ... | ... | |
| 
         include DRbUndumped 
   | 
||
| 
         ## 
   | 
||
| 
         # Special renewer for the RingServer to allow shutdown 
   | 
||
| 
         class Renewer # :nodoc: 
   | 
||
| 
           include DRbUndumped 
   | 
||
| 
           ## 
   | 
||
| 
           # Set to false to shutdown future requests using this Renewer 
   | 
||
| 
           attr_accessor :renew 
   | 
||
| 
           def initialize # :nodoc: 
   | 
||
| 
             @renew = true 
   | 
||
| 
           end 
   | 
||
| 
           def renew # :nodoc: 
   | 
||
| 
             @renew ? 1 : true 
   | 
||
| 
           end 
   | 
||
| 
         end 
   | 
||
| 
         ## 
   | 
||
| 
         # Advertises +ts+ on the UDP broadcast address at +port+. 
   | 
||
| 
         def initialize(ts, port=Ring_PORT) 
   | 
||
| 
         def initialize(ts, addresses=[Socket::INADDR_ANY], port=Ring_PORT) 
   | 
||
| 
           @port = port 
   | 
||
| 
           if Integer === addresses then 
   | 
||
| 
             addresses, @port = [Socket::INADDR_ANY], addresses 
   | 
||
| 
           end 
   | 
||
| 
           @renewer = Renewer.new 
   | 
||
| 
           @ts = ts 
   | 
||
| 
           @soc = UDPSocket.open 
   | 
||
| 
           @soc.bind('', port) 
   | 
||
| 
           @w_service = write_service 
   | 
||
| 
           @r_service = reply_service 
   | 
||
| 
           @sockets = addresses.map do |address| 
   | 
||
| 
             make_socket address 
   | 
||
| 
           end 
   | 
||
| 
           @w_services = write_services 
   | 
||
| 
           @r_service  = reply_service 
   | 
||
| 
         end 
   | 
||
| 
         ## 
   | 
||
| 
         # Creates a thread that picks up UDP packets and passes them to do_write 
   | 
||
| 
         # for decoding. 
   | 
||
| 
         # Creates a socket at +address+ 
   | 
||
| 
         def write_service 
   | 
||
| 
           Thread.new do 
   | 
||
| 
             loop do 
   | 
||
| 
               msg = @soc.recv(1024) 
   | 
||
| 
               do_write(msg) 
   | 
||
| 
         def make_socket address 
   | 
||
| 
           addrinfo = Addrinfo.udp address, @port 
   | 
||
| 
           socket = Socket.new(addrinfo.pfamily, addrinfo.socktype, 
   | 
||
| 
                               addrinfo.protocol) 
   | 
||
| 
           if addrinfo.ipv4_multicast? or addrinfo.ipv6_multicast? then 
   | 
||
| 
             if Socket.const_defined? :SO_REUSEPORT then 
   | 
||
| 
               socket.setsockopt(:SOCKET, :SO_REUSEPORT, true) 
   | 
||
| 
             else 
   | 
||
| 
               socket.setsockopt(:SOCKET, :SO_REUSEADDR, true) 
   | 
||
| 
             end 
   | 
||
| 
             if addrinfo.ipv4_multicast? then 
   | 
||
| 
               mreq = IPAddr.new(addrinfo.ip_address).hton + 
   | 
||
| 
                 IPAddr.new('0.0.0.0').hton 
   | 
||
| 
               socket.setsockopt(:IPPROTO_IP, :IP_ADD_MEMBERSHIP, mreq) 
   | 
||
| 
             else 
   | 
||
| 
               mreq = IPAddr.new(addrinfo.ip_address).hton + [0].pack('I') 
   | 
||
| 
               socket.setsockopt(:IPPROTO_IPV6, :IPV6_JOIN_GROUP, mreq) 
   | 
||
| 
             end 
   | 
||
| 
           end 
   | 
||
| 
           socket.bind(addrinfo) 
   | 
||
| 
           socket 
   | 
||
| 
         end 
   | 
||
| 
         ## 
   | 
||
| 
         # Creates threads that pick up UDP packets and passes them to do_write for 
   | 
||
| 
         # decoding. 
   | 
||
| 
         def write_services 
   | 
||
| 
           @sockets.map do |s| 
   | 
||
| 
             Thread.new(s) do |socket| 
   | 
||
| 
               loop do 
   | 
||
| 
                 msg = socket.recv(1024) 
   | 
||
| 
                 do_write(msg) 
   | 
||
| 
               end 
   | 
||
| 
             end 
   | 
||
| 
           end 
   | 
||
| 
         end 
   | 
||
| ... | ... | |
| 
         def reply_service 
   | 
||
| 
           Thread.new do 
   | 
||
| 
             loop do 
   | 
||
| 
             thread = Thread.current 
   | 
||
| 
             thread[:continue] = true 
   | 
||
| 
             while thread[:continue] do 
   | 
||
| 
               do_reply 
   | 
||
| 
             end 
   | 
||
| 
           end 
   | 
||
| ... | ... | |
| 
         # address of the local TupleSpace. 
   | 
||
| 
         def do_reply 
   | 
||
| 
           tuple = @ts.take([:lookup_ring, DRbObject]) 
   | 
||
| 
           tuple = @ts.take([:lookup_ring, DRbObject], @renewer) 
   | 
||
| 
           Thread.new { tuple[1].call(@ts) rescue nil} 
   | 
||
| 
         rescue 
   | 
||
| 
         end 
   | 
||
| 
         ## 
   | 
||
| 
         # Shuts down the RingServer 
   | 
||
| 
         def shutdown 
   | 
||
| 
           @renewer.renew = false 
   | 
||
| 
           @w_services.each do |thread| 
   | 
||
| 
             thread.kill 
   | 
||
| 
           end 
   | 
||
| 
           @sockets.each do |socket| 
   | 
||
| 
             socket.close 
   | 
||
| 
           end 
   | 
||
| 
           @r_service[:continue] = false 
   | 
||
| 
         end 
   | 
||
| 
       end 
   | 
||
| 
       ## 
   | 
||
| ... | ... | |
| 
         attr_accessor :broadcast_list 
   | 
||
| 
         ## 
   | 
||
| 
         # Maximum number of hops for sent multicast packets (if using a multicast 
   | 
||
| 
         # address in the broadcast list).  The default is 1 (same as UDP 
   | 
||
| 
         # broadcast). 
   | 
||
| 
         attr_accessor :multicast_hops 
   | 
||
| 
         ## 
   | 
||
| 
         # The interface index to send IPv6 multicast packets from. 
   | 
||
| 
         attr_accessor :multicast_interface 
   | 
||
| 
         ## 
   | 
||
| 
         # The port that RingFinger will send query packets to. 
   | 
||
| 
         attr_accessor :port 
   | 
||
| ... | ... | |
| 
           @port = port 
   | 
||
| 
           @primary = nil 
   | 
||
| 
           @rings = [] 
   | 
||
| 
           @multicast_hops = 1 
   | 
||
| 
           @multicast_interface = 0 
   | 
||
| 
         end 
   | 
||
| 
         ## 
   | 
||
| ... | ... | |
| 
           msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout]) 
   | 
||
| 
           @broadcast_list.each do |it| 
   | 
||
| 
             soc = UDPSocket.open 
   | 
||
| 
             begin 
   | 
||
| 
               soc.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true) 
   | 
||
| 
               soc.send(msg, 0, it, @port) 
   | 
||
| 
             rescue 
   | 
||
| 
               nil 
   | 
||
| 
             ensure 
   | 
||
| 
               soc.close 
   | 
||
| 
             end 
   | 
||
| 
             send_message it, msg 
   | 
||
| 
           end 
   | 
||
| 
           sleep(timeout) 
   | 
||
| 
         end 
   | 
||
| ... | ... | |
| 
           @primary 
   | 
||
| 
         end 
   | 
||
| 
         ## 
   | 
||
| 
         # Creates a socket for +address+ with the appropriate multicast options 
   | 
||
| 
         # for multicast addresses. 
   | 
||
| 
         def make_socket address # :nodoc: 
   | 
||
| 
           addrinfo = Addrinfo.udp address, @port 
   | 
||
| 
           soc = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol) 
   | 
||
| 
           if addrinfo.ipv4_multicast? then 
   | 
||
| 
             soc.setsockopt(:IPPROTO_IP, :IP_MULTICAST_LOOP, true) 
   | 
||
| 
             soc.setsockopt(:IPPROTO_IP, :IP_MULTICAST_TTL, 
   | 
||
| 
                            [@multicast_hops].pack('c')) 
   | 
||
| 
           elsif addrinfo.ipv6_multicast? then 
   | 
||
| 
             soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_LOOP, true) 
   | 
||
| 
             soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS, 
   | 
||
| 
                            [@multicast_hops].pack('I')) 
   | 
||
| 
             soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_IF, 
   | 
||
| 
                            [@multicast_interface].pack('I')) 
   | 
||
| 
           else 
   | 
||
| 
             soc.setsockopt(:SOL_SOCKET, :SO_BROADCAST, true) 
   | 
||
| 
           end 
   | 
||
| 
           soc.connect addrinfo 
   | 
||
| 
           soc 
   | 
||
| 
         end 
   | 
||
| 
         def send_message address, message # :nodoc: 
   | 
||
| 
           soc = make_socket(address) 
   | 
||
| 
           soc.send(message, 0) 
   | 
||
| 
         rescue 
   | 
||
| 
           nil 
   | 
||
| 
         ensure 
   | 
||
| 
           soc.close if soc 
   | 
||
| 
         end 
   | 
||
| 
       end 
   | 
||
| 
       ## 
   | 
||
| test/rinda/test_rinda.rb (working copy) | ||
|---|---|---|
| 
     require 'drb/drb' 
   | 
||
| 
     require 'drb/eq' 
   | 
||
| 
     require 'rinda/ring' 
   | 
||
| 
     require 'rinda/tuplespace' 
   | 
||
| 
     require 'singleton' 
   | 
||
| ... | ... | |
| 
       @server = DRb.primary_server || DRb.start_service 
   | 
||
| 
     end 
   | 
||
| 
     class TestRingServer < Test::Unit::TestCase 
   | 
||
| 
       def setup 
   | 
||
| 
         @port = Rinda::Ring_PORT 
   | 
||
| 
         @ts = Rinda::TupleSpace.new 
   | 
||
| 
         @rs = Rinda::RingServer.new @ts, [], @port 
   | 
||
| 
       end 
   | 
||
| 
       def teardown 
   | 
||
| 
         @rs.shutdown 
   | 
||
| 
       end 
   | 
||
| 
       def test_make_socket_unicast 
   | 
||
| 
         v4 = @rs.make_socket '127.0.0.1' 
   | 
||
| 
         assert_equal '127.0.0.1', v4.local_address.ip_address 
   | 
||
| 
         assert_equal @port,       v4.local_address.ip_port 
   | 
||
| 
       end 
   | 
||
| 
       def test_make_socket_ipv4_multicast 
   | 
||
| 
         v4mc = @rs.make_socket '239.0.0.1' 
   | 
||
| 
         if Socket.const_defined? :SO_REUSEPORT then 
   | 
||
| 
           assert v4mc.getsockopt(:SOCKET, :SO_REUSEPORT).bool 
   | 
||
| 
         else 
   | 
||
| 
           assert v4mc.getsockopt(:SOCKET, :SO_REUSEADDR).bool 
   | 
||
| 
         end 
   | 
||
| 
         assert_equal '239.0.0.1', v4mc.local_address.ip_address 
   | 
||
| 
         assert_equal @port,       v4mc.local_address.ip_port 
   | 
||
| 
       end 
   | 
||
| 
       def test_make_socket_ipv6_multicast 
   | 
||
| 
         begin 
   | 
||
| 
           v6mc = @rs.make_socket 'ff02::1' 
   | 
||
| 
         rescue Errno::EADDRNOTAVAIL 
   | 
||
| 
           return # IPv6 address for multicast not available 
   | 
||
| 
         end 
   | 
||
| 
         if Socket.const_defined? :SO_REUSEPORT then 
   | 
||
| 
           assert v6mc.getsockopt(:SOCKET, :SO_REUSEPORT).bool 
   | 
||
| 
         else 
   | 
||
| 
           assert v6mc.getsockopt(:SOCKET, :SO_REUSEADDR).bool 
   | 
||
| 
         end 
   | 
||
| 
         assert_equal 'ff02::1',  v6mc.local_address.ip_address 
   | 
||
| 
         assert_equal @port, v6mc.local_address.ip_port 
   | 
||
| 
       end 
   | 
||
| 
       def test_shutdown 
   | 
||
| 
         @rs.shutdown 
   | 
||
| 
         assert_nil @rs.do_reply, 'otherwise should hang forever' 
   | 
||
| 
       end 
   | 
||
| 
     end 
   | 
||
| 
     class TestRingFinger < Test::Unit::TestCase 
   | 
||
| 
       def setup 
   | 
||
| 
         @rf = Rinda::RingFinger.new 
   | 
||
| 
         @rf.multicast_interface = 1 
   | 
||
| 
       end 
   | 
||
| 
       def test_make_socket_unicast 
   | 
||
| 
         v4 = @rf.make_socket '127.0.0.1' 
   | 
||
| 
         assert v4.getsockopt(:SOL_SOCKET, :SO_BROADCAST).bool 
   | 
||
| 
       end 
   | 
||
| 
       def test_make_socket_ipv4_multicast 
   | 
||
| 
         v4mc = @rf.make_socket '239.0.0.1' 
   | 
||
| 
         assert_equal 1, v4mc.getsockopt(:IPPROTO_IP, :IP_MULTICAST_LOOP).int 
   | 
||
| 
         assert_equal 1, v4mc.getsockopt(:IPPROTO_IP, :IP_MULTICAST_TTL).int 
   | 
||
| 
       end 
   | 
||
| 
       def test_make_socket_ipv6_multicast 
   | 
||
| 
         v6mc = @rf.make_socket 'ff02::1' 
   | 
||
| 
         assert_equal 1, v6mc.getsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_LOOP).int 
   | 
||
| 
         assert_equal 1, v6mc.getsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS).int 
   | 
||
| 
       end 
   | 
||
| 
       def test_make_socket_multicast_hops 
   | 
||
| 
         @rf.multicast_hops = 2 
   | 
||
| 
         v4mc = @rf.make_socket '239.0.0.1' 
   | 
||
| 
         assert_equal 2, v4mc.getsockopt(:IPPROTO_IP, :IP_MULTICAST_TTL).int 
   | 
||
| 
         v6mc = @rf.make_socket 'ff02::1' 
   | 
||
| 
         assert_equal 2, v6mc.getsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS).int 
   | 
||
| 
       end 
   | 
||
| 
     end 
   | 
||
| 
     end 
   | 
||