Skip to content

Commit

Permalink
check for existence of events
Browse files Browse the repository at this point in the history
  • Loading branch information
khushijain21 committed Jan 9, 2025
1 parent 3eac53a commit b4680e5
Showing 1 changed file with 12 additions and 35 deletions.
47 changes: 12 additions & 35 deletions x-pack/filebeat/tests/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/elastic/go-elasticsearch/v8"
)

var eventsLogFileCfg = `
var beatsCfgFile = `
filebeat.inputs:
- type: filestream
id: filestream-input-id
Expand All @@ -39,36 +39,33 @@ output:
protocol: http
username: admin
password: testing
index: logs-integration-default
index: %s
queue.mem.flush.timeout: 0s
`

func TestFilebeatOTelE2E(t *testing.T) {
integration.EnsureESIsRunning(t)

filebeat := integration.NewBeat(
filebeatOTel := integration.NewBeat(
t,
"filebeat-otel",
"../../filebeat.test",
"otel",
)

logFilePath := filepath.Join(filebeat.TempDir(), "log.log")
filebeat.WriteConfigFile(fmt.Sprintf(eventsLogFileCfg, logFilePath))
logFilePath := filepath.Join(filebeatOTel.TempDir(), "log.log")
filebeatOTel.WriteConfigFile(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default"))

logFile, err := os.Create(logFilePath)
if err != nil {
t.Fatalf("could not create file '%s': %s", logFilePath, err)
}

numEvents := 10
var msg string
var originalMessage = make(map[string]bool)
numEvents := 5

// write events to log file
for i := 0; i < numEvents; i++ {
msg = fmt.Sprintf("Line %d", i)
originalMessage[msg] = false
msg := fmt.Sprintf("Line %d", i)
_, err = logFile.Write([]byte(msg + "\n"))
require.NoErrorf(t, err, "failed to write line %d to temp file", i)
}
Expand All @@ -80,7 +77,7 @@ func TestFilebeatOTelE2E(t *testing.T) {
t.Fatalf("could not close log file '%s': %s", logFilePath, err)
}

filebeat.Start()
filebeatOTel.Start()

// prepare to query ES
esCfg := elasticsearch.Config{
Expand All @@ -97,38 +94,18 @@ func TestFilebeatOTelE2E(t *testing.T) {
require.NoError(t, err)

actualHits := &struct{ Hits int }{}
allRetrieved := false

// wait for logs to be published
require.Eventually(t,
func() bool {
findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer findCancel()

docs, err := estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-default*")
OTelDocs, err := estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-default*")
require.NoError(t, err)

// Mark retrieved messages
for _, hit := range docs.Hits.Hits {
message := hit.Source["Body"].(map[string]interface{})["message"].(string) //nolint:errcheck // err check not required on accessing each doc

if _, exists := originalMessage[message]; exists {
originalMessage[message] = true // Mark as found
}
}

// Check for missing messages
for _, retrieved := range originalMessage {
if !retrieved {
allRetrieved = false
break
}
allRetrieved = true
}

actualHits.Hits = docs.Hits.Total.Value
return (actualHits.Hits == numEvents) && allRetrieved
actualHits.Hits = OTelDocs.Hits.Total.Value
return actualHits.Hits == numEvents
},
3*time.Minute, 1*time.Second, fmt.Sprintf("actual hits: %d; expected hits: %d; and all messages retrieved: %t", actualHits.Hits, numEvents, allRetrieved))
2*time.Minute, 1*time.Second, numEvents, actualHits.Hits)

}

0 comments on commit b4680e5

Please sign in to comment.