Skip to content

Commit

Permalink
feat: Add sampling store support to Badger (#4834)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
Related  #3305

## Description of the changes
-   Implemented badger db for sampling store

## How was this change tested?
- Added Unit test and also tested it with the already Implemented
integration test

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: slayer321 <sachin.maurya7666@gmail.com>
  • Loading branch information
slayer321 authored Oct 26, 2023
1 parent 0e3be02 commit f99eae5
Show file tree
Hide file tree
Showing 5 changed files with 413 additions and 0 deletions.
7 changes: 7 additions & 0 deletions plugin/storage/badger/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ import (
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin"
depStore "github.com/jaegertracing/jaeger/plugin/storage/badger/dependencystore"
badgerSampling "github.com/jaegertracing/jaeger/plugin/storage/badger/samplingstore"
badgerStore "github.com/jaegertracing/jaeger/plugin/storage/badger/spanstore"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand Down Expand Up @@ -170,6 +172,11 @@ func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return depStore.NewDependencyStore(sr), nil
}

// CreateSamplingStore implements storage.SamplingStoreFactory
func (f *Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) {
return badgerSampling.NewSamplingStore(f.store), nil
}

// Close Implements io.Closer and closes the underlying storage
func (f *Factory) Close() error {
close(f.maintenanceDone)
Expand Down
262 changes: 262 additions & 0 deletions plugin/storage/badger/samplingstore/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
// Copyright (c) 2023 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package samplingstore

import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"time"

"github.com/dgraph-io/badger/v3"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
jaegermodel "github.com/jaegertracing/jaeger/model"
)

const (
throughputKeyPrefix byte = 0x08
probabilitiesKeyPrefix byte = 0x09
)

type SamplingStore struct {
store *badger.DB
}

type ProbabilitiesAndQPS struct {
Hostname string
Probabilities model.ServiceOperationProbabilities
QPS model.ServiceOperationQPS
}

func NewSamplingStore(db *badger.DB) *SamplingStore {
return &SamplingStore{
store: db,
}
}

func (s *SamplingStore) InsertThroughput(throughput []*model.Throughput) error {
startTime := jaegermodel.TimeAsEpochMicroseconds(time.Now())
entriesToStore := make([]*badger.Entry, 0)
entries, err := s.createThroughputEntry(throughput, startTime)
if err != nil {
return err
}
entriesToStore = append(entriesToStore, entries)
err = s.store.Update(func(txn *badger.Txn) error {
for i := range entriesToStore {
err = txn.SetEntry(entriesToStore[i])
if err != nil {
return err
}
}

return nil
})

return nil
}

func (s *SamplingStore) GetThroughput(start, end time.Time) ([]*model.Throughput, error) {
var retSlice []*model.Throughput
prefix := []byte{throughputKeyPrefix}

err := s.store.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
it := txn.NewIterator(opts)
defer it.Close()

val := []byte{}
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
item := it.Item()
k := item.Key()
startTime := k[1:9]
fmt.Printf("key=%s\n", k)
val, err := item.ValueCopy(val)
if err != nil {
return err
}
t, err := initalStartTime(startTime)
if err != nil {
return err
}
throughputs, err := decodeThroughtputValue(val)
if err != nil {
return err
}

if t.After(start) && (t.Before(end) || t.Equal(end)) {
retSlice = append(retSlice, throughputs...)
}
}
return nil
})
if err != nil {
return nil, err
}

return retSlice, nil
}

func (s *SamplingStore) InsertProbabilitiesAndQPS(hostname string,
probabilities model.ServiceOperationProbabilities,
qps model.ServiceOperationQPS,
) error {
startTime := jaegermodel.TimeAsEpochMicroseconds(time.Now())
entriesToStore := make([]*badger.Entry, 0)
entries, err := s.createProbabilitiesEntry(hostname, probabilities, qps, startTime)
if err != nil {
return err
}
entriesToStore = append(entriesToStore, entries)
err = s.store.Update(func(txn *badger.Txn) error {
// Write the entries
for i := range entriesToStore {
err = txn.SetEntry(entriesToStore[i])
if err != nil {
return err
}
}

return nil
})

return nil
}

// GetLatestProbabilities implements samplingstore.Reader#GetLatestProbabilities.
func (s *SamplingStore) GetLatestProbabilities() (model.ServiceOperationProbabilities, error) {
var retVal model.ServiceOperationProbabilities
var unMarshalProbabilities ProbabilitiesAndQPS
prefix := []byte{probabilitiesKeyPrefix}

err := s.store.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
it := txn.NewIterator(opts)
defer it.Close()

val := []byte{}
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
item := it.Item()
val, err := item.ValueCopy(val)
if err != nil {
return err
}
unMarshalProbabilities, err = decodeProbabilitiesValue(val)
if err != nil {
return err
}
retVal = unMarshalProbabilities.Probabilities
}
return nil
})
if err != nil {
return nil, err
}
return retVal, nil
}

func (s *SamplingStore) createProbabilitiesEntry(hostname string, probabilities model.ServiceOperationProbabilities, qps model.ServiceOperationQPS, startTime uint64) (*badger.Entry, error) {
pK, pV, err := s.createProbabilitiesKV(hostname, probabilities, qps, startTime)
if err != nil {
return nil, err
}

e := s.createBadgerEntry(pK, pV)

return e, nil
}

func (s *SamplingStore) createProbabilitiesKV(hostname string, probabilities model.ServiceOperationProbabilities, qps model.ServiceOperationQPS, startTime uint64) ([]byte, []byte, error) {
key := make([]byte, 16)
key[0] = probabilitiesKeyPrefix
pos := 1
binary.BigEndian.PutUint64(key[pos:], startTime)

var bb []byte
var err error
val := ProbabilitiesAndQPS{
Hostname: hostname,
Probabilities: probabilities,
QPS: qps,
}
bb, err = json.Marshal(val)
return key, bb, err
}

func (s *SamplingStore) createThroughputEntry(throughput []*model.Throughput, startTime uint64) (*badger.Entry, error) {
pK, pV, err := s.createThroughputKV(throughput, startTime)
if err != nil {
return nil, err
}

e := s.createBadgerEntry(pK, pV)

return e, nil
}

func (s *SamplingStore) createBadgerEntry(key []byte, value []byte) *badger.Entry {
return &badger.Entry{
Key: key,
Value: value,
}
}

func (s *SamplingStore) createThroughputKV(throughput []*model.Throughput, startTime uint64) ([]byte, []byte, error) {
key := make([]byte, 16)
key[0] = throughputKeyPrefix
pos := 1
binary.BigEndian.PutUint64(key[pos:], startTime)

var bb []byte
var err error

bb, err = json.Marshal(throughput)
return key, bb, err
}

func decodeThroughtputValue(val []byte) ([]*model.Throughput, error) {
var throughput []*model.Throughput

err := json.Unmarshal(val, &throughput)
if err != nil {
return nil, err
}
return throughput, err
}

func decodeProbabilitiesValue(val []byte) (ProbabilitiesAndQPS, error) {
var probabilities ProbabilitiesAndQPS

err := json.Unmarshal(val, &probabilities)
if err != nil {
return ProbabilitiesAndQPS{}, err
}
return probabilities, nil
}

func initalStartTime(timeBytes []byte) (time.Time, error) {
var usec int64

buf := bytes.NewReader(timeBytes)

if err := binary.Read(buf, binary.BigEndian, &usec); err != nil {
panic(nil)
}

t := time.UnixMicro(usec)
return t, nil
}
Loading

0 comments on commit f99eae5

Please sign in to comment.