Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: make sure we use a new codec on restart #299

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions lib/logstash/inputs/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ def register
else
@watcher_class = FileWatch::ObservingRead
end
@codec = LogStash::Codecs::IdentityMapCodec.new(@codec)
@base_codec = @codec # TODO is there a way register would be called after run (on restart?)
@completely_stopped = Concurrent::AtomicBoolean.new
@queue = Concurrent::AtomicReference.new

Expand All @@ -345,11 +345,12 @@ def listener_for(path)
end

def start_processing
# if the pipeline restarts this input,
# make sure previous files are closed
stop
# if the pipeline restarts this input, make sure previous files are closed
quit_watcher
@codec.close if @codec.is_a?(LogStash::Codecs::IdentityMapCodec)

@watcher = @watcher_class.new(@filewatch_config)
@codec = LogStash::Codecs::IdentityMapCodec.new(@base_codec)

@completed_file_handlers = []
if read_mode?
Expand Down Expand Up @@ -386,19 +387,19 @@ def post_process_this(event, path)
# Hands +path+ to every handler in @completed_file_handlers.
# Returns early (doing nothing) when tail_mode? is true or when no
# completed-file handlers are registered.
#
# NOTE(review): this text looks like a rendered diff with its +/- markers
# stripped; the debug-level and trace-level log statements below are
# presumably the removed and added versions of the same line -- confirm
# against the real file before relying on both being present.
def handle_deletable_path(path)
return if tail_mode?
return if @completed_file_handlers.empty?
@logger.debug? && @logger.debug(__method__.to_s, :path => path)
@logger.trace? && @logger.trace(__method__.to_s, :path => path)
@completed_file_handlers.each { |handler| handler.handle(path) }
end

# Emits a debug-level "Received line" entry carrying the source path and the
# line text. The logger is only asked to log when its debug level is enabled;
# otherwise the (falsy) result of the level check is returned unchanged.
def log_line_received(path, line)
  debug_enabled = @logger.debug?
  return debug_enabled unless debug_enabled

  @logger.debug("Received line", :path => path, :text => line)
end

# @override LogStash::Inputs::Base#stop
#
# Shuts the input down: quits the watcher (if one exists) and closes the codec.
#
# NOTE(review): this text looks like a rendered diff with its +/- markers
# stripped. The `unless @watcher.nil?` block is presumably the removed
# implementation and the statements after it (trace log, quit_watcher,
# guarded @codec.close) the added one -- confirm against the real file,
# since read literally the watcher would be quit and the codec closed twice.
def stop
unless @watcher.nil?
@codec.close
@watcher.quit
end
@logger.trace? && @logger.trace(__method__.to_s, @path)
quit_watcher
@codec.close if @codec
end

# @private used in specs
Expand All @@ -408,6 +409,10 @@ def queue

private

# Asks the current watcher to quit. A no-op (returning nil) when no watcher
# has been assigned yet.
def quit_watcher
  return unless @watcher

  @watcher.quit
end

def build_sincedb_base_from_settings(settings)
logstash_data_path = settings.get_value("path.data")
Pathname.new(logstash_data_path).join("plugins", "inputs", "file").tap do |path|
Expand All @@ -423,7 +428,7 @@ def attempt_set(event, field_reference, value)

event.set(field_reference, value)
rescue => e
logger.trace("failed to set #{field_reference} to `#{value}`", :exception => e.message)
logger.trace("failed to set #{field_reference} to `#{value}`", :exception => e.class, :message => e.message)
false
end

Expand Down Expand Up @@ -456,14 +461,15 @@ def read_mode?
end

def exit_flush
@logger.trace? && @logger.trace(__method__.to_s, @path)
listener = FlushableListener.new("none", self)
if @codec.identity_count.zero?
# using the base codec without identity/path info
@codec.base_codec.flush do |event|
begin
listener.process_event(event)
rescue => e
@logger.error("File Input: flush on exit downstream error", :exception => e)
@logger.error("flush on exit downstream error", :exception => e.class, :message => e.message)
end
end
else
Expand Down
8 changes: 1 addition & 7 deletions logstash-input-file.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,7 @@ Gem::Specification.new do |s|
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"

s.add_runtime_dependency 'logstash-codec-plain'

if RUBY_VERSION.start_with?("1")
s.add_runtime_dependency 'rake', '~> 12.2.0'
s.add_runtime_dependency 'addressable', '~> 2.4.0'
else
s.add_runtime_dependency 'addressable'
end
s.add_runtime_dependency 'addressable'

s.add_runtime_dependency 'concurrent-ruby', '~> 1.0'
s.add_runtime_dependency 'logstash-codec-multiline', ['~> 3.0']
Expand Down
8 changes: 4 additions & 4 deletions spec/inputs/file_tail_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@
end
end
.then("assert both files are mapped as identities and stop") do
wait(2).for {subject.codec.identity_count}.to eq(2), "both files are not mapped as identities"
wait(2).for { subject.codec.identity_count }.to eq(2), "both files are not mapped as identities"
end
.then("stop") do
subject.stop
Expand Down Expand Up @@ -473,13 +473,13 @@
it "collects line events from only one file" do
actions = RSpec::Sequencing
.run("assert one identity is mapped") do
wait(0.4).for{subject.codec.identity_count}.to be > 0, "no identity is mapped"
wait(0.4).for{ subject.codec.respond_to?(:identity_count) ? subject.codec.identity_count : 0 }.to be > 0, "no identity is mapped"
end
.then("stop") do
subject.stop
end
.then("stop flushes last event") do
wait(0.4).for{events.size}.to eq(2), "events size does not equal 2"
wait(0.4).for{ events.size }.to eq(2), "events size does not equal 2"
end
subject.run(events)
# wait for actions future value
Expand Down Expand Up @@ -508,7 +508,7 @@
it "collects line events from both files" do
actions = RSpec::Sequencing
.run("assert both identities are mapped and the first two events are built") do
wait(0.4).for{subject.codec.identity_count == 1 && events.size == 2}.to eq(true), "both identities are not mapped and the first two events are not built"
wait(0.4).for{ events.size == 2 && subject.codec.identity_count == 1 }.to eq(true), "both identities are not mapped and the first two events are not built"
end
.then("wait for close to flush last event of each identity") do
wait(0.8).for{events.size}.to eq(4), "close does not flush last event of each identity"
Expand Down