From 6493f3daeb9462c9bac5fa3dbb7858a86c802958 Mon Sep 17 00:00:00 2001 From: Martin Sander Date: Fri, 9 Dec 2022 23:50:42 +0100 Subject: [PATCH] interleave read and write on pipelined_get fixes #776. fixes #941. When reading a large number of keys, memcached starts sending the response when dalli is not yet finished sending the request. As we did not start reading the response until we were finished writing the request, this could lead to the following problem: * the receive buffer (rcvbuf) would fill up * due to TCP backpressure, memcached would stop sending (okay as we are not reading anyway), but also stop reading * the send buffer (sndbuf) would also fill up * as we were using a blocking write without timeout, we would block forever (at least with ruby < 3.2, which introduces IO::Timeout, see #967) This is addressed by using IO::select on the sockets for both read and write, and thus start reading as soon as data is available. --- lib/dalli/pipelined_getter.rb | 164 ++++++++++++----------- lib/dalli/protocol/base.rb | 36 +++-- lib/dalli/protocol/binary.rb | 4 + lib/dalli/protocol/connection_manager.rb | 10 ++ lib/dalli/protocol/meta.rb | 4 + test/integration/test_pipelined_get.rb | 21 +++ 6 files changed, 148 insertions(+), 91 deletions(-) diff --git a/lib/dalli/pipelined_getter.rb b/lib/dalli/pipelined_getter.rb index 5fbb8bb2..e95aa988 100644 --- a/lib/dalli/pipelined_getter.rb +++ b/lib/dalli/pipelined_getter.rb @@ -17,9 +17,8 @@ def process(keys, &block) return {} if keys.empty? @ring.lock do - servers = setup_requests(keys) - start_time = Time.now - servers = fetch_responses(servers, start_time, @ring.socket_timeout, &block) until servers.empty? + requests = setup_requests(keys) + fetch_responses(requests, @ring.socket_timeout, &block) end rescue NetworkError => e Dalli.logger.debug { e.inspect } @@ -27,58 +26,17 @@ def process(keys, &block) retry end - def setup_requests(keys) - groups = groups_for_keys(keys) - make_getkq_requests(groups) - - # TODO: How does this exit on a NetworkError - finish_queries(groups.keys) - end - - ## - # Loop through the server-grouped sets of keys, writing - # the corresponding getkq requests to the appropriate servers - # - # It's worth noting that we could potentially reduce bytes - # on the wire by switching from getkq to getq, and using - # the opaque value to match requests to responses. - ## - def make_getkq_requests(groups) - groups.each do |server, keys_for_server| - server.request(:pipelined_get, keys_for_server) - rescue DalliError, NetworkError => e - Dalli.logger.debug { e.inspect } - Dalli.logger.debug { "unable to get keys for server #{server.name}" } + def setup_requests(all_keys) + groups_for_keys(all_keys).to_h do |server, keys| + # It's worth noting that we could potentially reduce bytes + # on the wire by switching from getkq to getq, and using + # the opaque value to match requests to responses. + [server, server.pipelined_get_request(keys)] end end - ## - # This loops through the servers that have keys in - # our set, sending the noop to terminate the set of queries. - ## - def finish_queries(servers) - deleted = [] - - servers.each do |server| - next unless server.connected? - - begin - finish_query_for_server(server) - rescue Dalli::NetworkError - raise - rescue Dalli::DalliError - deleted.append(server) - end - end - - servers.delete_if { |server| deleted.include?(server) } - rescue Dalli::NetworkError - abort_without_timeout(servers) - raise - end - def finish_query_for_server(server) - server.pipeline_response_setup + server.finish_pipeline_request rescue Dalli::NetworkError raise rescue Dalli::DalliError => e @@ -92,29 +50,98 @@ def abort_without_timeout(servers) servers.each(&:pipeline_abort) end - def fetch_responses(servers, start_time, timeout, &block) + def fetch_responses(requests, timeout, &block) + start_time = Time.now + servers = requests.keys + # Remove any servers which are not connected servers.delete_if { |s| !s.connected? } - return [] if servers.empty? - time_left = remaining_time(start_time, timeout) - readable_servers = servers_with_response(servers, time_left) - if readable_servers.empty? + servers.each(&:pipeline_response_setup) + + until servers.empty? + time_left = remaining_time(start_time, timeout) + servers = read_write_select(servers, requests, time_left, &block) + end + rescue NetworkError + # Abort and raise if we encountered a network error. This triggers + # a retry at the top level. + abort_without_timeout(servers) + raise + end + + def read_write_select(servers, requests, time_left, &block) + # TODO: - This is a bit challenging. Essentially the PipelinedGetter + # is a reactor, but without the benefit of a Fiber or separate thread. + # My suspicion is that we may want to try and push this down into the + # individual servers, but I'm not sure. For now, we keep the + # mapping between the alerted object (the socket) and the + # corrresponding server here. + server_map = servers.each_with_object({}) { |s, h| h[s.sock] = s } + + readable, writable, = IO.select(server_map.keys, server_map.keys, + nil, time_left) + + if readable.nil? abort_with_timeout(servers) return [] end + writable.each do |socket| + server = server_map[socket] + process_writable(server, servers, requests) + end + # Loop through the servers with responses, and # delete any from our list that are finished - readable_servers.each do |server| + readable.each do |socket| + server = server_map[socket] + servers.delete(server) if process_server(server, &block) end + servers - rescue NetworkError - # Abort and raise if we encountered a network error. This triggers - # a retry at the top level. + end + + def process_writable(server, servers, requests) + request = requests[server] + return unless request + + new_request = server_pipelined_get(server, request) + + if new_request.empty? + requests.delete(server) + + begin + finish_query_for_server(server) + rescue Dalli::NetworkError + raise + rescue Dalli::DalliError + servers.delete(server) + end + else + requests[server] = new_request + end + rescue Dalli::NetworkError abort_without_timeout(servers) raise + rescue DalliError => e + Dalli.logger.debug { e.inspect } + Dalli.logger.debug { "unable to get keys for server #{server.name}" } + end + + def server_pipelined_get(server, request) + buffer_size = server.socket_sndbuf + chunk = request[0..buffer_size] + written = server.request(:pipelined_get, chunk) + return request if written == :wait_writable + + request[written..] + rescue Dalli::NetworkError + raise + rescue DalliError => e + Dalli.logger.debug { e.inspect } + Dalli.logger.debug { "unable to get keys for server #{server.name}" } end def remaining_time(start, timeout) @@ -144,23 +171,6 @@ def process_server(server) server.pipeline_complete? end - def servers_with_response(servers, timeout) - return [] if servers.empty? - - # TODO: - This is a bit challenging. Essentially the PipelinedGetter - # is a reactor, but without the benefit of a Fiber or separate thread. - # My suspicion is that we may want to try and push this down into the - # individual servers, but I'm not sure. For now, we keep the - # mapping between the alerted object (the socket) and the - # corrresponding server here. - server_map = servers.each_with_object({}) { |s, h| h[s.sock] = s } - - readable, = IO.select(server_map.keys, nil, nil, timeout) - return [] if readable.nil? - - readable.map { |sock| server_map[sock] } - end - def groups_for_keys(*keys) keys.flatten! keys.map! { |a| @key_manager.validate_key(a.to_s) } diff --git a/lib/dalli/protocol/base.rb b/lib/dalli/protocol/base.rb index 74274f80..5ff1bc87 100644 --- a/lib/dalli/protocol/base.rb +++ b/lib/dalli/protocol/base.rb @@ -18,7 +18,8 @@ class Base def_delegators :@value_marshaller, :serializer, :compressor, :compression_min_size, :compress_by_default? def_delegators :@connection_manager, :name, :sock, :hostname, :port, :close, :connected?, :socket_timeout, - :socket_type, :up!, :down!, :write, :reconnect_down_server?, :raise_down_error + :socket_type, :socket_sndbuf, :up!, :down!, :write, :write_nonblock, :reconnect_down_server?, + :raise_down_error def initialize(attribs, client_options = {}) hostname, port, socket_type, @weight, user_creds = ServerConfigParser.parse(attribs) @@ -66,15 +67,13 @@ def lock!; end def unlock!; end - # Start reading key/value pairs from this connection. This is usually called - # after a series of GETKQ commands. A NOOP is sent, and the server begins - # flushing responses for kv pairs that were found. - # - # Returns nothing. def pipeline_response_setup + response_buffer.reset + end + + def finish_pipeline_request verify_pipelined_state(:getkq) write_noop - response_buffer.reset end # Attempt to receive and parse as many key/value pairs as possible @@ -149,6 +148,14 @@ def quiet? end alias multi? quiet? + def pipelined_get_request(keys) + req = +String.new(capacity: pipelined_get_capacity(keys)) + keys.each do |key| + req << quiet_get_request(key) + end + req + end + # NOTE: Additional public methods should be overridden in Dalli::Threadsafe private @@ -210,13 +217,14 @@ def connect up! end - def pipelined_get(keys) - req = +'' - keys.each do |key| - req << quiet_get_request(key) - end - # Could send noop here instead of in pipeline_response_setup - write(req) + def pipelined_get(bytes) + write_nonblock(bytes) + rescue SystemCallError, Timeout::Error, EOFError => e + @connection_manager.error_on_request!(e) + end + + def pipelined_get_capacity(keys) + (keys.size * request_header_size) + keys.reduce(0) { |acc, k| acc + k.size } end def response_buffer diff --git a/lib/dalli/protocol/binary.rb b/lib/dalli/protocol/binary.rb index 66f71516..fe6c86e9 100644 --- a/lib/dalli/protocol/binary.rb +++ b/lib/dalli/protocol/binary.rb @@ -163,6 +163,10 @@ def write_noop write(req) end + def request_header_size + 24 + end + require_relative 'binary/request_formatter' require_relative 'binary/response_header' require_relative 'binary/response_processor' diff --git a/lib/dalli/protocol/connection_manager.rb b/lib/dalli/protocol/connection_manager.rb index fe8bd911..ae66d625 100644 --- a/lib/dalli/protocol/connection_manager.rb +++ b/lib/dalli/protocol/connection_manager.rb @@ -172,6 +172,10 @@ def read_nonblock @sock.read_available end + def write_nonblock(bytes) + @sock.write_nonblock(bytes, exception: false) + end + def max_allowed_failures @max_allowed_failures ||= @options[:socket_max_failures] || 2 end @@ -250,6 +254,12 @@ def log_up_detected time = Time.now - @down_at Dalli.logger.warn { format('%s is back (downtime was %