Skip to content

Commit

Permalink
pkg: Create generic AttributesStore
Browse files Browse the repository at this point in the history
Rather than creating many implementations of the AttributesStore per type, let's use the new Go generic types.

Starting to use this code as a library I was wondering if this was possible. Turns out it is.
  • Loading branch information
metalmatze committed Nov 28, 2024
1 parent 4ad461c commit 13a70a5
Show file tree
Hide file tree
Showing 14 changed files with 140 additions and 249 deletions.
56 changes: 56 additions & 0 deletions pkg/arrow/from_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,69 @@ package arrow
// Utility functions to extract values from Arrow Records.

import (
"fmt"

"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/array"
"golang.org/x/exp/constraints"

"github.com/open-telemetry/otel-arrow/pkg/otel/common"
"github.com/open-telemetry/otel-arrow/pkg/werror"
)

// UnsignedFromRecord returns the unsigned value for a specific row and column in an
// Arrow record. If the value is null, it returns 0.
func UnsignedFromRecord[unsigned constraints.Unsigned](record arrow.Record, fieldID int, row int) (unsigned, error) {
if fieldID == AbsentFieldID {
return 0, nil
}

arr := record.Column(fieldID)
if arr == nil {
return 0, nil
}

switch arr := arr.(type) {
case *array.Dictionary:
switch dict := arr.Dictionary().(type) {
case *array.Uint32:
if arr.IsNull(row) {
return 0, nil
} else {
return unsigned(dict.Value(arr.GetValueIndex(row))), nil
}
default:
return 0, werror.WrapWithMsg(ErrInvalidArrayType, fmt.Sprintf("dictionary of type %T is unsupported", dict))
}
case *array.Uint8:
if arr.IsNull(row) {
return 0, nil
} else {
return unsigned(arr.Value(row)), nil
}
case *array.Uint16:
if arr.IsNull(row) {
return 0, nil
} else {
return unsigned(arr.Value(row)), nil
}
case *array.Uint32:
if arr.IsNull(row) {
return 0, nil
} else {
return unsigned(arr.Value(row)), nil
}
case *array.Uint64:
if arr.IsNull(row) {
return 0, nil
} else {
return unsigned(arr.Value(row)), nil
}
default:
return 0, werror.WrapWithMsg(ErrInvalidArrayType, fmt.Sprintf("array of type %T is unsupported", arr))
}
}

