Feature #8073 ยป rinda.multicast.3.patch
lib/rinda/ring.rb (working copy) | ||
---|---|---|
4 | 4 |
require 'drb/drb' |
5 | 5 |
require 'rinda/rinda' |
6 | 6 |
require 'thread' |
7 |
require 'ipaddr' |
|
7 | 8 | |
8 | 9 |
module Rinda |
9 | 10 | |
... | ... | |
27 | 28 |
include DRbUndumped |
28 | 29 | |
29 | 30 |
## |
31 |
# Special renewer for the RingServer to allow shutdown |
|
32 | ||
33 |
class Renewer # :nodoc: |
|
34 |
include DRbUndumped |
|
35 | ||
36 |
## |
|
37 |
# Set to false to shutdown future requests using this Renewer |
|
38 | ||
39 |
attr_accessor :renew |
|
40 | ||
41 |
def initialize # :nodoc: |
|
42 |
@renew = true |
|
43 |
end |
|
44 | ||
45 |
def renew # :nodoc: |
|
46 |
@renew ? 1 : true |
|
47 |
end |
|
48 |
end |
|
49 | ||
50 |
## |
|
30 | 51 |
# Advertises +ts+ on the UDP broadcast address at +port+. |
31 | 52 | |
32 |
def initialize(ts, port=Ring_PORT) |
|
53 |
def initialize(ts, addresses=[Socket::INADDR_ANY], port=Ring_PORT) |
|
54 |
@port = port |
|
55 | ||
56 |
if Integer === addresses then |
|
57 |
addresses, @port = [Socket::INADDR_ANY], addresses |
|
58 |
end |
|
59 | ||
60 |
@renewer = Renewer.new |
|
61 | ||
33 | 62 |
@ts = ts |
34 |
@soc = UDPSocket.open |
|
35 |
@soc.bind('', port) |
|
36 |
@w_service = write_service |
|
37 |
@r_service = reply_service |
|
63 |
@sockets = addresses.map do |address| |
|
64 |
make_socket(address) |
|
65 |
end |
|
66 | ||
67 |
@w_services = write_services |
|
68 |
@r_service = reply_service |
|
38 | 69 |
end |
39 | 70 | |
40 | 71 |
## |
41 |
# Creates a thread that picks up UDP packets and passes them to do_write |
|
42 |
# for decoding. |
|
72 |
# Creates a socket at +address+ |
|
43 | 73 | |
44 |
def write_service |
|
45 |
Thread.new do |
|
46 |
loop do |
|
47 |
msg = @soc.recv(1024) |
|
48 |
do_write(msg) |
|
74 |
def make_socket(address) |
|
75 |
addrinfo = Addrinfo.udp(address, @port) |
|
76 | ||
77 |
socket = Socket.new(addrinfo.pfamily, addrinfo.socktype, |
|
78 |
addrinfo.protocol) |
|
79 | ||
80 |
if addrinfo.ipv4_multicast? or addrinfo.ipv6_multicast? then |
|
81 |
if Socket.const_defined?(:SO_REUSEPORT) then |
|
82 |
socket.setsockopt(:SOCKET, :SO_REUSEPORT, true) |
|
83 |
else |
|
84 |
socket.setsockopt(:SOCKET, :SO_REUSEADDR, true) |
|
85 |
end |
|
86 | ||
87 |
if addrinfo.ipv4_multicast? then |
|
88 |
mreq = IPAddr.new(addrinfo.ip_address).hton + |
|
89 |
IPAddr.new('0.0.0.0').hton |
|
90 | ||
91 |
socket.setsockopt(:IPPROTO_IP, :IP_ADD_MEMBERSHIP, mreq) |
|
92 |
else |
|
93 |
mreq = IPAddr.new(addrinfo.ip_address).hton + [0].pack('I') |
|
94 | ||
95 |
socket.setsockopt(:IPPROTO_IPV6, :IPV6_JOIN_GROUP, mreq) |
|
96 |
end |
|
97 |
end |
|
98 | ||
99 |
socket.bind(addrinfo) |
|
100 | ||
101 |
socket |
|
102 |
end |
|
103 | ||
104 |
## |
|
105 |
# Creates threads that pick up UDP packets and passes them to do_write for |
|
106 |
# decoding. |
|
107 | ||
108 |
def write_services |
|
109 |
@sockets.map do |s| |
|
110 |
Thread.new(s) do |socket| |
|
111 |
loop do |
|
112 |
msg = socket.recv(1024) |
|
113 |
do_write(msg) |
|
114 |
end |
|
49 | 115 |
end |
50 | 116 |
end |
51 | 117 |
end |
... | ... | |
69 | 135 | |
70 | 136 |
def reply_service |
71 | 137 |
Thread.new do |
72 |
loop do |
|
138 |
thread = Thread.current |
|
139 |
thread[:continue] = true |
|
140 | ||
141 |
while thread[:continue] do |
|
73 | 142 |
do_reply |
74 | 143 |
end |
75 | 144 |
end |
... | ... | |
80 | 149 |
# address of the local TupleSpace. |
81 | 150 | |
82 | 151 |
def do_reply |
83 |
tuple = @ts.take([:lookup_ring, DRbObject]) |
|
152 |
tuple = @ts.take([:lookup_ring, DRbObject], @renewer)
|
|
84 | 153 |
Thread.new { tuple[1].call(@ts) rescue nil} |
85 | 154 |
rescue |
86 | 155 |
end |
87 | 156 | |
157 |
## |
|
158 |
# Shuts down the RingServer |
|
159 | ||
160 |
def shutdown |
|
161 |
@renewer.renew = false |
|
162 | ||
163 |
@w_services.each do |thread| |
|
164 |
thread.kill |
|
165 |
end |
|
166 | ||
167 |
@sockets.each do |socket| |
|
168 |
socket.close |
|
169 |
end |
|
170 | ||
171 |
@r_service[:continue] = false |
|
172 |
end |
|
173 | ||
88 | 174 |
end |
89 | 175 | |
90 | 176 |
## |
... | ... | |
131 | 217 |
attr_accessor :broadcast_list |
132 | 218 | |
133 | 219 |
## |
220 |
# Maximum number of hops for sent multicast packets (if using a multicast |
|
221 |
# address in the broadcast list). The default is 1 (same as UDP |
|
222 |
# broadcast). |
|
223 | ||
224 |
attr_accessor :multicast_hops |
|
225 | ||
226 |
## |
|
227 |
# The interface index to send IPv6 multicast packets from. |
|
228 | ||
229 |
attr_accessor :multicast_interface |
|
230 | ||
231 |
## |
|
134 | 232 |
# The port that RingFinger will send query packets to. |
135 | 233 | |
136 | 234 |
attr_accessor :port |
... | ... | |
149 | 247 |
@port = port |
150 | 248 |
@primary = nil |
151 | 249 |
@rings = [] |
250 | ||
251 |
@multicast_hops = 1 |
|
252 |
@multicast_interface = 0 |
|
152 | 253 |
end |
153 | 254 | |
154 | 255 |
## |
... | ... | |
178 | 279 | |
179 | 280 |
msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout]) |
180 | 281 |
@broadcast_list.each do |it| |
181 |
soc = UDPSocket.open |
|
182 |
begin |
|
183 |
soc.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true) |
|
184 |
soc.send(msg, 0, it, @port) |
|
185 |
rescue |
|
186 |
nil |
|
187 |
ensure |
|
188 |
soc.close |
|
189 |
end |
|
282 |
send_message(it, msg) |
|
190 | 283 |
end |
191 | 284 |
sleep(timeout) |
192 | 285 |
end |
... | ... | |
217 | 310 |
@primary |
218 | 311 |
end |
219 | 312 | |
313 |
## |
|
314 |
# Creates a socket for +address+ with the appropriate multicast options |
|
315 |
# for multicast addresses. |
|
316 | ||
317 |
def make_socket(address) # :nodoc: |
|
318 |
addrinfo = Addrinfo.udp(address, @port) |
|
319 | ||
320 |
soc = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol) |
|
321 | ||
322 |
if addrinfo.ipv4_multicast? then |
|
323 |
soc.setsockopt(:IPPROTO_IP, :IP_MULTICAST_LOOP, true) |
|
324 |
soc.setsockopt(:IPPROTO_IP, :IP_MULTICAST_TTL, |
|
325 |
[@multicast_hops].pack('c')) |
|
326 |
elsif addrinfo.ipv6_multicast? then |
|
327 |
soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_LOOP, true) |
|
328 |
soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS, |
|
329 |
[@multicast_hops].pack('I')) |
|
330 |
soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_IF, |
|
331 |
[@multicast_interface].pack('I')) |
|
332 |
else |
|
333 |
soc.setsockopt(:SOL_SOCKET, :SO_BROADCAST, true) |
|
334 |
end |
|
335 | ||
336 |
soc.connect(addrinfo) |
|
337 | ||
338 |
soc |
|
339 |
end |
|
340 | ||
341 |
def send_message(address, message) # :nodoc: |
|
342 |
soc = make_socket(address) |
|
343 | ||
344 |
soc.send(message, 0) |
|
345 |
rescue |
|
346 |
nil |
|
347 |
ensure |
|
348 |
soc.close if soc |
|
349 |
end |
|
350 | ||
220 | 351 |
end |
221 | 352 | |
222 | 353 |
## |
test/rinda/test_rinda.rb (working copy) | ||
---|---|---|
2 | 2 | |
3 | 3 |
require 'drb/drb' |
4 | 4 |
require 'drb/eq' |
5 |
require 'rinda/ring' |
|
5 | 6 |
require 'rinda/tuplespace' |
6 | 7 | |
7 | 8 |
require 'singleton' |
... | ... | |
480 | 481 |
@server = DRb.primary_server || DRb.start_service |
481 | 482 |
end |
482 | 483 | |
484 |
class TestRingServer < Test::Unit::TestCase |
|
485 | ||
486 |
def setup |
|
487 |
@port = Rinda::Ring_PORT |
|
488 | ||
489 |
@ts = Rinda::TupleSpace.new |
|
490 |
@rs = Rinda::RingServer.new(@ts, [], @port) |
|
491 |
end |
|
492 | ||
493 |
def teardown |
|
494 |
@rs.shutdown |
|
495 |
end |
|
496 | ||
497 |
def test_make_socket_unicast |
|
498 |
v4 = @rs.make_socket('127.0.0.1') |
|
499 | ||
500 |
assert_equal('127.0.0.1', v4.local_address.ip_address) |
|
501 |
assert_equal(@port, v4.local_address.ip_port) |
|
502 |
end |
|
503 | ||
504 |
def test_make_socket_ipv4_multicast |
|
505 |
v4mc = @rs.make_socket('239.0.0.1') |
|
506 | ||
507 |
if Socket.const_defined?(:SO_REUSEPORT) then |
|
508 |
assert(v4mc.getsockopt(:SOCKET, :SO_REUSEPORT).bool) |
|
509 |
else |
|
510 |
assert(v4mc.getsockopt(:SOCKET, :SO_REUSEADDR).bool) |
|
511 |
end |
|
512 | ||
513 |
assert_equal('239.0.0.1', v4mc.local_address.ip_address) |
|
514 |
assert_equal(@port, v4mc.local_address.ip_port) |
|
515 |
end |
|
516 | ||
517 |
def test_make_socket_ipv6_multicast |
|
518 |
begin |
|
519 |
v6mc = @rs.make_socket('ff02::1') |
|
520 |
rescue Errno::EADDRNOTAVAIL |
|
521 |
return # IPv6 address for multicast not available |
|
522 |
end |
|
523 | ||
524 |
if Socket.const_defined?(:SO_REUSEPORT) then |
|
525 |
assert v6mc.getsockopt(:SOCKET, :SO_REUSEPORT).bool |
|
526 |
else |
|
527 |
assert v6mc.getsockopt(:SOCKET, :SO_REUSEADDR).bool |
|
528 |
end |
|
529 | ||
530 |
assert_equal('ff02::1', v6mc.local_address.ip_address) |
|
531 |
assert_equal(@port, v6mc.local_address.ip_port) |
|
532 |
end |
|
533 | ||
534 |
def test_shutdown |
|
535 |
@rs.shutdown |
|
536 | ||
537 |
assert_nil(@rs.do_reply, 'otherwise should hang forever') |
|
538 |
end |
|
539 | ||
540 |
end |
|
541 | ||
542 |
class TestRingFinger < Test::Unit::TestCase |
|
543 | ||
544 |
def setup |
|
545 |
@rf = Rinda::RingFinger.new |
|
546 |
@rf.multicast_interface = 1 |
|
547 |
end |
|
548 | ||
549 |
def test_make_socket_unicast |
|
550 |
v4 = @rf.make_socket('127.0.0.1') |
|
551 | ||
552 |
assert(v4.getsockopt(:SOL_SOCKET, :SO_BROADCAST).bool) |
|
553 |
end |
|
554 | ||
555 |
def test_make_socket_ipv4_multicast |
|
556 |
v4mc = @rf.make_socket('239.0.0.1') |
|
557 | ||
558 |
assert_equal(1, v4mc.getsockopt(:IPPROTO_IP, :IP_MULTICAST_LOOP).int) |
|
559 |
assert_equal(1, v4mc.getsockopt(:IPPROTO_IP, :IP_MULTICAST_TTL).int) |
|
560 |
end |
|
561 | ||
562 |
def test_make_socket_ipv6_multicast |
|
563 |
v6mc = @rf.make_socket('ff02::1') |
|
564 | ||
565 |
assert_equal(1, v6mc.getsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_LOOP).int) |
|
566 |
assert_equal(1, v6mc.getsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS).int) |
|
567 |
end |
|
568 | ||
569 |
def test_make_socket_multicast_hops |
|
570 |
@rf.multicast_hops = 2 |
|
571 | ||
572 |
v4mc = @rf.make_socket('239.0.0.1') |
|
573 | ||
574 |
assert_equal(2, v4mc.getsockopt(:IPPROTO_IP, :IP_MULTICAST_TTL).int) |
|
575 | ||
576 |
v6mc = @rf.make_socket('ff02::1') |
|
577 | ||
578 |
assert_equal(2, v6mc.getsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS).int) |
|
579 |
end |
|
580 | ||
581 |
end |
|
582 | ||
483 | 583 |
end |
484 | 584 |