Skip to content

Commit

Permalink
[receiver:awsfirehosereceiver] merge main
Browse files Browse the repository at this point in the history
  • Loading branch information
vihangvk committed Oct 7, 2024
1 parent 1963945 commit fa0ec51
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 2 deletions.
27 changes: 27 additions & 0 deletions .chloggen/awsfirehosereceiver_otlp_support.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awsfirehosereceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: added OTLP v1 support to Firehose receiver

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34982]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
3 changes: 3 additions & 0 deletions receiver/awsfirehosereceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,6 @@ See [documentation](https://docs.aws.amazon.com/firehose/latest/dev/create-desti
The record type for the CloudWatch metric stream. Expects the format for the records to be JSON.
See [documentation](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Metric-Streams.html) for details.

### otlp_v1
The OTLP v1 format as produced by CloudWatch metric streams.
See [documentation](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats-opentelemetry-100.html) for details.
6 changes: 5 additions & 1 deletion receiver/awsfirehosereceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream"
)

const (
Expand All @@ -28,7 +29,8 @@ const (
var (
errUnrecognizedRecordType = errors.New("unrecognized record type")
availableRecordTypes = map[string]bool{
cwmetricstream.TypeStr: true,
cwmetricstream.TypeStr: true,
otlpmetricstream.TypeStr: true,
}
)

Expand All @@ -54,8 +56,10 @@ func validateRecordType(recordType string) error {
// unmarshalers.
func defaultMetricsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.MetricsUnmarshaler {
cwmsu := cwmetricstream.NewUnmarshaler(logger)
omsu := otlpmetricstream.NewUnmarshaler(logger)
return map[string]unmarshaler.MetricsUnmarshaler{
cwmsu.Type(): cwmsu,
omsu.Type(): omsu,
}
}

Expand Down
2 changes: 1 addition & 1 deletion receiver/awsfirehosereceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfir
go 1.22.0

require (
github.com/gogo/protobuf v1.3.2
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.111.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.111.0
Expand All @@ -27,7 +28,6 @@ require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-viper/mapstructure/v2 v2.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otlpmetricstream // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream"

import (
"errors"

"github.com/gogo/protobuf/proto"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler"
)

const (
// Supported version depends on version of go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp dependency
TypeStr = "otlp_v1"
)

var (
errInvalidOTLPFormatStart = errors.New("unable to decode data length from message")
)

// Unmarshaler for the CloudWatch Metric Stream OpenTelemetry record format.
//
// More details can be found at:
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats-opentelemetry-100.html
type Unmarshaler struct {
logger *zap.Logger
}

var _ unmarshaler.MetricsUnmarshaler = (*Unmarshaler)(nil)

// NewUnmarshaler creates a new instance of the Unmarshaler.
func NewUnmarshaler(logger *zap.Logger) *Unmarshaler {
return &Unmarshaler{logger}
}

// Unmarshal deserializes the records into pmetric.Metrics
func (u Unmarshaler) Unmarshal(records [][]byte) (pmetric.Metrics, error) {
md := pmetric.NewMetrics()
for recordIndex, record := range records {
var dataLen, pos = len(record), 0
for pos < dataLen {
n, nLen := proto.DecodeVarint(record)
if nLen == 0 && n == 0 {
return md, errInvalidOTLPFormatStart
}
req := pmetricotlp.NewExportRequest()
pos += nLen
err := req.UnmarshalProto(record[pos : pos+int(n)])
pos += int(n)
if err != nil {
u.logger.Error(
"Unable to unmarshal input",
zap.Error(err),
zap.Int("record_index", recordIndex),
)
continue
}
req.Metrics().ResourceMetrics().MoveAndAppendTo(md.ResourceMetrics())
}
}

return md, nil
}

// Type of the serialized messages.
func (u Unmarshaler) Type() string {
return TypeStr
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otlpmetricstream

import (
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.uber.org/zap"
)

func TestType(t *testing.T) {
unmarshaler := NewUnmarshaler(zap.NewNop())
require.Equal(t, TypeStr, unmarshaler.Type())
}

func createMetricRecord() []byte {
var er = pmetricotlp.NewExportRequest()
var rsm = er.Metrics().ResourceMetrics().AppendEmpty()
var sm = rsm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
sm.SetName("TestMetric")
var dp = sm.SetEmptySummary().DataPoints().AppendEmpty()
dp.SetCount(1)
dp.SetSum(1)
qv := dp.QuantileValues()
min := qv.AppendEmpty()
min.SetQuantile(0)
min.SetValue(0)
max := qv.AppendEmpty()
max.SetQuantile(1)
max.SetValue(1)
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))

temp, _ := er.MarshalProto()
var record = proto.EncodeVarint(uint64(len(temp)))
record = append(record, temp...)
return record
}

func TestUnmarshal(t *testing.T) {
unmarshaler := NewUnmarshaler(zap.NewNop())
testCases := map[string]struct {
records [][]byte
wantResourceCount int
wantMetricCount int
wantDatapointCount int
wantErr error
}{
"WithSingleRecord": {
records: [][]byte{
createMetricRecord(),
},
wantResourceCount: 1,
wantMetricCount: 1,
wantDatapointCount: 1,
},
"WithMultipleRecords": {
records: [][]byte{
createMetricRecord(),
createMetricRecord(),
createMetricRecord(),
createMetricRecord(),
createMetricRecord(),
createMetricRecord(),
},
wantResourceCount: 6,
wantMetricCount: 6,
wantDatapointCount: 6,
},
"WithEmptyRecord": {
records: make([][]byte, 0),
wantResourceCount: 0,
wantMetricCount: 0,
wantDatapointCount: 0,
},
"WithInvalidRecords": {
records: [][]byte{{1, 2}},
wantResourceCount: 0,
wantMetricCount: 0,
wantDatapointCount: 0,
},
"WithSomeInvalidRecords": {
records: [][]byte{
createMetricRecord(),
{1, 2},
createMetricRecord(),
},
wantResourceCount: 2,
wantMetricCount: 2,
wantDatapointCount: 2,
},
}
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {

got, err := unmarshaler.Unmarshal(testCase.records)
if testCase.wantErr != nil {
require.Error(t, err)
require.Equal(t, testCase.wantErr, err)
} else {
require.NoError(t, err)
require.NotNil(t, got)
require.Equal(t, testCase.wantResourceCount, got.ResourceMetrics().Len())
gotMetricCount := 0
gotDatapointCount := 0
for i := 0; i < got.ResourceMetrics().Len(); i++ {
rm := got.ResourceMetrics().At(i)
require.Equal(t, 1, rm.ScopeMetrics().Len())
ilm := rm.ScopeMetrics().At(0)
gotMetricCount += ilm.Metrics().Len()
for j := 0; j < ilm.Metrics().Len(); j++ {
metric := ilm.Metrics().At(j)
gotDatapointCount += metric.Summary().DataPoints().Len()
}
}
require.Equal(t, testCase.wantMetricCount, gotMetricCount)
require.Equal(t, testCase.wantDatapointCount, gotDatapointCount)
}
})
}
}

0 comments on commit fa0ec51

Please sign in to comment.