From 49291b0e2ee8eb03fcaf7b35f18753a34dc67c0f Mon Sep 17 00:00:00 2001 From: Darren Oakley Date: Wed, 27 Nov 2024 14:00:38 +0000 Subject: [PATCH] Make shard exhaustion events a retryable error. When an OpenSearch cluster is at full shard capacity, we shouldn't send these messages to `@ERROR` as quite often the next practical step would be to free up shards or expand the cluster. This will save users from having to process messages from wherever they send their `@ERROR` output to. --- lib/fluent/plugin/opensearch_error_handler.rb | 30 ++++++--- test/plugin/test_opensearch_error_handler.rb | 62 +++++++++++++++---- 2 files changed, 69 insertions(+), 23 deletions(-) diff --git a/lib/fluent/plugin/opensearch_error_handler.rb b/lib/fluent/plugin/opensearch_error_handler.rb index 5002866..7e445f6 100644 --- a/lib/fluent/plugin/opensearch_error_handler.rb +++ b/lib/fluent/plugin/opensearch_error_handler.rb @@ -85,6 +85,7 @@ def handle_error(response, tag, chunk, bulk_message_count, extracted_values) chunk.msgpack_each do |time, rawrecord| bulk_message = '' next unless rawrecord.is_a?
Hash + begin # we need a deep copy for process_message to alter processrecord = Marshal.load(Marshal.dump(rawrecord)) @@ -95,6 +96,7 @@ def handle_error(response, tag, chunk, bulk_message_count, extracted_values) stats[:bad_chunk_record] += 1 next end + item = items.shift if item.is_a?(Hash) && item.has_key?(@plugin.write_operation) write_operation = @plugin.write_operation @@ -111,6 +113,7 @@ def handle_error(response, tag, chunk, bulk_message_count, extracted_values) stats[:errors_bad_resp] += 1 next end + if item[write_operation].has_key?('status') status = item[write_operation]['status'] else @@ -119,25 +122,30 @@ def handle_error(response, tag, chunk, bulk_message_count, extracted_values) stats[:errors_bad_resp] += 1 next end + case when [200, 201].include?(status) stats[:successes] += 1 when CREATE_OP == write_operation && 409 == status stats[:duplicates] += 1 when 400 == status - stats[:bad_argument] += 1 - reason = "" - log_os_400_reason do - if item[write_operation].has_key?('error') && item[write_operation]['error'].has_key?('type') - reason = " [error type]: #{item[write_operation]['error']['type']}" + error_type = item.dig(write_operation, 'error', 'type') + error_reason = item.dig(write_operation, 'error', 'reason') + + # OS presents shard exhaustion as an exception, but this is 100% retryable... + if error_type == 'illegal_argument_exception' && error_reason =~ /would add \[\d+\] total shards, but this cluster/ + retry_stream.add(time, rawrecord) + else + stats[:bad_argument] += 1 + reason = "" + log_os_400_reason do + reason = " [error type]: #{error_type}" if error_type + reason += " [reason]: \'#{error_reason}\'" if error_reason end - if item[write_operation].has_key?('error') && item[write_operation]['error'].has_key?('reason') - reason += " [reason]: \'#{item[write_operation]['error']['reason']}\'" + if emit_error_label_event? 
+ @plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("400 - Rejected by OpenSearch#{reason}")) end end - if emit_error_label_event? - @plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("400 - Rejected by OpenSearch#{reason}")) - end else if item[write_operation]['error'].is_a?(String) reason = item[write_operation]['error'] @@ -172,11 +180,13 @@ def handle_error(response, tag, chunk, bulk_message_count, extracted_values) stats[type] += 1 end end + @plugin.log.on_debug do msg = ["Indexed (op = #{@plugin.write_operation})"] stats.each_pair { |key, value| msg << "#{value} #{key}" } @plugin.log.debug msg.join(', ') end + raise Fluent::Plugin::OpenSearchOutput::RetryStreamError.new(retry_stream) unless retry_stream.empty? end end diff --git a/test/plugin/test_opensearch_error_handler.rb b/test/plugin/test_opensearch_error_handler.rb index ac94291..157e14f 100644 --- a/test/plugin/test_opensearch_error_handler.rb +++ b/test/plugin/test_opensearch_error_handler.rb @@ -235,21 +235,21 @@ def setup def test_400_responses_reason_log records = [{time: 123, record: {"foo" => "bar", '_id' => 'abc'}}] response = parse_response(%({ - "took" : 0, - "errors" : true, - "items" : [ - { - "create" : { - "_index" : "foo", - "status" : 400, - "error" : { - "type" : "mapper_parsing_exception", - "reason" : "failed to parse" + "took" : 0, + "errors" : true, + "items" : [ + { + "create" : { + "_index" : "foo", + "status" : 400, + "error" : { + "type" : "mapper_parsing_exception", + "reason" : "failed to parse" + } } } - } - ] - })) + ] + })) chunk = MockChunk.new(records) dummy_extracted_values = [] @handler.handle_error(response, 'atag', chunk, records.length, dummy_extracted_values) @@ -371,6 +371,42 @@ def test_rejected_execution_exception_responses end end + def test_out_shard_exhaustion_responses + records = [{time: 123, record: {"foo" => "bar", '_id' => 'abc'}}] + response = parse_response(%({ + "took" : 0, + "errors" : true, + "items" : [ 
+ { + "create" : { + "_index" : "foo", + "status" : 400, + "error" : { + "type" : "illegal_argument_exception", + "reason" : "Validation Failed: 1: this action would add [4] total shards, but this cluster currently has [998]/[1000] maximum shards open;" + } + } + } + ] + })) + + begin + failed = false + chunk = MockChunk.new(records) + dummy_extracted_values = [] + @handler.handle_error(response, 'atag', chunk, records.length, dummy_extracted_values) + rescue Fluent::Plugin::OpenSearchErrorHandler::OpenSearchRequestAbortError, Fluent::Plugin::OpenSearchOutput::RetryStreamError => e + failed = true + records = [].tap do |records| + next unless e.respond_to?(:retry_stream) + e.retry_stream.each {|time, record| records << record} + end + # should retry chunk when unrecoverable error is not thrown + assert_equal(1, records.length) + end + assert_true failed + end + def test_es_rejected_execution_exception_responses_as_not_error plugin = TestPlugin.new(@log) plugin.unrecoverable_error_types = ["out_of_memory_error"]