diff --git a/lib/dalli/pipelined_getter.rb b/lib/dalli/pipelined_getter.rb
index cc34bc5c..c2d1a895 100644
--- a/lib/dalli/pipelined_getter.rb
+++ b/lib/dalli/pipelined_getter.rb
@@ -17,9 +17,8 @@ def process(keys, &block)
       return {} if keys.empty?
 
       @ring.lock do
-        servers = setup_requests(keys)
-        start_time = Time.now
-        servers = fetch_responses(servers, start_time, @ring.socket_timeout, &block) until servers.empty?
+        requests = setup_requests(keys)
+        fetch_responses(requests, @ring.socket_timeout, &block)
       end
     rescue NetworkError => e
       Dalli.logger.debug { e.inspect }
@@ -27,58 +26,17 @@ def process(keys, &block)
       retry
     end
 
-    def setup_requests(keys)
-      groups = groups_for_keys(keys)
-      make_getkq_requests(groups)
-
-      # TODO: How does this exit on a NetworkError
-      finish_queries(groups.keys)
-    end
-
-    ##
-    # Loop through the server-grouped sets of keys, writing
-    # the corresponding getkq requests to the appropriate servers
-    #
-    # It's worth noting that we could potentially reduce bytes
-    # on the wire by switching from getkq to getq, and using
-    # the opaque value to match requests to responses.
-    ##
-    def make_getkq_requests(groups)
-      groups.each do |server, keys_for_server|
-        server.request(:pipelined_get, keys_for_server)
-      rescue DalliError, NetworkError => e
-        Dalli.logger.debug { e.inspect }
-        Dalli.logger.debug { "unable to get keys for server #{server.name}" }
-      end
-    end
-
-    ##
-    # This loops through the servers that have keys in
-    # our set, sending the noop to terminate the set of queries.
-    ##
-    def finish_queries(servers)
-      deleted = []
-
-      servers.each do |server|
-        next unless server.alive?
-
-        begin
-          finish_query_for_server(server)
-        rescue Dalli::NetworkError
-          raise
-        rescue Dalli::DalliError
-          deleted.append(server)
-        end
+    def setup_requests(all_keys)
+      groups_for_keys(all_keys).to_h do |server, keys|
+        # It's worth noting that we could potentially reduce bytes
+        # on the wire by switching from getkq to getq, and using
+        # the opaque value to match requests to responses.
+        [server, server.pipelined_get_request(keys)]
       end
-
-      servers.delete_if { |server| deleted.include?(server) }
-    rescue Dalli::NetworkError
-      abort_without_timeout(servers)
-      raise
     end
 
     def finish_query_for_server(server)
-      server.pipeline_response_setup
+      server.finish_pipeline_request
     rescue Dalli::NetworkError
       raise
     rescue Dalli::DalliError => e
@@ -92,24 +50,24 @@ def abort_without_timeout(servers)
       servers.each(&:pipeline_abort)
     end
 
-    def fetch_responses(servers, start_time, timeout, &block)
+    def fetch_responses(requests, timeout, &block)
+      # FIXME: this was here. why. where should it go?
       # Remove any servers which are not connected
-      servers.delete_if { |s| !s.connected? }
-      return [] if servers.empty?
+      # servers.delete_if { |s| !s.connected? }
 
-      time_left = remaining_time(start_time, timeout)
-      readable_servers = servers_with_response(servers, time_left)
-      if readable_servers.empty?
-        abort_with_timeout(servers)
-        return []
-      end
+      start_time = Time.now
+      servers = requests.keys
 
-      # Loop through the servers with responses, and
-      # delete any from our list that are finished
-      readable_servers.each do |server|
-        servers.delete(server) if process_server(server, &block)
+      # FIXME: this was executed before the finish request was sent. Why?
+      servers.delete_if { |s| !s.alive? }
+
+      # could be postponed to after the first write
+      servers.each(&:pipeline_response_setup)
+
+      until servers.empty?
+        time_left = remaining_time(start_time, timeout)
+        servers = read_write_select(servers, requests, time_left, &block)
       end
-      servers
     rescue NetworkError
       # Abort and raise if we encountered a network error. This triggers
       # a retry at the top level.
@@ -117,6 +75,70 @@ def fetch_responses(servers, start_time, timeout, &block)
       raise
     end
 
+    def read_write_select(servers, requests, time_left, &block)
+      # TODO: - This is a bit challenging. Essentially the PipelinedGetter
+      # is a reactor, but without the benefit of a Fiber or separate thread.
+      # My suspicion is that we may want to try and push this down into the
+      # individual servers, but I'm not sure. For now, we keep the
+      # mapping between the alerted object (the socket) and the
+      # corresponding server here.
+      server_map = servers.each_with_object({}) { |s, h| h[s.sock] = s }
+
+      readable, writable, = IO.select(server_map.keys, server_map.keys,
+                                      nil, time_left)
+
+      if readable.nil?
+        abort_with_timeout(servers)
+        return []
+      end
+
+      writable.each do |socket|
+        server = server_map[socket]
+        process_writable(socket, server, requests)
+      end
+
+      readable.each do |socket|
+        server = server_map[socket]
+
+        servers.delete(server) if process_server(server, &block)
+      end
+
+      servers
+    end
+
+    def process_writable(socket, server, requests)
+      request = requests[server]
+      return unless request
+
+      # FIXME: cache in server class? use a different value?
+      buffer_size = socket.getsockopt(::Socket::SOL_SOCKET, ::Socket::SO_SNDBUF).int
+
+      chunk = request[0..buffer_size]
+      written = server.request(:pipelined_get, chunk)
+      return if written == :wait_writable
+
+      new_request = requests[server][written..]
+
+      # FIXME: check the error handling here. Looks wrong
+      if new_request.empty?
+        requests.delete(server)
+
+        begin
+          finish_query_for_server(server)
+        rescue Dalli::NetworkError
+          raise
+        rescue Dalli::DalliError
+          servers.delete(server)
+        end
+      else
+        requests[server] = new_request
+      end
+    rescue DalliError, NetworkError => e
+      Dalli.logger.debug { e.inspect }
+      Dalli.logger.debug { "unable to get keys for server #{server.name}" }
+      # FIXME: especially here. nothing raised, server not removed from servers
+    end
+
     def remaining_time(start, timeout)
       elapsed = Time.now - start
       return 0 if elapsed > timeout