// U8FromRecord returns the uint8 value for a specific row and column in an
// Arrow record. If the value is null, it returns 0.
func U8FromRecord(record arrow.Record, fieldID int, row int) (uint8, error) {
Expand Down
209 changes: 22 additions & 187 deletions pkg/otel/common/otlp/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package otlp
import (
"github.com/apache/arrow/go/v17/arrow"
"go.opentelemetry.io/collector/pdata/pcommon"
"golang.org/x/exp/constraints"

arrowutils "github.com/open-telemetry/otel-arrow/pkg/arrow"
"github.com/open-telemetry/otel-arrow/pkg/otel/common"
Expand All @@ -45,57 +46,33 @@ type (
Ser int
}

// Attributes16Store is a store for attributes.
// AttributesStore is a store for attributes.
// The attributes are stored in a map by ID. This ID represents the
// identifier of the main entity (span, event, link, etc.) to which the
// attributes are attached. So the maximum number of attributes per entity
// is not limited.
Attributes16Store struct {
lastID uint16
attributesByID map[uint16]*pcommon.Map
AttributesStore[u constraints.Unsigned] struct {
lastID u
attributesByID map[u]*pcommon.Map
}

// Attributes32Store is a store for attributes.
// The attributes are stored in a map by ID. This ID represents the
// identifier of the main entity (span, event, link, etc.) to which the
// attributes are attached. So the maximum number of attributes per entity
// is not limited.
Attributes32Store struct {
lastID uint32
attributesByID map[uint32]*pcommon.Map
}

Attrs16ParentIdDecoder struct {
prevParentID uint16
prevKey string
prevValue *pcommon.Value
encodingType int
}

Attrs32ParentIdDecoder struct {
prevParentID uint32
AttrsParentIDDecoder[u constraints.Unsigned] struct {
prevParentID u
prevKey string
prevValue *pcommon.Value
encodingType int
}
)

// NewAttributes16Store creates a new Attributes16Store.
func NewAttributes16Store() *Attributes16Store {
return &Attributes16Store{
attributesByID: make(map[uint16]*pcommon.Map),
}
}

// NewAttributes32Store creates a new Attributes32Store.
func NewAttributes32Store() *Attributes32Store {
return &Attributes32Store{
attributesByID: make(map[uint32]*pcommon.Map),
// NewAttributesStore creates a new AttributesStore.
func NewAttributesStore[u constraints.Unsigned]() *AttributesStore[u] {
return &AttributesStore[u]{
attributesByID: make(map[u]*pcommon.Map),
}
}

// AttributesByDeltaID returns the attributes for the given Delta ID.
func (s *Attributes16Store) AttributesByDeltaID(ID uint16) *pcommon.Map {
func (s *AttributesStore[u]) AttributesByDeltaID(ID u) *pcommon.Map {
s.lastID += ID
if m, ok := s.attributesByID[s.lastID]; ok {
return m
Expand All @@ -104,42 +81,25 @@ func (s *Attributes16Store) AttributesByDeltaID(ID uint16) *pcommon.Map {
}

// AttributesByID returns the attributes for the given ID.
func (s *Attributes16Store) AttributesByID(ID uint16) *pcommon.Map {
func (s *AttributesStore[u]) AttributesByID(ID u) *pcommon.Map {
if m, ok := s.attributesByID[ID]; ok {
return m
}
return nil
}

// AttributesByID returns the attributes for the given ID.
func (s *Attributes32Store) AttributesByID(ID uint32) *pcommon.Map {
if m, ok := s.attributesByID[ID]; ok {
return m
}
return nil
}

// AttributesByDeltaID returns the attributes for the given Delta ID.
func (s *Attributes32Store) AttributesByDeltaID(ID uint32) *pcommon.Map {
s.lastID += ID
if m, ok := s.attributesByID[s.lastID]; ok {
return m
}
return nil
}

// Attributes16StoreFrom creates an Attribute16Store from an arrow.Record.
// AttributesStoreFrom creates an Attribute16Store from an arrow.Record.
// Note: This function doesn't release the record passed as argument. This is
// the responsibility of the caller
func Attributes16StoreFrom(record arrow.Record, store *Attributes16Store) error {
func AttributesStoreFrom[unsigned constraints.Unsigned](record arrow.Record, store *AttributesStore[unsigned]) error {
attrIDS, err := SchemaToAttributeIDs(record.Schema())
if err != nil {
return werror.Wrap(err)
}

attrsCount := int(record.NumRows())

parentIdDecoder := NewAttrs16ParentIdDecoder()
parentIDDecoder := NewAttrsParentIDDecoder[unsigned]()

// Read all key/value tuples from the record and reconstruct the attributes
// map by ID.
Expand All @@ -153,7 +113,6 @@ func Attributes16StoreFrom(record arrow.Record, store *Attributes16Store) error
if err != nil {
return werror.Wrap(err)
}

value := pcommon.NewValueEmpty()
switch pcommon.ValueType(vType) {
case pcommon.ValueTypeStr:
Expand Down Expand Up @@ -206,106 +165,12 @@ func Attributes16StoreFrom(record arrow.Record, store *Attributes16Store) error
// silently ignore unknown types to avoid DOS attacks
}

deltaOrParentID, err := arrowutils.U16FromRecord(record, attrIDS.ParentID, i)
deltaOrParentID, err := arrowutils.UnsignedFromRecord[unsigned](record, attrIDS.ParentID, i)
if err != nil {
return werror.Wrap(err)
}
parentID := parentIdDecoder.Decode(deltaOrParentID, key, &value)

m, ok := store.attributesByID[parentID]
if !ok {
newMap := pcommon.NewMap()
m = &newMap
store.attributesByID[parentID] = m
}
value.CopyTo(m.PutEmpty(key))
}

return nil
}

// Attributes32StoreFrom creates an Attributes32Store from an arrow.Record.
// Note: This function doesn't release the record passed as argument. This is
// the responsibility of the caller
func Attributes32StoreFrom(record arrow.Record, store *Attributes32Store) error {
attrIDS, err := SchemaToAttributeIDs(record.Schema())
if err != nil {
return werror.Wrap(err)
}

attrsCount := int(record.NumRows())

parentIdDecoder := NewAttrs32ParentIdDecoder()

// Read all key/value tuples from the record and reconstruct the attributes
// map by ID.
for i := 0; i < attrsCount; i++ {
key, err := arrowutils.StringFromRecord(record, attrIDS.Key, i)
if err != nil {
return werror.Wrap(err)
}

vType, err := arrowutils.U8FromRecord(record, attrIDS.Type, i)
if err != nil {
return werror.Wrap(err)
}
value := pcommon.NewValueEmpty()
switch pcommon.ValueType(vType) {
case pcommon.ValueTypeStr:
v, err := arrowutils.StringFromRecord(record, attrIDS.Str, i)
if err != nil {
return werror.Wrap(err)
}
value.SetStr(v)
case pcommon.ValueTypeInt:
v, err := arrowutils.I64FromRecord(record, attrIDS.Int, i)
if err != nil {
return werror.Wrap(err)
}
value.SetInt(v)
case pcommon.ValueTypeDouble:
v, err := arrowutils.F64FromRecord(record, attrIDS.Double, i)
if err != nil {
return werror.Wrap(err)
}
value.SetDouble(v)
case pcommon.ValueTypeBool:
v, err := arrowutils.BoolFromRecord(record, attrIDS.Bool, i)
if err != nil {
return werror.Wrap(err)
}
value.SetBool(v)
case pcommon.ValueTypeBytes:
v, err := arrowutils.BinaryFromRecord(record, attrIDS.Bytes, i)
if err != nil {
return werror.Wrap(err)
}
value.SetEmptyBytes().FromRaw(v)
case pcommon.ValueTypeSlice:
v, err := arrowutils.BinaryFromRecord(record, attrIDS.Ser, i)
if err != nil {
return werror.Wrap(err)
}
if err = common.Deserialize(v, value); err != nil {
return werror.Wrap(err)
}
case pcommon.ValueTypeMap:
v, err := arrowutils.BinaryFromRecord(record, attrIDS.Ser, i)
if err != nil {
return werror.Wrap(err)
}
if err = common.Deserialize(v, value); err != nil {
return werror.Wrap(err)
}
default:
// silently ignore unknown types to avoid DOS attacks
}

deltaOrParentID, err := arrowutils.U32FromRecord(record, attrIDS.ParentID, i)
if err != nil {
return werror.Wrap(err)
}
parentID := parentIdDecoder.Decode(deltaOrParentID, key, &value)
parentID := parentIDDecoder.Decode(deltaOrParentID, key, &value)

m, ok := store.attributesByID[parentID]
if !ok {
Expand Down Expand Up @@ -380,43 +245,13 @@ func SchemaToAttributeIDs(schema *arrow.Schema) (*AttributeIDs, error) {
}, nil
}

func NewAttrs16ParentIdDecoder() *Attrs16ParentIdDecoder {
return &Attrs16ParentIdDecoder{
encodingType: carrow.ParentIdDeltaGroupEncoding,
}
}

func (d *Attrs16ParentIdDecoder) Decode(deltaOrParentID uint16, key string, value *pcommon.Value) uint16 {
switch d.encodingType {
case carrow.ParentIdNoEncoding:
return deltaOrParentID
case carrow.ParentIdDeltaEncoding:
decodedParentID := d.prevParentID + deltaOrParentID
d.prevParentID = decodedParentID
return decodedParentID
case carrow.ParentIdDeltaGroupEncoding:
if d.prevKey == key && carrow.Equal(d.prevValue, value) {
parentID := d.prevParentID + deltaOrParentID
d.prevParentID = parentID
return parentID
} else {
d.prevKey = key
d.prevValue = value
d.prevParentID = deltaOrParentID
return deltaOrParentID
}
default:
panic("unknown attrs16 parent ID encoding type")
}
}

func NewAttrs32ParentIdDecoder() *Attrs32ParentIdDecoder {
return &Attrs32ParentIdDecoder{
func NewAttrsParentIDDecoder[u constraints.Unsigned]() *AttrsParentIDDecoder[u] {
return &AttrsParentIDDecoder[u]{
encodingType: carrow.ParentIdDeltaGroupEncoding,
}
}

func (d *Attrs32ParentIdDecoder) Decode(deltaOrParentID uint32, key string, value *pcommon.Value) uint32 {
func (d *AttrsParentIDDecoder[unsigned]) Decode(deltaOrParentID unsigned, key string, value *pcommon.Value) unsigned {
switch d.encodingType {
case carrow.ParentIdNoEncoding:
return deltaOrParentID
Expand All @@ -436,6 +271,6 @@ func (d *Attrs32ParentIdDecoder) Decode(deltaOrParentID uint32, key string, valu
return deltaOrParentID
}
default:
panic("unknown attrs32 parent ID encoding type")
panic("unknown parent ID encoding type")
}
}
2 changes: 1 addition & 1 deletion pkg/otel/common/otlp/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewResourceIdsFromSchema(schema *arrow.Schema) (*ResourceIds, error) {
}, nil
}

func UpdateResourceFromRecord(r pcommon.Resource, record arrow.Record, row int, resIds *ResourceIds, attrsStore *Attributes16Store) (schemaUrl string, err error) {
func UpdateResourceFromRecord(r pcommon.Resource, record arrow.Record, row int, resIds *ResourceIds, attrsStore *AttributesStore[uint16]) (schemaUrl string, err error) {
resArr, err := arrowutils.StructFromRecord(record, resIds.Resource, row)
if err != nil {
return "", werror.WrapWithContext(err, map[string]interface{}{"row": row})
Expand Down
2 changes: 1 addition & 1 deletion pkg/otel/common/otlp/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func UpdateScopeFromRecord(
record arrow.Record,
row int,
ids *ScopeIds,
attrsStore *Attributes16Store,
attrsStore *AttributesStore[uint16],
) error {
scopeArray, err := arrowutils.StructFromRecord(record, ids.Scope, row)
if err != nil {
Expand Down
Loading

0 comments on commit 13a70a5

Please sign in to comment.