diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb index b0c2d42..969653f 100755 --- a/lib/logstash/outputs/kusto/ingestor.rb +++ b/lib/logstash/outputs/kusto/ingestor.rb @@ -115,28 +115,29 @@ def upload_async(data) def upload(data) @logger.info("Sending data to Kusto") - + if data.size > 0 - ingestionLatch = java.util.concurrent.CountDownLatch.new(1) thread_exception = nil - Thread.new do + future = java.util.concurrent.CompletableFuture.supplyAsync do begin data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes)) - ingestion_result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties) + @kusto_client.ingestFromStream(data_source_info, @ingestion_properties) rescue => e @logger.error('Error during ingestFromStream.', exception: e.class, message: e.message, backtrace: e.backtrace) thread_exception = e - ensure - ingestionLatch.countDown() end end - # Wait for the ingestion to complete with a timeout - if !ingestionLatch.await(@latch_timeout, java.util.concurrent.TimeUnit::SECONDS) + begin + future.get(@latch_timeout, java.util.concurrent.TimeUnit::SECONDS) + rescue java.util.concurrent.TimeoutException => e @logger.error('Ingestion timed out, possible network issue.') - raise 'Ingestion timed out, possible network issue.' + thread_exception = 'Ingestion timed out, possible network issue.' + rescue java.util.concurrent.ExecutionException => e + thread_exception = e.cause end + # Raise the exception from the thread if it occurred raise thread_exception if thread_exception else