Make shard exhaustion events a retry-able error.
When an OpenSearch cluster is at full shard capacity, we shouldn't send
these messages to `@ERROR`, as quite often the next practical step is
to free up shards or expand the cluster.

This will save users from having to reprocess messages from wherever they
send their `@ERROR` output.
dazoakley committed Nov 27, 2024
1 parent 5bf8450 commit 49291b0
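
For context, OpenSearch reports shard exhaustion as an `illegal_argument_exception` whose `reason` names the cluster's shard limit, as in the new test fixture below. Here is a minimal sketch of the classification this commit adds, in plain Ruby — the standalone `shard_exhaustion?` helper is illustrative only, not part of the plugin's API:

# Sketch of the new classification. The payload shape mirrors the fixture
# in the new test below; `shard_exhaustion?` is a hypothetical helper name,
# not plugin API.
def shard_exhaustion?(item, write_operation)
  error_type   = item.dig(write_operation, 'error', 'type')
  error_reason = item.dig(write_operation, 'error', 'reason')

  error_type == 'illegal_argument_exception' &&
    error_reason =~ /would add \[\d+\] total shards, but this cluster/
end

item = {
  'create' => {
    '_index' => 'foo',
    'status' => 400,
    'error'  => {
      'type'   => 'illegal_argument_exception',
      'reason' => 'Validation Failed: 1: this action would add [4] total shards, ' \
                  'but this cluster currently has [998]/[1000] maximum shards open;'
    }
  }
}

shard_exhaustion?(item, 'create') # => truthy, so the record is routed to the retry stream

Any other 400 (for example a mapper_parsing_exception) still falls through to the `@ERROR` path.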
Showing 2 changed files with 69 additions and 23 deletions.
30 changes: 20 additions & 10 deletions lib/fluent/plugin/opensearch_error_handler.rb
@@ -85,6 +85,7 @@ def handle_error(response, tag, chunk, bulk_message_count, extracted_values)
chunk.msgpack_each do |time, rawrecord|
bulk_message = ''
next unless rawrecord.is_a? Hash

begin
# we need a deep copy for process_message to alter
processrecord = Marshal.load(Marshal.dump(rawrecord))
@@ -95,6 +96,7 @@ def handle_error(response, tag, chunk, bulk_message_count, extracted_values)
stats[:bad_chunk_record] += 1
next
end

item = items.shift
if item.is_a?(Hash) && item.has_key?(@plugin.write_operation)
write_operation = @plugin.write_operation
@@ -111,6 +113,7 @@ def handle_error(response, tag, chunk, bulk_message_count, extracted_values)
stats[:errors_bad_resp] += 1
next
end

if item[write_operation].has_key?('status')
status = item[write_operation]['status']
else
@@ -119,25 +122,30 @@ def handle_error(response, tag, chunk, bulk_message_count, extracted_values)
stats[:errors_bad_resp] += 1
next
end

case
when [200, 201].include?(status)
  stats[:successes] += 1
when CREATE_OP == write_operation && 409 == status
  stats[:duplicates] += 1
when 400 == status
-  stats[:bad_argument] += 1
-  reason = ""
-  log_os_400_reason do
-    if item[write_operation].has_key?('error') && item[write_operation]['error'].has_key?('type')
-      reason = " [error type]: #{item[write_operation]['error']['type']}"
+  error_type = item.dig(write_operation, 'error', 'type')
+  error_reason = item.dig(write_operation, 'error', 'reason')
+
+  # OS presents shard exhaustion as an exception, but this is 100% retryable...
+  if error_type == 'illegal_argument_exception' && error_reason =~ /would add \[\d+\] total shards, but this cluster/
+    retry_stream.add(time, rawrecord)
+  else
+    stats[:bad_argument] += 1
+    reason = ""
+    log_os_400_reason do
+      reason = " [error type]: #{error_type}" if error_type
+      reason += " [reason]: \'#{error_reason}\'" if error_reason
    end
-    if item[write_operation].has_key?('error') && item[write_operation]['error'].has_key?('reason')
-      reason += " [reason]: \'#{item[write_operation]['error']['reason']}\'"
+    if emit_error_label_event?
+      @plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("400 - Rejected by OpenSearch#{reason}"))
+    end
  end
-  if emit_error_label_event?
-    @plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("400 - Rejected by OpenSearch#{reason}"))
-  end
else
if item[write_operation]['error'].is_a?(String)
reason = item[write_operation]['error']
@@ -172,11 +180,13 @@ def handle_error(response, tag, chunk, bulk_message_count, extracted_values)
stats[type] += 1
end
end

@plugin.log.on_debug do
msg = ["Indexed (op = #{@plugin.write_operation})"]
stats.each_pair { |key, value| msg << "#{value} #{key}" }
@plugin.log.debug msg.join(', ')
end

raise Fluent::Plugin::OpenSearchOutput::RetryStreamError.new(retry_stream) unless retry_stream.empty?
end
end
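
What makes the new branch retryable: records added to `retry_stream` are raised at the end of `handle_error` as a `Fluent::Plugin::OpenSearchOutput::RetryStreamError`, which Fluentd's retry machinery handles by re-enqueuing those records for a later flush instead of routing them to `@ERROR`. A rough sketch of observing this, modeled on the rescue pattern in the tests below (`handler`, `response`, `chunk`, and `records` are assumed to be set up as the tests set them up):

begin
  handler.handle_error(response, 'atag', chunk, records.length, [])
rescue Fluent::Plugin::OpenSearchOutput::RetryStreamError => e
  retried = []
  e.retry_stream.each { |time, record| retried << record }
  # `retried` now holds every record rejected for shard exhaustion;
  # Fluentd re-buffers these for the next flush attempt.
end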
62 changes: 49 additions & 13 deletions test/plugin/test_opensearch_error_handler.rb
@@ -235,21 +235,21 @@ def setup
def test_400_responses_reason_log
records = [{time: 123, record: {"foo" => "bar", '_id' => 'abc'}}]
response = parse_response(%({
"took" : 0,
"errors" : true,
"items" : [
{
"create" : {
"_index" : "foo",
"status" : 400,
"error" : {
"type" : "mapper_parsing_exception",
"reason" : "failed to parse"
"took" : 0,
"errors" : true,
"items" : [
{
"create" : {
"_index" : "foo",
"status" : 400,
"error" : {
"type" : "mapper_parsing_exception",
"reason" : "failed to parse"
}
}
}
}
]
}))
]
}))
chunk = MockChunk.new(records)
dummy_extracted_values = []
@handler.handle_error(response, 'atag', chunk, records.length, dummy_extracted_values)
@@ -371,6 +371,42 @@ def test_rejected_execution_exception_responses
end
end

def test_out_shard_exhaustion_responses
records = [{time: 123, record: {"foo" => "bar", '_id' => 'abc'}}]
response = parse_response(%({
"took" : 0,
"errors" : true,
"items" : [
{
"create" : {
"_index" : "foo",
"status" : 400,
"error" : {
"type" : "illegal_argument_exception",
"reason" : "Validation Failed: 1: this action would add [4] total shards, but this cluster currently has [998]/[1000] maximum shards open;"
}
}
}
]
}))

begin
failed = false
chunk = MockChunk.new(records)
dummy_extracted_values = []
@handler.handle_error(response, 'atag', chunk, records.length, dummy_extracted_values)
rescue Fluent::Plugin::OpenSearchErrorHandler::OpenSearchRequestAbortError, Fluent::Plugin::OpenSearchOutput::RetryStreamError => e
failed = true
records = [].tap do |records|
next unless e.respond_to?(:retry_stream)
e.retry_stream.each {|time, record| records << record}
end
# the chunk should be retried, since no unrecoverable error was thrown
assert_equal(1, records.length)
end
assert_true failed
end

def test_es_rejected_execution_exception_responses_as_not_error
plugin = TestPlugin.new(@log)
plugin.unrecoverable_error_types = ["out_of_memory_error"]