@@ -144,23 +166,6 @@ def process_server(server)
       server.pipeline_complete?
     end
 
-    def servers_with_response(servers, timeout)
-      return [] if servers.empty?
-
-      # TODO: - This is a bit challenging. Essentially the PipelinedGetter
-      # is a reactor, but without the benefit of a Fiber or separate thread.
-      # My suspicion is that we may want to try and push this down into the
-      # individual servers, but I'm not sure. For now, we keep the
-      # mapping between the alerted object (the socket) and the
-      # corrresponding server here.
-      server_map = servers.each_with_object({}) { |s, h| h[s.sock] = s }
-
-      readable, = IO.select(server_map.keys, nil, nil, timeout)
-      return [] if readable.nil?
-
-      readable.map { |sock| server_map[sock] }
-    end
-
     def groups_for_keys(*keys)
       keys.flatten!
       keys.map! { |a| @key_manager.validate_key(a.to_s) }
diff --git a/lib/dalli/protocol/base.rb b/lib/dalli/protocol/base.rb
index e5a2c39c..ad37eda7 100644
--- a/lib/dalli/protocol/base.rb
+++ b/lib/dalli/protocol/base.rb
@@ -18,7 +18,7 @@ class Base
 
       def_delegators :@value_marshaller, :serializer, :compressor, :compression_min_size, :compress_by_default?
       def_delegators :@connection_manager, :name, :sock, :hostname, :port, :close, :connected?, :socket_timeout,
-                     :socket_type, :up!, :down!, :write, :reconnect_down_server?, :raise_down_error
+                     :socket_type, :up!, :down!, :write, :write_nonblock, :reconnect_down_server?, :raise_down_error
 
       def initialize(attribs, client_options = {})
         hostname, port, socket_type, @weight, user_creds = ServerConfigParser.parse(attribs)
@@ -59,16 +59,17 @@ def lock!; end
 
       def unlock!; end
 
-      # Start reading key/value pairs from this connection. This is usually called
-      # after a series of GETKQ commands. A NOOP is sent, and the server begins
-      # flushing responses for kv pairs that were found.
+      # Get ready to read key/value pairs from this connection.
+      # This is usually called before or after the first GETKQ command.
       #
       # Returns nothing.
       def pipeline_response_setup
         verify_state(:getkq)
-        write_noop
         response_buffer.reset
-        @connection_manager.start_request!
+      end
+
+      def finish_pipeline_request
+        write_noop
       end
 
       # Attempt to receive and parse as many key/value pairs as possible
@@ -143,6 +144,14 @@ def quiet?
       end
       alias multi? quiet?
 
+      def pipelined_get_request(keys)
+        req = +''
+        keys.each do |key|
+          req << quiet_get_request(key)
+        end
+        req
+      end
+
       # NOTE: Additional public methods should be overridden in Dalli::Threadsafe
 
       private
@@ -201,13 +210,8 @@ def connect
         raise
       end
 
-      def pipelined_get(keys)
-        req = +''
-        keys.each do |key|
-          req << quiet_get_request(key)
-        end
-        # Could send noop here instead of in pipeline_response_setup
-        write(req)
+      def pipelined_get(bytes)
+        write_nonblock(bytes)
       end
 
       def response_buffer
diff --git a/lib/dalli/protocol/connection_manager.rb b/lib/dalli/protocol/connection_manager.rb
index cc9492ad..673d2cf0 100644
--- a/lib/dalli/protocol/connection_manager.rb
+++ b/lib/dalli/protocol/connection_manager.rb
@@ -169,6 +169,10 @@ def read_nonblock
         @sock.read_available
       end
 
+      def write_nonblock(bytes)
+        @sock.write_nonblock(bytes, exception: false)
+      end
+
       def max_allowed_failures
         @max_allowed_failures ||= @options[:socket_max_failures] || 2
       end
diff --git a/test/integration/test_failover.rb b/test/integration/test_failover.rb
index 85de7e7d..0230622f 100644
--- a/test/integration/test_failover.rb
+++ b/test/integration/test_failover.rb
@@ -3,6 +3,15 @@
 require_relative '../helper'
 
 describe 'failover' do
+  before do
+    @level = Dalli.logger.level
+    Dalli.logger.level = Logger::DEBUG
+  end
+
+  after do
+    Dalli.logger.level = @level
+  end
+
   MemcachedManager.supported_protocols.each do |p|
     describe "using the #{p} protocol" do
       # Timeouts on JRuby work differently and aren't firing, meaning we're