Index: lib/net/http.rb =================================================================== --- lib/net/http.rb (revision 33476) +++ lib/net/http.rb (working copy) @@ -283,6 +283,38 @@ module Net #:nodoc: # See Net::HTTP::Proxy for further details and examples such as proxies that # require a username and password. # + # === Pipelining + # + # On HTTP/1.1 servers requests can be pipelined which can reduce response + # time for a series of requests. When pipelining Net::HTTP sends requests + # in the list without waiting for a response then retrieves the responses + # from the server after all requests are issued. The server will return the + # responses in the same order they were issued. + # + # Example: + # + # requests = [] + # requests << Net::HTTP::Get.new('/images/bug.png') + # requests << Net::HTTP::Get.new('/images/date.png') + # requests << Net::HTTP::Get.new('/images/find.png') + # + # http = Net::HTTP.start 'localhost' do + # http.pipeline requests do |req, res| + # # ... + # end + # end + # + # Before attempting to pipeline a sequence of requests, Net::HTTP will + # consume one request to check if the server is pipelining-capable. To avoid + # the check set #pipelining to true: + # + # http = Net::HTTP.start 'localhost' do + # http.pipelining = true + # responses = http.pipeline requests + # # ... + # + # For more details see Net::HTTP#pipeline + # # == HTTP Request Classes # # Here is the HTTP request class hierarchy. @@ -372,6 +404,80 @@ module Net #:nodoc: end # :startdoc: + class Error < StandardError; end + + class PipelineError < Error + + ## + # Remaining requests that have not been sent to the HTTP server + + attr_reader :requests + + ## + # Retrieved responses up to the error point + + attr_reader :responses + + ## + # Creates a new Error with +message+, a list of +requests+ that have not + # been sent to the server and a list of +responses+ that have been + # retrieved from the server. + + def initialize message, requests, responses + super message + + @requests = requests + @responses = responses + end + + end + + # Raised when the server appears to not support persistent connections + # which are required for pipelining requests. + + class PipelinePersistenceError < PipelineError + # Creates a new PipelinePersistenceError with a list of +requests+ that + # have not been sent to the server and a list of +responses+ that have + # been retrieved from the server. + + def initialize requests, responses + super 'persistent connections required', requests, responses + end + end + + # Raised when the server appears to not support pipelining requests + + class PipelineUnsupportedError < PipelineError + # Creates a new PipelineUnsupportedError with a list of +requests+ that + # have not been sent to the server and a list of +responses+ that have + # been retrieved from the server. + + def initialize reason = nil, requests, responses + message = 'pipeline connections are not supported' + message << " (#{reason})" if reason + super message, requests, responses + end + end + + # Raised if an error occurs while reading responses. + + class PipelineResponseError < PipelineError + + # The original exception + + attr_accessor :original + + # Creates a new PipelineResponseError with an original +exception+, a + # list of +requests+ that were in-flight and a list of +responses+ that + # have been retrieved from the server. + + def initialize exception, requests, responses + @original = exception + message = "error reading responses: #{original} (#{original.class})" + super message, requests, responses + end + end + # Turns on net/http 1.2 (ruby 1.8) features. # Defaults to ON in ruby 1.8 or later. def HTTP.version_1_2 @@ -576,6 +682,7 @@ module Net #:nodoc: @address = address @port = (port || HTTP.default_port) @curr_http_version = HTTPVersion + @pipelining = nil # nil means unknown @no_keepalive_server = false @close_on_empty_response = false @socket = nil @@ -589,6 +696,8 @@ module Net #:nodoc: @enable_post_connection_check = true @compression = nil @sspi_enabled = false + @socket_class = BufferedIO + if defined?(SSL_ATTRIBUTES) SSL_ATTRIBUTES.each do |name| instance_variable_set "@#{name}", nil @@ -657,6 +766,11 @@ module Net #:nodoc: attr_accessor :close_on_empty_response + # Set to true if this server supports pipelining, false if it does not. + # If unset, #pipeline will figure it out + + attr_accessor :pipelining + # Returns true if SSL/TLS is being used with HTTP. def use_ssl? @use_ssl @@ -776,7 +890,7 @@ module Net #:nodoc: s = OpenSSL::SSL::SSLSocket.new(s, @ssl_context) s.sync_close = true end - @socket = BufferedIO.new(s) + @socket = @socket_class.new(s) @socket.read_timeout = @read_timeout @socket.continue_timeout = @continue_timeout @socket.debug_output = @debug_output @@ -828,6 +942,24 @@ module Net #:nodoc: end private :do_finish + # Closes the connection and rescues any IOErrors this may cause + + def reset_pipeline requests, responses + begin + finish + rescue IOError + end + + start + rescue Errno::ECONNREFUSED + raise PipelineError.new("connection refused: #{address}:#{port}", + requests, responses) + rescue Errno::EHOSTDOWN + raise PipelineError.new("host down: #{address}:#{port}", + requests, responses) + end + private :reset_pipeline + # # proxy # @@ -1298,6 +1430,73 @@ module Net #:nodoc: res end + # Pipelines +requests+ to the HTTP server yielding responses if a block is + # given. Returns all responses received. + # + # Only idempotent sequences of requests will be pipelined. If a + # non-idempotent request (like a POST) is included in a request sequence + # #pipeline will wait for a response before proceeding with further + # sequences of requests. + # + # The Net::HTTP connection must be started (#start must be called) before + # calling #pipeline. + # + # Raises a subclass of the PipelineError exception if the connection is + # not pipeline-capable, if the HTTP session has not been started, or their + # was a problem sending or receiving the +requests+. The remaining + # outstanding requests and returned responses can be retrieved from the + # PipelineError exception. + # + # Example: + # + # requests = [] + # requests << Net::HTTP::Get.new('/images/bug.png') + # requests << Net::HTTP::Get.new('/images/date.png') + # requests << Net::HTTP::Get.new('/images/find.png') + # + # http = Net::HTTP.new 'localhost' + # http.start do + # http.pipeline requests do |req, res| + # open File.basename(req.path), 'wb' do |io| + # io.write res.body + # end + # end + # end + + def pipeline requests, &block # :yields: response + requests = requests.dup + responses = [] + + raise PipelineError.new('Net::HTTP not started', requests, responses) unless + started? + + pipeline_check requests, responses, &block + + retried = responses.length + + until requests.empty? do + begin + in_flight = pipeline_send requests + + pipeline_receive in_flight, responses, &block + rescue PipelineResponseError => e + e.requests.reverse_each do |request| + requests.unshift request + end + + raise if responses.length == retried or not requests.first.idempotent? + + retried = responses.length + + reset_pipeline requests, responses + + retry + end + end + + responses + end + private # Executes a request which uses a representation @@ -1391,6 +1590,132 @@ module Net #:nodoc: raise HTTPAuthenticationError.new('HTTP authentication failed', err) end + # Ensures this connection supports pipelining. + # + # If the server has not been tested for pipelining support one of the + # +requests+ will be consumed and placed in +responses+. + # + # A PersistenceError will be raised if the server does not support + # persistent connections. (The server is HTTP/1.1, but closed the + # connection.) + # + # A PipelineUnsupportedError will be raised if the server does not support + # pipelining. (The server is not HTTP/1.1.) + + def pipeline_check requests, responses + unless @pipelining.nil? then # tri-state + return if @pipelining + raise PipelineUnsupportedError.new(requests, responses) unless + @pipelining + else + @pipelining = false + end + + if '1.1' > @curr_http_version then + @pipelining = false + reason = "server is HTTP/#{@curr_http_version}" + raise PipelineUnsupportedError.new(reason, requests, responses) + end + + req = requests.shift + tried_once = false + + begin + res = request(req) + rescue Timeout::Error, EOFError, Errno::ECONNABORTED, Errno::ECONNRESET, + Errno::EPIPE, Net::HTTPBadResponse => e + if tried_once then + requests.unshift req + raise PipelineResponseError.new(e, requests, responses) + end + + tried_once = true + + reset_pipeline requests, responses + + retry + end + + responses << res + + yield req, res if block_given? + + @pipelining = keep_alive? req, res + + if '1.1' > @curr_http_version then + @pipelining = false + reason = "server is HTTP/#{@curr_http_version}" + raise PipelineUnsupportedError.new(reason, requests, responses) + elsif not @pipelining then + raise PipelinePersistenceError.new(requests, responses) + end + + @close_on_empty_response = false + end + + # Receives HTTP responses for the +in_flight+ requests and adds them to + # +responses+ + + def pipeline_receive in_flight, responses + while req = in_flight.shift do + begin + begin + res = Net::HTTPResponse.read_new @socket + end while res.kind_of? Net::HTTPContinue + + res.reading_body @socket, req.response_body_permitted? do + responses << res + yield req, res if block_given? + end + + end_transport req, res + rescue StandardError, Timeout::Error + in_flight.unshift req + raise + end + end + + responses + rescue Timeout::Error, EOFError, Errno::ECONNABORTED, Errno::ECONNRESET, + Errno::EPIPE, Net::HTTPBadResponse => e + finish + + raise PipelineResponseError.new(e, in_flight, responses) + end + + # Sends +requests+ to the HTTP server and removes them from the +requests+ + # list. Returns the requests that have been pipelined and are in-flight. + # + # If a non-idempotent request is first in +requests+ it will be sent and no + # further requests will be pipelined. + # + # If a non-idempotent request is encountered after an idempotent request it + # will not be sent. + # + # After the response for a non-idempotent request is received another + # series of pipelined requests will be issued via #pipeline. + + def pipeline_send requests + in_flight = [] + + while req = requests.shift do + idempotent = req.idempotent? + + unless idempotent or in_flight.empty? then + requests.unshift req + break + end + + begin_transport req + req.exec @socket, @curr_http_version, edit_path(req.path) + in_flight << req + + break unless idempotent + end + + in_flight + end + # # utils # @@ -1874,6 +2199,17 @@ module Net #:nodoc: "\#<#{self.class} #{@method}>" end + ## + # Is this request idempotent according to RFC 2616? + + def idempotent? + case self + when Net::HTTP::Delete, Net::HTTP::Get, Net::HTTP::Head, + Net::HTTP::Options, Net::HTTP::Put, Net::HTTP::Trace then + true + end + end + def request_body_permitted? @request_has_body end Index: test/net/http/test_http.rb =================================================================== --- test/net/http/test_http.rb (revision 33476) +++ test/net/http/test_http.rb (working copy) @@ -211,7 +211,6 @@ module TestNetHTTP_version_1_1_methods end end - module TestNetHTTP_version_1_2_methods def test_request @@ -553,3 +552,19 @@ class TestNetHTTPContinue < Test::Unit:: assert_not_match(/HTTP\/1.1 100 continue/, @debug.string) end end + +class TestNetHttp < Test::Unit::TestCase + + def test_idempotent_eh + assert Net::HTTP::Delete.new('/').idempotent? + assert Net::HTTP::Get.new('/').idempotent? + assert Net::HTTP::Head.new('/').idempotent? + assert Net::HTTP::Options.new('/').idempotent? + assert Net::HTTP::Put.new('/').idempotent? + assert Net::HTTP::Trace.new('/').idempotent? + + refute Net::HTTP::Post.new('/').idempotent? + end + +end + Index: test/net/http/utils.rb =================================================================== --- test/net/http/utils.rb (revision 33476) +++ test/net/http/utils.rb (working copy) @@ -6,7 +6,124 @@ rescue LoadError end require 'webrick/httpservlet/abstract' +class Net::HTTP + attr_accessor :socket_class +end + module TestNetHTTPUtils + ## + # Records the data Net::BufferedIO sends and receives in write_io and read_io + + class TestIO < Net::BufferedIO + + attr_reader :read_io + attr_reader :write_io + + def self.ios + @ios + end + + def self.new(io) + io = super + @ios << io + io + end + + def self.reset + @ios = [] + end + + reset + + def initialize(io) + super + + @read_io = StringIO.new + @write_io = StringIO.new + end + + def rbuf_consume(len) + s = super len + @read_io << s + s + end + + def write0(str) + @write_io << str + + super + end + + end + + ## + # Raises Errno::ECONNRESET for every request + + class ErrorAlways < TestIO + + def readline + raise Errno::ECONNRESET + end + + end + + ## + # Raises Errno::ECONNRESET for every request after the first + + class ErrorAfterOne < TestIO + + def self.reset + @@count = 0 + super + end + + def readline + @@count += 1 + raise Errno::ECONNRESET if @@count >= 2 + + super + end + + end + + ## + # Raises Errno::ECONNRESET upon the second request + + class ErrorAfterOneOnce < TestIO + + def self.reset + @@count = 0 + super + end + + def readline + @@count += 1 + raise Errno::ECONNRESET if @@count == 2 + + super + end + + end + + ## + # Raises Errno::ECONNRESET upon every even request 2, 4, 6 + + class ErrorEven < TestIO + + def self.reset + @@count = 0 + super + end + + def readline + @@count += 1 + raise Errno::ECONNRESET if @@count % 2 == 0 + + super + end + + end + def start(&block) new().start(&block) end @@ -15,6 +132,7 @@ module TestNetHTTPUtils klass = Net::HTTP::Proxy(config('proxy_host'), config('proxy_port')) http = klass.new(config('host'), config('port')) http.set_debug_output logfile() + http.socket_class = @net_http_io http end @@ -27,7 +145,17 @@ module TestNetHTTPUtils end def setup + TestIO.reset + ErrorAlways.reset + ErrorAfterOne.reset + ErrorAfterOneOnce.reset + ErrorEven.reset + + BadOnce.reset + Counter.reset spawn_server + + @net_http_io = TestIO end def teardown @@ -51,6 +179,10 @@ module TestNetHTTPUtils :ServerType => Thread, } server_config[:OutputBufferSize] = 4 if config('chunked') + + server_config[:HTTPVersion] = config('http_version') if + config('http_version') + if defined?(OpenSSL) and config('ssl_enable') server_config.update({ :SSLEnable => true, @@ -59,7 +191,11 @@ module TestNetHTTPUtils }) end @server = WEBrick::HTTPServer.new(server_config) - @server.mount('/', Servlet, config('chunked')) + @server.mount('/', Servlet, config('chunked')) + @server.mount('/bad_once', BadOnce) + @server.mount('/close', Closer) + @server.mount('/count', Counter) + @server.mount('/reset_after', ResetAfter) @server.start n_try_max = 5 begin @@ -103,6 +239,89 @@ module TestNetHTTPUtils end end + class BadOnce < WEBrick::HTTPServlet::AbstractServlet + + @instance = nil + + def self.get_instance(server, *options) + @instance ||= super + end + + def self.reset + @instance = nil + end + + def initialize(server) + @count = 1 + end + + def do_GET(req, res) + def res.status_line + "bogus" + end if @count == 1 + + res.body = "Was bad. Now #{@count}" + + @count += 1 + end + + alias do_POST do_GET + + end + + class Counter < WEBrick::HTTPServlet::AbstractServlet + + @instance = nil + + def self.get_instance(server, *options) + @instance ||= super + end + + def self.reset + @instance = nil + end + + def initialize(server) + @count = 1 + end + + def do_GET(req, res) + res['Content-Type'] = 'text/plain' + res.body = "Worked #{@count}!" + @count += 1 + end + + alias do_POST do_GET + + end + + class Closer < WEBrick::HTTPServlet::AbstractServlet + + def do_GET(req, res) + res['Content-Type'] = 'text/plain' + res.body = "closing this connection" + res.close + end + + alias do_POST do_GET + + end + + class ResetAfter < WEBrick::HTTPServlet::AbstractServlet + + def do_GET(req, res) + def res._write_data(socket, data) + socket.close_read + socket << data + end + + res.body = "Reading has been shut down" + end + + alias do_POST do_GET + + end + class NullWriter def <<(s) end def puts(*args) end Index: test/net/http/test_pipeline.rb =================================================================== --- test/net/http/test_pipeline.rb (revision 0) +++ test/net/http/test_pipeline.rb (revision 0) @@ -0,0 +1,342 @@ +require 'test/unit' +require 'net/http' +require 'stringio' +require 'uri' +require_relative 'utils' + +class TestNetHttpPipeline < Test::Unit::TestCase + + CONFIG = { + 'host' => '127.0.0.1', + 'port' => 10083, + 'proxy_host' => nil, + 'proxy_port' => nil, + 'chunked' => true, + } + + include TestNetHTTPUtils + + def setup + super + + @get1 = Net::HTTP::Get.new '/count' + @get2 = Net::HTTP::Get.new '/count' + @get3 = Net::HTTP::Get.new '/count' + @post = Net::HTTP::Post.new '/count' + end + + def test_pipeline + requests = [@get1, @get2] + + responses = start do |http| + http.pipeline requests + end + + assert_equal 'Worked 1!', responses.first.body + assert_equal 'Worked 2!', responses.last.body + + refute_empty requests + end + + def test_pipeline_block + requests = [@get1, @get2] + responses = [] + + start do |http| + http.pipeline requests do |req, res| responses << [req, res] end + end + + refute_empty requests + + assert_equal requests, responses.map { |req,| req } + + responses = responses.map { |_, res| res } + assert_equal 'Worked 1!', responses.first.body + assert_equal 'Worked 2!', responses.last.body + end + + def test_pipeline_non_idempotent + responses = start do |http| + http.pipelining = true + + http.pipeline [@get1, @get2, @post, @get3] + end + + assert_equal 'Worked 1!', responses.shift.body + assert_equal 'Worked 2!', responses.shift.body + assert_equal 'Worked 3!', responses.shift.body + assert_equal 'Worked 4!', responses.shift.body + + assert responses.empty? + end + + def test_pipeline_not_started + @started = false + + e = assert_raises Net::HTTP::PipelineError do + http = new + http.pipeline [] + end + + assert_equal 'Net::HTTP not started', e.message + end + + def test_pipeline_retry + requests = [@get1, @get2, @get3] + + @net_http_io = ErrorAfterOneOnce + + responses = start do |http| + http.pipelining = true + + http.pipeline requests + end + + assert_equal 'Worked 1!', responses.shift.body + assert_equal 'Worked 3!', responses.shift.body # response 2 was lost + assert_equal 'Worked 4!', responses.shift.body + assert_empty responses + + refute_empty requests + end + + def test_pipeline_retry_fail_post + @net_http_io = ErrorAlways + + requests = [@post] + + e = assert_raises Net::HTTP::PipelineResponseError do + start do |http| + http.pipelining = true + http.pipeline requests + end + end + + assert_empty e.responses + + assert_equal [@post], e.requests + end + + def test_pipeline_retry_fail_different + @net_http_io = ErrorEven + + requests = [@get1, @get2, @get3] + + responses = start do |http| + http.pipelining = true + http.pipeline requests + end + + assert_equal 'Worked 1!', responses.shift.body + assert_equal 'Worked 3!', responses.shift.body + assert_equal 'Worked 5!', responses.shift.body + assert_empty responses + + refute_empty requests + end + + def test_pipeline_retry_fail_same + @net_http_io = ErrorAfterOne + + requests = [@get1, @get2, @get3] + + e = assert_raises Net::HTTP::PipelineResponseError do + start do |http| + http.pipelining = true + http.pipeline requests + end + end + + responses = e.responses + assert_equal 'Worked 1!', responses.shift.body + assert_empty responses + + assert_equal [@get2, @get3], e.requests + end + + # end #pipeline tests + + def test_pipeline_check + requests = [@get1, @get2] + responses = [] + + start do |http| + http.send :pipeline_check, requests, responses + + assert http.pipelining + end + + assert_equal [@get2], requests + assert_equal 1, responses.length + assert_equal 'Worked 1!', responses.first.body + end + + def test_pipeline_check_again + start do |http| + http.pipelining = false + + e = assert_raises Net::HTTP::PipelineUnsupportedError do + http.send :pipeline_check, [@get1, @get2], [] + end + + assert_equal [@get1, @get2], e.requests + assert_empty e.responses + refute http.pipelining + end + + end + + def test_pipeline_check_bad_response + bad_once = Net::HTTP::Get.new '/bad_once' + + start do |http| + requests = [bad_once, @get2] + responses = [] + + http.send :pipeline_check, requests, responses + + assert_equal [@get2], requests + assert_equal 1, responses.length + assert_equal 'Was bad. Now 2', responses.first.body + assert http.pipelining + end + end + + def test_pipeline_check_non_persistent + start do |http| + get1 = Net::HTTP::Get.new '/close' + + e = assert_raises Net::HTTP::PipelinePersistenceError do + http.send :pipeline_check, [get1, @get2], [] + end + + refute http.pipelining + assert_equal [@get2], e.requests + assert_equal 1, e.responses.length + end + end + + def test_pipeline_check_pipelining + start do |http| + http.pipelining = true + + requests = [@get1, @get2] + responses = [] + + http.send :pipeline_check, requests, responses + + assert_equal [@get1, @get2], requests + assert_empty responses + assert http.pipelining + end + end + + def test_pipeline_receive + responses = [] + + r = start do |http| + in_flight = http.send :pipeline_send, [@get1, @get2] + + assert_equal 2, in_flight.length + + http.send :pipeline_receive, in_flight, responses + end + + assert_equal 'Worked 1!', responses.first.body + assert_equal 'Worked 2!', responses.last.body + + assert_same r, responses + end + + def test_pipeline_receive_bad_response + bad_once = Net::HTTP::Get.new '/reset_after' + responses = [] + + start do |http| + in_flight = http.send :pipeline_send, [bad_once, @get2] + + e = assert_raises Net::HTTP::PipelineResponseError do + http.send :pipeline_receive, in_flight, responses + end + + assert_equal [@get2], e.requests + assert_equal 1, e.responses.length + assert_equal 'Reading has been shut down', e.responses.first.body + + assert_kind_of EOFError, e.original + end + end + + def test_pipeline_send + requests = [@get1, @get2, @post, @get3] + + in_flight = start do |http| + http.send :pipeline_send, requests + end + + assert_equal [@get1, @get2], in_flight + assert_equal [@post, @get3], requests + end + + def test_pipeline_send_non_idempotent + requests = [@post, @get3] + + in_flight = start do |http| + http.send :pipeline_send, requests + end + + assert_equal [@post], in_flight + assert_equal [@get3], requests + end + +end + +class TestNetHttpPipeline_1_0 < Test::Unit::TestCase + + CONFIG = { + 'host' => '127.0.0.1', + 'port' => 10084, + 'proxy_host' => nil, + 'proxy_port' => nil, + 'chunked' => false, + 'http_version' => '1.0' + } + + include TestNetHTTPUtils + + def setup + super + + @get1 = Net::HTTP::Get.new '/count' + @get2 = Net::HTTP::Get.new '/count' + end + + def test_pipeline_1_0 + start do |http| + http.instance_variable_set :@curr_http_version, '1.0' + + e = assert_raises Net::HTTP::PipelineUnsupportedError do + http.pipeline [@get1, @get2] + end + + assert_equal [@get1, @get2], e.requests + assert_empty e.responses + end + end + + def test_pipeline_check_1_0 + start do |http| + e = assert_raises Net::HTTP::PipelineUnsupportedError do + http.send :pipeline_check, [@get1, @get2], [] + end + + assert_match %r%server is HTTP/1\.0%, e.message + assert_equal [@get2], e.requests + assert_equal 1, e.responses.length + assert_equal 'Worked 1!', e.responses.first.body + refute http.pipelining + end + end + +end +