rinda.multicast.3.patch

Added missing () to match style - Eric Hodel, 03/13/2013 06:19 AM

Download (8.86 KB)

View differences:

lib/rinda/ring.rb (working copy)
4 4
require 'drb/drb'
5 5
require 'rinda/rinda'
6 6
require 'thread'
7
require 'ipaddr'
7 8

  
8 9
module Rinda
9 10

  
......
27 28
    include DRbUndumped
28 29

  
29 30
    ##
31
    # Special renewer for the RingServer to allow shutdown
32

  
33
    class Renewer # :nodoc:
34
      include DRbUndumped
35

  
36
      ##
37
      # Set to false to shutdown future requests using this Renewer
38

  
39
      attr_accessor :renew
40

  
41
      def initialize # :nodoc:
42
        @renew = true
43
      end
44

  
45
      def renew # :nodoc:
46
        @renew ? 1 : true
47
      end
48
    end
49

  
50
    ##
30 51
    # Advertises +ts+ on the UDP broadcast address at +port+.
31 52

  
32
    def initialize(ts, port=Ring_PORT)
53
    def initialize(ts, addresses=[Socket::INADDR_ANY], port=Ring_PORT)
54
      @port = port
55

  
56
      if Integer === addresses then
57
        addresses, @port = [Socket::INADDR_ANY], addresses
58
      end
59

  
60
      @renewer = Renewer.new
61

  
33 62
      @ts = ts
34
      @soc = UDPSocket.open
35
      @soc.bind('', port)
36
      @w_service = write_service
37
      @r_service = reply_service
63
      @sockets = addresses.map do |address|
64
        make_socket(address)
65
      end
66

  
67
      @w_services = write_services
68
      @r_service  = reply_service
38 69
    end
39 70

  
40 71
    ##
41
    # Creates a thread that picks up UDP packets and passes them to do_write
42
    # for decoding.
72
    # Creates a socket at +address+
43 73

  
44
    def write_service
45
      Thread.new do
46
        loop do
47
          msg = @soc.recv(1024)
48
          do_write(msg)
74
    def make_socket(address)
75
      addrinfo = Addrinfo.udp(address, @port)
76

  
77
      socket = Socket.new(addrinfo.pfamily, addrinfo.socktype,
78
                          addrinfo.protocol)
79

  
80
      if addrinfo.ipv4_multicast? or addrinfo.ipv6_multicast? then
81
        if Socket.const_defined?(:SO_REUSEPORT) then
82
          socket.setsockopt(:SOCKET, :SO_REUSEPORT, true)
83
        else
84
          socket.setsockopt(:SOCKET, :SO_REUSEADDR, true)
85
        end
86

  
87
        if addrinfo.ipv4_multicast? then
88
          mreq = IPAddr.new(addrinfo.ip_address).hton +
89
            IPAddr.new('0.0.0.0').hton
90

  
91
          socket.setsockopt(:IPPROTO_IP, :IP_ADD_MEMBERSHIP, mreq)
92
        else
93
          mreq = IPAddr.new(addrinfo.ip_address).hton + [0].pack('I')
94

  
95
          socket.setsockopt(:IPPROTO_IPV6, :IPV6_JOIN_GROUP, mreq)
96
        end
97
      end
98

  
99
      socket.bind(addrinfo)
100

  
101
      socket
102
    end
103

  
104
    ##
105
    # Creates threads that pick up UDP packets and passes them to do_write for
106
    # decoding.
107

  
108
    def write_services
109
      @sockets.map do |s|
110
        Thread.new(s) do |socket|
111
          loop do
112
            msg = socket.recv(1024)
113
            do_write(msg)
114
          end
49 115
        end
50 116
      end
51 117
    end
......
69 135

  
70 136
    def reply_service
71 137
      Thread.new do
72
        loop do
138
        thread = Thread.current
139
        thread[:continue] = true
140

  
141
        while thread[:continue] do
73 142
          do_reply
74 143
        end
75 144
      end
......
80 149
    # address of the local TupleSpace.
81 150

  
82 151
    def do_reply
83
      tuple = @ts.take([:lookup_ring, DRbObject])
152
      tuple = @ts.take([:lookup_ring, DRbObject], @renewer)
84 153
      Thread.new { tuple[1].call(@ts) rescue nil}
85 154
    rescue
86 155
    end
87 156

  
157
    ##
158
    # Shuts down the RingServer
159

  
160
    def shutdown
161
      @renewer.renew = false
162

  
163
      @w_services.each do |thread|
164
        thread.kill
165
      end
166

  
167
      @sockets.each do |socket|
168
        socket.close
169
      end
170

  
171
      @r_service[:continue] = false
172
    end
173

  
88 174
  end
89 175

  
90 176
  ##
......
131 217
    attr_accessor :broadcast_list
132 218

  
133 219
    ##
220
    # Maximum number of hops for sent multicast packets (if using a multicast
221
    # address in the broadcast list).  The default is 1 (same as UDP
222
    # broadcast).
223

  
224
    attr_accessor :multicast_hops
225

  
226
    ##
227
    # The interface index to send IPv6 multicast packets from.
228

  
229
    attr_accessor :multicast_interface
230

  
231
    ##
134 232
    # The port that RingFinger will send query packets to.
135 233

  
136 234
    attr_accessor :port
......
149 247
      @port = port
150 248
      @primary = nil
151 249
      @rings = []
250

  
251
      @multicast_hops = 1
252
      @multicast_interface = 0
152 253
    end
153 254

  
154 255
    ##
......
178 279

  
179 280
      msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout])
180 281
      @broadcast_list.each do |it|
181
        soc = UDPSocket.open
182
        begin
183
          soc.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true)
