Skip to content

Commit

Permalink
interleave read and write on pipelined_get
Browse files Browse the repository at this point in the history
fixes #776.
fixes #941.

When reading a large number of keys, memcached starts sending the
response when dalli is not yet finished sending the request.

As we did not start reading the response until we were finished writing
the request, this could lead to the following problem:

* the receive buffer (rcvbuf) would fill up
* due to TCP backpressure, memcached would stop sending (okay as we are
  not reading anyway), but also stop reading
* the send buffer (sndbuf) would also fill up
* as we were using a blocking write without timeout, we would block
  forever (at least with ruby < 3.2, which introduces IO::Timeout, see
  #967)

This is addressed by using IO::select on the sockets for both read and
write, and thus start reading as soon as data is available.
  • Loading branch information
Martin Sander authored and marvinthepa committed Nov 2, 2024
1 parent 06be66d commit f20883f
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 91 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Unreleased
==========

- Fix cannot read response data included terminator `\r\n` when use meta protocol (matsubara0507)
- Do not block on get_multi with a large number of keys (marvinthepa)

3.2.8
==========
Expand Down
163 changes: 86 additions & 77 deletions lib/dalli/pipelined_getter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,68 +17,26 @@ def process(keys, &block)
return {} if keys.empty?

@ring.lock do
servers = setup_requests(keys)
start_time = Time.now
servers = fetch_responses(servers, start_time, @ring.socket_timeout, &block) until servers.empty?
requests = setup_requests(keys)
fetch_responses(requests, @ring.socket_timeout, &block)
end
rescue NetworkError => e
Dalli.logger.debug { e.inspect }
Dalli.logger.debug { 'retrying pipelined gets because of timeout' }
retry
end

def setup_requests(keys)
groups = groups_for_keys(keys)
make_getkq_requests(groups)

# TODO: How does this exit on a NetworkError
finish_queries(groups.keys)
end

##
# Loop through the server-grouped sets of keys, writing
# the corresponding getkq requests to the appropriate servers
#
# It's worth noting that we could potentially reduce bytes
# on the wire by switching from getkq to getq, and using
# the opaque value to match requests to responses.
##
def make_getkq_requests(groups)
groups.each do |server, keys_for_server|
server.request(:pipelined_get, keys_for_server)
rescue DalliError, NetworkError => e
Dalli.logger.debug { e.inspect }
Dalli.logger.debug { "unable to get keys for server #{server.name}" }
def setup_requests(all_keys)
groups_for_keys(all_keys).to_h do |server, keys|
# It's worth noting that we could potentially reduce bytes
# on the wire by switching from getkq to getq, and using
# the opaque value to match requests to responses.
[server, server.pipelined_get_request(keys)]
end
end

##
# This loops through the servers that have keys in
# our set, sending the noop to terminate the set of queries.
##
def finish_queries(servers)
deleted = []

servers.each do |server|
next unless server.connected?

begin
finish_query_for_server(server)
rescue Dalli::NetworkError
raise
rescue Dalli::DalliError
deleted.append(server)
end
end

servers.delete_if { |server| deleted.include?(server) }
rescue Dalli::NetworkError
abort_without_timeout(servers)
raise
end

def finish_query_for_server(server)
server.pipeline_response_setup
server.finish_pipeline_request
rescue Dalli::NetworkError
raise
rescue Dalli::DalliError => e
Expand All @@ -92,29 +50,97 @@ def abort_without_timeout(servers)
servers.each(&:pipeline_abort)
end

def fetch_responses(servers, start_time, timeout, &block)
def fetch_responses(requests, timeout, &block)
start_time = Time.now
servers = requests.keys

# Remove any servers which are not connected
servers.delete_if { |s| !s.connected? }
return [] if servers.empty?

time_left = remaining_time(start_time, timeout)
readable_servers = servers_with_response(servers, time_left)
if readable_servers.empty?
servers.each(&:pipeline_response_setup)

until servers.empty?
time_left = remaining_time(start_time, timeout)
servers = read_write_select(servers, requests, time_left, &block)
end
rescue NetworkError
# Abort and raise if we encountered a network error. This triggers
# a retry at the top level.
abort_without_timeout(servers)
raise
end

def read_write_select(servers, requests, time_left, &block)
# TODO: - This is a bit challenging. Essentially the PipelinedGetter
# is a reactor, but without the benefit of a Fiber or separate thread.
# My suspicion is that we may want to try and push this down into the
# individual servers, but I'm not sure. For now, we keep the
# mapping between the alerted object (the socket) and the
# corrresponding server here.
server_map = servers.each_with_object({}) { |s, h| h[s.sock] = s }

readable, writable, = IO.select(server_map.keys, server_map.keys,
nil, time_left)

if readable.nil?
abort_with_timeout(servers)
return []
end

writable.each do |socket|
server = server_map[socket]
process_writable(server, servers, requests)
end

# Loop through the servers with responses, and
# delete any from our list that are finished
readable_servers.each do |server|
readable.each do |socket|
server = server_map[socket]

servers.delete(server) if process_server(server, &block)
end

servers
rescue NetworkError
# Abort and raise if we encountered a network error. This triggers
# a retry at the top level.
end

def process_writable(server, servers, requests)
request = requests[server]
return unless request

new_request = server_pipelined_get(server, request)

if new_request.empty?
requests.delete(server)

begin
finish_query_for_server(server)
rescue Dalli::NetworkError
raise
rescue Dalli::DalliError
servers.delete(server)
end
else
requests[server] = new_request
end
rescue Dalli::NetworkError
abort_without_timeout(servers)
raise
rescue DalliError => e
Dalli.logger.debug { e.inspect }
Dalli.logger.debug { "unable to get keys for server #{server.name}" }
end

def server_pipelined_get(server, request)
chunk = request[0..server.chunk_size]
written = server.request(:pipelined_get, chunk)
return request if written == :wait_writable

request[written..]
rescue Dalli::NetworkError
raise
rescue DalliError => e
Dalli.logger.debug { e.inspect }
Dalli.logger.debug { "unable to get keys for server #{server.name}" }
end

def remaining_time(start, timeout)
Expand Down Expand Up @@ -144,23 +170,6 @@ def process_server(server)
server.pipeline_complete?
end

def servers_with_response(servers, timeout)
return [] if servers.empty?

# TODO: - This is a bit challenging. Essentially the PipelinedGetter
# is a reactor, but without the benefit of a Fiber or separate thread.
# My suspicion is that we may want to try and push this down into the
# individual servers, but I'm not sure. For now, we keep the
# mapping between the alerted object (the socket) and the
# corrresponding server here.
server_map = servers.each_with_object({}) { |s, h| h[s.sock] = s }

readable, = IO.select(server_map.keys, nil, nil, timeout)
return [] if readable.nil?

readable.map { |sock| server_map[sock] }
end

def groups_for_keys(*keys)
keys.flatten!
keys.map! { |a| @key_manager.validate_key(a.to_s) }
Expand Down
36 changes: 22 additions & 14 deletions lib/dalli/protocol/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ class Base

def_delegators :@value_marshaller, :serializer, :compressor, :compression_min_size, :compress_by_default?
def_delegators :@connection_manager, :name, :sock, :hostname, :port, :close, :connected?, :socket_timeout,
:socket_type, :up!, :down!, :write, :reconnect_down_server?, :raise_down_error
:socket_type, :chunk_size, :up!, :down!, :write, :write_nonblock, :reconnect_down_server?,
:raise_down_error

def initialize(attribs, client_options = {})
hostname, port, socket_type, @weight, user_creds = ServerConfigParser.parse(attribs)
Expand Down Expand Up @@ -66,15 +67,13 @@ def lock!; end

def unlock!; end

# Start reading key/value pairs from this connection. This is usually called
# after a series of GETKQ commands. A NOOP is sent, and the server begins
# flushing responses for kv pairs that were found.
#
# Returns nothing.
def pipeline_response_setup
response_buffer.reset
end

def finish_pipeline_request
verify_pipelined_state(:getkq)
write_noop
response_buffer.reset
end

# Attempt to receive and parse as many key/value pairs as possible
Expand Down Expand Up @@ -149,6 +148,14 @@ def quiet?
end
alias multi? quiet?

def pipelined_get_request(keys)
req = +String.new(capacity: pipelined_get_capacity(keys))
keys.each do |key|
req << quiet_get_request(key)
end
req
end

# NOTE: Additional public methods should be overridden in Dalli::Threadsafe

private
Expand Down Expand Up @@ -210,13 +217,14 @@ def connect
up!
end

def pipelined_get(keys)
req = +''
keys.each do |key|
req << quiet_get_request(key)
end
# Could send noop here instead of in pipeline_response_setup
write(req)
def pipelined_get(bytes)
write_nonblock(bytes)
rescue SystemCallError, Timeout::Error, EOFError => e
@connection_manager.error_on_request!(e)
end

def pipelined_get_capacity(keys)
(keys.size * request_header_size) + keys.reduce(0) { |acc, k| acc + k.size }
end

def response_buffer
Expand Down
4 changes: 4 additions & 0 deletions lib/dalli/protocol/binary.rb
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ def write_noop
write(req)
end

def request_header_size
24
end

require_relative 'binary/request_formatter'
require_relative 'binary/response_header'
require_relative 'binary/response_processor'
Expand Down
11 changes: 11 additions & 0 deletions lib/dalli/protocol/connection_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ def read_nonblock
@sock.read_available
end

def write_nonblock(bytes)
@sock.write_nonblock(bytes, exception: false)
end

def max_allowed_failures
@max_allowed_failures ||= @options[:socket_max_failures] || 2
end
Expand Down Expand Up @@ -250,6 +254,13 @@ def log_up_detected
time = Time.now - @down_at
Dalli.logger.warn { format('%<name>s is back (downtime was %<time>.3f seconds)', name: name, time: time) }
end

def chunk_size
@chunk_size ||=
options[:chunk_size] ||
@sock&.getsockopt(::Socket::SOL_SOCKET, ::Socket::SO_SNDBUF)&.int ||
32_768
end
end
end
end
4 changes: 4 additions & 0 deletions lib/dalli/protocol/meta.rb
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ def authenticate_connection
raise Dalli::DalliError, 'Authentication not supported for the meta protocol.'
end

def request_header_size
17
end

require_relative 'meta/key_regularizer'
require_relative 'meta/request_formatter'
require_relative 'meta/response_processor'
Expand Down
21 changes: 21 additions & 0 deletions test/integration/test_pipelined_get.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,27 @@
end
end

it 'does not block for a large number of existing keys' do
memcached_persistent(p) do |dc|
dc.close
dc.flush

key_count = 200_000
range = 0...key_count
dc.quiet do
range.each { |i| dc.set(i, "foobar_#{i}") }
end

Timeout.timeout 60 do
resp = dc.get_multi(range.to_a)

assert_equal key_count, resp.count
end
rescue Timeout::Error
flunk "timed out while getting #{key_count} keys with get_multi"
end
end

describe 'pipeline_next_responses' do
it 'raises NetworkError when called before pipeline_response_setup' do
memcached_persistent(p) do |dc|
Expand Down

0 comments on commit f20883f

Please sign in to comment.