Skip to content

Commit

Permalink
Add completableFuture with timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
MonishkaDas committed Dec 26, 2024
1 parent 92edb8f commit 0ce8a08
Showing 1 changed file with 10 additions and 9 deletions.
19 changes: 10 additions & 9 deletions lib/logstash/outputs/kusto/ingestor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,28 +115,29 @@ def upload_async(data)

def upload(data)
@logger.info("Sending data to Kusto")

if data.size > 0
ingestionLatch = java.util.concurrent.CountDownLatch.new(1)
thread_exception = nil

Thread.new do
future = java.util.concurrent.CompletableFuture.supplyAsync do
begin
data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes))
ingestion_result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties)
@kusto_client.ingestFromStream(data_source_info, @ingestion_properties)
rescue => e
@logger.error('Error during ingestFromStream.', exception: e.class, message: e.message, backtrace: e.backtrace)
thread_exception = e
ensure
ingestionLatch.countDown()
end
end

# Wait for the ingestion to complete with a timeout
if !ingestionLatch.await(@latch_timeout, java.util.concurrent.TimeUnit::SECONDS)
begin
future.get(@latch_timeout, java.util.concurrent.TimeUnit::SECONDS)
rescue java.util.concurrent.TimeoutException => e
@logger.error('Ingestion timed out, possible network issue.')
raise 'Ingestion timed out, possible network issue.'
thread_exception = 'Ingestion timed out, possible network issue.'
rescue java.util.concurrent.ExecutionException => e
thread_exception = e.cause
end

# Raise the exception from the thread if it occurred
raise thread_exception if thread_exception
else
Expand Down

0 comments on commit 0ce8a08

Please sign in to comment.