184
          soc.send(msg, 0, it, @port)
185
        rescue
186
          nil
187
        ensure
188
          soc.close
189
        end
282
        send_message(it, msg)
190 283
      end
191 284
      sleep(timeout)
192 285
    end
......
217 310
      @primary
218 311
    end
219 312

  
313
    ##
314
    # Creates a socket for +address+ with the appropriate multicast options
315
    # for multicast addresses.
316

  
317
    def make_socket(address) # :nodoc:
318
      addrinfo = Addrinfo.udp(address, @port)
319

  
320
      soc = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol)
321

  
322
      if addrinfo.ipv4_multicast? then
323
        soc.setsockopt(:IPPROTO_IP, :IP_MULTICAST_LOOP, true)
324
        soc.setsockopt(:IPPROTO_IP, :IP_MULTICAST_TTL,
325
                       [@multicast_hops].pack('c'))
326
      elsif addrinfo.ipv6_multicast? then
327
        soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_LOOP, true)
328
        soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS,
329
                       [@multicast_hops].pack('I'))
330
        soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_IF,
331
                       [@multicast_interface].pack('I'))
332
      else
333
        soc.setsockopt(:SOL_SOCKET, :SO_BROADCAST, true)
334
      end
335

  
336
      soc.connect(addrinfo)
337

  
338
      soc
339
    end
340

  
341
    def send_message(address, message) # :nodoc:
342
      soc = make_socket(address)
343

  
344
      soc.send(message, 0)
345
    rescue
346
      nil
347
    ensure
348
      soc.close if soc
349
    end
350

  
220 351
  end
221 352

  
222 353
  ##
test/rinda/test_rinda.rb (working copy)
2 2

  
3 3
require 'drb/drb'
4 4
require 'drb/eq'
5
require 'rinda/ring'
5 6
require 'rinda/tuplespace'
6 7

  
7 8
require 'singleton'
......
480 481
  @server = DRb.primary_server || DRb.start_service
481 482
end
482 483

  
484
class TestRingServer < Test::Unit::TestCase
485

  
486
  def setup
487
    @port = Rinda::Ring_PORT
488

  
489
    @ts = Rinda::TupleSpace.new
490
    @rs = Rinda::RingServer.new(@ts, [], @port)
491
  end
492

  
493
  def teardown
494
    @rs.shutdown
495
  end
496

  
497
  def test_make_socket_unicast
498
    v4 = @rs.make_socket('127.0.0.1')
499

  
500
    assert_equal('127.0.0.1', v4.local_address.ip_address)
501
    assert_equal(@port,       v4.local_address.ip_port)
502
  end
503

  
504
  def test_make_socket_ipv4_multicast
505
    v4mc = @rs.make_socket('239.0.0.1')
506

  
507
    if Socket.const_defined?(:SO_REUSEPORT) then
508
      assert(v4mc.getsockopt(:SOCKET, :SO_REUSEPORT).bool)
509
    else
510
      assert(v4mc.getsockopt(:SOCKET, :SO_REUSEADDR).bool)
511
    end
512

  
513
    assert_equal('239.0.0.1', v4mc.local_address.ip_address)
514
    assert_equal(@port,       v4mc.local_address.ip_port)
515
  end
516

  
517
  def test_make_socket_ipv6_multicast
518
    begin
519
      v6mc = @rs.make_socket('ff02::1')
520
    rescue Errno::EADDRNOTAVAIL
521
      return # IPv6 address for multicast not available
522
    end
523

  
524
    if Socket.const_defined?(:SO_REUSEPORT) then
525
      assert v6mc.getsockopt(:SOCKET, :SO_REUSEPORT).bool
526
    else
527
      assert v6mc.getsockopt(:SOCKET, :SO_REUSEADDR).bool
528
    end
529

  
530
    assert_equal('ff02::1',  v6mc.local_address.ip_address)
531
    assert_equal(@port, v6mc.local_address.ip_port)
532
  end
533

  
534
  def test_shutdown
535
    @rs.shutdown
536

  
537
    assert_nil(@rs.do_reply, 'otherwise should hang forever')
538
  end
539

  
540
end
541

  
542
class TestRingFinger < Test::Unit::TestCase
543

  
544
  def setup
545
    @rf = Rinda::RingFinger.new
546
    @rf.multicast_interface = 1
547
  end
548

  
549
  def test_make_socket_unicast
550
    v4 = @rf.make_socket('127.0.0.1')
551

  
552
    assert(v4.getsockopt(:SOL_SOCKET, :SO_BROADCAST).bool)
553
  end
554

  
555
  def test_make_socket_ipv4_multicast
556
    v4mc = @rf.make_socket('239.0.0.1')
557

  
558
    assert_equal(1, v4mc.getsockopt(:IPPROTO_IP, :IP_MULTICAST_LOOP).int)
559
    assert_equal(1, v4mc.getsockopt(:IPPROTO_IP, :IP_MULTICAST_TTL).int)
560
  end
561

  
562
  def test_make_socket_ipv6_multicast
563
    v6mc = @rf.make_socket('ff02::1')
564

  
565
    assert_equal(1, v6mc.getsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_LOOP).int)
566
    assert_equal(1, v6mc.getsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS).int)
567
  end
568

  
569
  def test_make_socket_multicast_hops
570
    @rf.multicast_hops = 2
571

  
572
    v4mc = @rf.make_socket('239.0.0.1')
573

  
574
    assert_equal(2, v4mc.getsockopt(:IPPROTO_IP, :IP_MULTICAST_TTL).int)
575

  
576
    v6mc = @rf.make_socket('ff02::1')
577

  
578
    assert_equal(2, v6mc.getsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS).int)
579
  end
580

  
581
end
582

  
483 583
end
484 584