From b4680e5f0ef562e9f3793e027d106d6839c45349 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 9 Jan 2025 10:28:46 +0530 Subject: [PATCH] check for existence of events --- .../filebeat/tests/integration/otel_test.go | 47 +++++-------------- 1 file changed, 12 insertions(+), 35 deletions(-) diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index ba0c7ba4339..785e9d151db 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -23,7 +23,7 @@ import ( "github.com/elastic/go-elasticsearch/v8" ) -var eventsLogFileCfg = ` +var beatsCfgFile = ` filebeat.inputs: - type: filestream id: filestream-input-id @@ -39,36 +39,33 @@ output: protocol: http username: admin password: testing - index: logs-integration-default + index: %s queue.mem.flush.timeout: 0s ` func TestFilebeatOTelE2E(t *testing.T) { integration.EnsureESIsRunning(t) - filebeat := integration.NewBeat( + filebeatOTel := integration.NewBeat( t, "filebeat-otel", "../../filebeat.test", "otel", ) - logFilePath := filepath.Join(filebeat.TempDir(), "log.log") - filebeat.WriteConfigFile(fmt.Sprintf(eventsLogFileCfg, logFilePath)) + logFilePath := filepath.Join(filebeatOTel.TempDir(), "log.log") + filebeatOTel.WriteConfigFile(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default")) logFile, err := os.Create(logFilePath) if err != nil { t.Fatalf("could not create file '%s': %s", logFilePath, err) } - numEvents := 10 - var msg string - var originalMessage = make(map[string]bool) + numEvents := 5 // write events to log file for i := 0; i < numEvents; i++ { - msg = fmt.Sprintf("Line %d", i) - originalMessage[msg] = false + msg := fmt.Sprintf("Line %d", i) _, err = logFile.Write([]byte(msg + "\n")) require.NoErrorf(t, err, "failed to write line %d to temp file", i) } @@ -80,7 +77,7 @@ func TestFilebeatOTelE2E(t *testing.T) { t.Fatalf("could not close log file '%s': %s", logFilePath, err) } - filebeat.Start() + filebeatOTel.Start() // prepare to query ES esCfg := elasticsearch.Config{ @@ -97,38 +94,18 @@ func TestFilebeatOTelE2E(t *testing.T) { require.NoError(t, err) actualHits := &struct{ Hits int }{} - allRetrieved := false - // wait for logs to be published require.Eventually(t, func() bool { findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) defer findCancel() - docs, err := estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-default*") + OTelDocs, err := estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-default*") require.NoError(t, err) - // Mark retrieved messages - for _, hit := range docs.Hits.Hits { - message := hit.Source["Body"].(map[string]interface{})["message"].(string) //nolint:errcheck // err check not required on accessing each doc - - if _, exists := originalMessage[message]; exists { - originalMessage[message] = true // Mark as found - } - } - - // Check for missing messages - for _, retrieved := range originalMessage { - if !retrieved { - allRetrieved = false - break - } - allRetrieved = true - } - - actualHits.Hits = docs.Hits.Total.Value - return (actualHits.Hits == numEvents) && allRetrieved + actualHits.Hits = OTelDocs.Hits.Total.Value + return actualHits.Hits == numEvents }, - 3*time.Minute, 1*time.Second, fmt.Sprintf("actual hits: %d; expected hits: %d; and all messages retrieved: %t", actualHits.Hits, numEvents, allRetrieved)) + 2*time.Minute, 1*time.Second, numEvents, actualHits.Hits) }