Skip to content

Commit

Permalink
adapt tests
Browse files Browse the repository at this point in the history
Signed-off-by: Florian Bacher <florian.bacher@dynatrace.com>
  • Loading branch information
bacherfl committed Oct 9, 2024
1 parent 04709ed commit 602a01d
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 25 deletions.
12 changes: 8 additions & 4 deletions pkg/stanza/adapter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
func createNoopReceiver(nextConsumer consumer.Logs) (*receiver, error) {
set := componenttest.NewNopTelemetrySettings()
set.Logger = zap.NewNop()
emitter := helper.NewLogEmitter(set)

pipe, err := pipeline.Config{
Operators: []operator.Config{
{
Expand All @@ -48,15 +48,19 @@ func createNoopReceiver(nextConsumer consumer.Logs) (*receiver, error) {
return nil, err
}

return &receiver{
rcv := &receiver{
set: set,
id: component.MustNewID("testReceiver"),
pipe: pipe,
emitter: emitter,
consumer: nextConsumer,
converter: NewConverter(componenttest.NewNopTelemetrySettings()),
obsrecv: obsrecv,
}, nil
}

emitter := helper.NewLogEmitter(set, rcv.consumeEntries)

rcv.emitter = emitter
return rcv, nil
}

// BenchmarkEmitterToConsumer serves as a benchmark for entries going from the emitter to consumer,
Expand Down
34 changes: 14 additions & 20 deletions pkg/stanza/adapter/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func TestHandleConsume(t *testing.T) {
require.NoError(t, err, "receiver start failed")

stanzaReceiver := logsReceiver.(*receiver)
logChan := stanzaReceiver.emitter.OutChannelForWrite()
logChan <- []*entry.Entry{entry.New()}

stanzaReceiver.consumeEntries(context.Background(), []*entry.Entry{entry.New()})

// Eventually because of asynchronuous nature of the receiver.
require.Eventually(t,
Expand All @@ -106,8 +106,8 @@ func TestHandleConsumeRetry(t *testing.T) {
require.NoError(t, logsReceiver.Start(context.Background(), componenttest.NewNopHost()))

stanzaReceiver := logsReceiver.(*receiver)
logChan := stanzaReceiver.emitter.OutChannelForWrite()
logChan <- []*entry.Entry{entry.New()}

stanzaReceiver.consumeEntries(context.Background(), []*entry.Entry{entry.New()})

require.Eventually(t,
func() bool {
Expand Down Expand Up @@ -174,7 +174,11 @@ func BenchmarkReadLine(b *testing.B) {
require.NoError(b, yaml.Unmarshal([]byte(pipelineYaml), &operatorCfgs))

set := componenttest.NewNopTelemetrySettings()
emitter := helper.NewLogEmitter(set)
emitter := helper.NewLogEmitter(set, func(_ context.Context, entries []*entry.Entry) {
for _, e := range entries {
convert(e)
}
})
defer func() {
require.NoError(b, emitter.Stop())
}()
Expand Down Expand Up @@ -202,13 +206,6 @@ func BenchmarkReadLine(b *testing.B) {
// Run the actual benchmark
b.ResetTimer()
require.NoError(b, pipe.Start(storageClient))
logChan := emitter.OutChannel()
for i := 0; i < b.N; i++ {
entries := <-logChan
for _, e := range entries {
convert(e)
}
}
}

func BenchmarkParseAndMap(b *testing.B) {
Expand Down Expand Up @@ -241,7 +238,11 @@ func BenchmarkParseAndMap(b *testing.B) {
require.NoError(b, yaml.Unmarshal([]byte(pipelineYaml), &operatorCfgs))

set := componenttest.NewNopTelemetrySettings()
emitter := helper.NewLogEmitter(set)
emitter := helper.NewLogEmitter(set, func(_ context.Context, entries []*entry.Entry) {
for _, e := range entries {
convert(e)
}
})
defer func() {
require.NoError(b, emitter.Stop())
}()
Expand Down Expand Up @@ -269,11 +270,4 @@ func BenchmarkParseAndMap(b *testing.B) {
// Run the actual benchmark
b.ResetTimer()
require.NoError(b, pipe.Start(storageClient))
logChan := emitter.OutChannel()
for i := 0; i < b.N; i++ {
entries := <-logChan
for _, e := range entries {
convert(e)
}
}
}
2 changes: 1 addition & 1 deletion processor/logstransformprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package logstransformprocessor // import "github.com/open-telemetry/opentelemetr
import (
"context"
"errors"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"math"
"runtime"
"sync"
Expand All @@ -18,6 +17,7 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline"
Expand Down

0 comments on commit 602a01d

Please sign in to comment.