diff --git a/plugin/storage/badger/factory.go b/plugin/storage/badger/factory.go index b5665e0b176..6c0788864ae 100644 --- a/plugin/storage/badger/factory.go +++ b/plugin/storage/badger/factory.go @@ -29,8 +29,10 @@ import ( "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/plugin" depStore "github.com/jaegertracing/jaeger/plugin/storage/badger/dependencystore" + badgerSampling "github.com/jaegertracing/jaeger/plugin/storage/badger/samplingstore" badgerStore "github.com/jaegertracing/jaeger/plugin/storage/badger/spanstore" "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage/samplingstore" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -170,6 +172,11 @@ func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { return depStore.NewDependencyStore(sr), nil } +// CreateSamplingStore implements storage.SamplingStoreFactory +func (f *Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) { + return badgerSampling.NewSamplingStore(f.store), nil +} + // Close Implements io.Closer and closes the underlying storage func (f *Factory) Close() error { close(f.maintenanceDone) diff --git a/plugin/storage/badger/samplingstore/storage.go b/plugin/storage/badger/samplingstore/storage.go new file mode 100644 index 00000000000..49d51ee3668 --- /dev/null +++ b/plugin/storage/badger/samplingstore/storage.go @@ -0,0 +1,262 @@ +// Copyright (c) 2023 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package samplingstore + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "fmt" + "time" + + "github.com/dgraph-io/badger/v3" + + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" + jaegermodel "github.com/jaegertracing/jaeger/model" +) + +const ( + throughputKeyPrefix byte = 0x08 + probabilitiesKeyPrefix byte = 0x09 +) + +type SamplingStore struct { + store *badger.DB +} + +type ProbabilitiesAndQPS struct { + Hostname string + Probabilities model.ServiceOperationProbabilities + QPS model.ServiceOperationQPS +} + +func NewSamplingStore(db *badger.DB) *SamplingStore { + return &SamplingStore{ + store: db, + } +} + +func (s *SamplingStore) InsertThroughput(throughput []*model.Throughput) error { + startTime := jaegermodel.TimeAsEpochMicroseconds(time.Now()) + entriesToStore := make([]*badger.Entry, 0) + entries, err := s.createThroughputEntry(throughput, startTime) + if err != nil { + return err + } + entriesToStore = append(entriesToStore, entries) + err = s.store.Update(func(txn *badger.Txn) error { + for i := range entriesToStore { + err = txn.SetEntry(entriesToStore[i]) + if err != nil { + return err + } + } + + return nil + }) + + return nil +} + +func (s *SamplingStore) GetThroughput(start, end time.Time) ([]*model.Throughput, error) { + var retSlice []*model.Throughput + prefix := []byte{throughputKeyPrefix} + + err := s.store.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + it := txn.NewIterator(opts) + defer it.Close() + + val := []byte{} + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + item := it.Item() + k := item.Key() + startTime := k[1:9] + fmt.Printf("key=%s\n", k) + val, err := item.ValueCopy(val) + if err != nil { + return err + } + t, err := initalStartTime(startTime) + if err != nil { + return err + } + throughputs, err := decodeThroughtputValue(val) + if err != nil { + return err + } + + if t.After(start) && (t.Before(end) || t.Equal(end)) { + retSlice = append(retSlice, throughputs...) + } + } + return nil + }) + if err != nil { + return nil, err + } + + return retSlice, nil +} + +func (s *SamplingStore) InsertProbabilitiesAndQPS(hostname string, + probabilities model.ServiceOperationProbabilities, + qps model.ServiceOperationQPS, +) error { + startTime := jaegermodel.TimeAsEpochMicroseconds(time.Now()) + entriesToStore := make([]*badger.Entry, 0) + entries, err := s.createProbabilitiesEntry(hostname, probabilities, qps, startTime) + if err != nil { + return err + } + entriesToStore = append(entriesToStore, entries) + err = s.store.Update(func(txn *badger.Txn) error { + // Write the entries + for i := range entriesToStore { + err = txn.SetEntry(entriesToStore[i]) + if err != nil { + return err + } + } + + return nil + }) + + return nil +} + +// GetLatestProbabilities implements samplingstore.Reader#GetLatestProbabilities. +func (s *SamplingStore) GetLatestProbabilities() (model.ServiceOperationProbabilities, error) { + var retVal model.ServiceOperationProbabilities + var unMarshalProbabilities ProbabilitiesAndQPS + prefix := []byte{probabilitiesKeyPrefix} + + err := s.store.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + it := txn.NewIterator(opts) + defer it.Close() + + val := []byte{} + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + item := it.Item() + val, err := item.ValueCopy(val) + if err != nil { + return err + } + unMarshalProbabilities, err = decodeProbabilitiesValue(val) + if err != nil { + return err + } + retVal = unMarshalProbabilities.Probabilities + } + return nil + }) + if err != nil { + return nil, err + } + return retVal, nil +} + +func (s *SamplingStore) createProbabilitiesEntry(hostname string, probabilities model.ServiceOperationProbabilities, qps model.ServiceOperationQPS, startTime uint64) (*badger.Entry, error) { + pK, pV, err := s.createProbabilitiesKV(hostname, probabilities, qps, startTime) + if err != nil { + return nil, err + } + + e := s.createBadgerEntry(pK, pV) + + return e, nil +} + +func (s *SamplingStore) createProbabilitiesKV(hostname string, probabilities model.ServiceOperationProbabilities, qps model.ServiceOperationQPS, startTime uint64) ([]byte, []byte, error) { + key := make([]byte, 16) + key[0] = probabilitiesKeyPrefix + pos := 1 + binary.BigEndian.PutUint64(key[pos:], startTime) + + var bb []byte + var err error + val := ProbabilitiesAndQPS{ + Hostname: hostname, + Probabilities: probabilities, + QPS: qps, + } + bb, err = json.Marshal(val) + return key, bb, err +} + +func (s *SamplingStore) createThroughputEntry(throughput []*model.Throughput, startTime uint64) (*badger.Entry, error) { + pK, pV, err := s.createThroughputKV(throughput, startTime) + if err != nil { + return nil, err + } + + e := s.createBadgerEntry(pK, pV) + + return e, nil +} + +func (s *SamplingStore) createBadgerEntry(key []byte, value []byte) *badger.Entry { + return &badger.Entry{ + Key: key, + Value: value, + } +} + +func (s *SamplingStore) createThroughputKV(throughput []*model.Throughput, startTime uint64) ([]byte, []byte, error) { + key := make([]byte, 16) + key[0] = throughputKeyPrefix + pos := 1 + binary.BigEndian.PutUint64(key[pos:], startTime) + + var bb []byte + var err error + + bb, err = json.Marshal(throughput) + return key, bb, err +} + +func decodeThroughtputValue(val []byte) ([]*model.Throughput, error) { + var throughput []*model.Throughput + + err := json.Unmarshal(val, &throughput) + if err != nil { + return nil, err + } + return throughput, err +} + +func decodeProbabilitiesValue(val []byte) (ProbabilitiesAndQPS, error) { + var probabilities ProbabilitiesAndQPS + + err := json.Unmarshal(val, &probabilities) + if err != nil { + return ProbabilitiesAndQPS{}, err + } + return probabilities, nil +} + +func initalStartTime(timeBytes []byte) (time.Time, error) { + var usec int64 + + buf := bytes.NewReader(timeBytes) + + if err := binary.Read(buf, binary.BigEndian, &usec); err != nil { + panic(nil) + } + + t := time.UnixMicro(usec) + return t, nil +} diff --git a/plugin/storage/badger/samplingstore/storage_test.go b/plugin/storage/badger/samplingstore/storage_test.go new file mode 100644 index 00000000000..c13638b6c22 --- /dev/null +++ b/plugin/storage/badger/samplingstore/storage_test.go @@ -0,0 +1,140 @@ +// Copyright (c) 2023 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package samplingstore + +import ( + "encoding/json" + "testing" + "time" + + "github.com/dgraph-io/badger/v3" + "github.com/stretchr/testify/assert" + + samplemodel "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" +) + +func newTestSamplingStore(db *badger.DB) *SamplingStore { + return NewSamplingStore(db) +} + +func TestInsertThroughput(t *testing.T) { + runWithBadger(t, func(t *testing.T, store *SamplingStore) { + throughputs := []*samplemodel.Throughput{ + {Service: "my-svc", Operation: "op"}, + {Service: "our-svc", Operation: "op2"}, + } + err := store.InsertThroughput(throughputs) + assert.NoError(t, err) + }) +} + +func TestGetThroughput(t *testing.T) { + runWithBadger(t, func(t *testing.T, store *SamplingStore) { + start := time.Now() + expected := []*samplemodel.Throughput{ + {Service: "my-svc", Operation: "op"}, + {Service: "our-svc", Operation: "op2"}, + } + err := store.InsertThroughput(expected) + assert.NoError(t, err) + + actual, err := store.GetThroughput(start, start.Add(time.Second*time.Duration(10))) + assert.NoError(t, err) + assert.Equal(t, expected, actual) + }) +} + +func TestInsertProbabilitiesAndQPS(t *testing.T) { + runWithBadger(t, func(t *testing.T, store *SamplingStore) { + err := store.InsertProbabilitiesAndQPS( + "dell11eg843d", + samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, + samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}}, + ) + assert.NoError(t, err) + }) +} + +func TestGetLatestProbabilities(t *testing.T) { + runWithBadger(t, func(t *testing.T, store *SamplingStore) { + err := store.InsertProbabilitiesAndQPS( + "dell11eg843d", + samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, + samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}}, + ) + assert.NoError(t, err) + err = store.InsertProbabilitiesAndQPS( + "newhostname", + samplemodel.ServiceOperationProbabilities{"new-srv2": {"op": 0.123}}, + samplemodel.ServiceOperationQPS{"new-srv2": {"op": 1}}, + ) + assert.NoError(t, err) + + expected := samplemodel.ServiceOperationProbabilities{"new-srv2": {"op": 0.123}} + actual, err := store.GetLatestProbabilities() + assert.NoError(t, err) + assert.Equal(t, expected, actual) + }) +} + +func TestDecodeProbabilitiesValue(t *testing.T) { + expected := ProbabilitiesAndQPS{ + Hostname: "dell11eg843d", + Probabilities: samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, + QPS: samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}}, + } + + marshalBytes, err := json.Marshal(expected) + assert.NoError(t, err) + // This should pass without error + actual, err := decodeProbabilitiesValue(marshalBytes) + assert.NoError(t, err) + assert.Equal(t, expected, actual) + + // Simulate data corruption by removing the first byte. + corruptedBytes := marshalBytes[1:] + _, err = decodeProbabilitiesValue(corruptedBytes) + assert.Error(t, err) // Expect an error +} + +func TestDecodeThroughtputValue(t *testing.T) { + expected := []*samplemodel.Throughput{ + {Service: "my-svc", Operation: "op"}, + {Service: "our-svc", Operation: "op2"}, + } + + marshalBytes, err := json.Marshal(expected) + assert.NoError(t, err) + acrual, err := decodeThroughtputValue(marshalBytes) + assert.NoError(t, err) + assert.Equal(t, expected, acrual) +} + +func runWithBadger(t *testing.T, test func(t *testing.T, store *SamplingStore)) { + opts := badger.DefaultOptions("") + + opts.SyncWrites = false + dir := t.TempDir() + opts.Dir = dir + opts.ValueDir = dir + + store, err := badger.Open(opts) + assert.NoError(t, err) + defer func() { + assert.NoError(t, store.Close()) + }() + ss := newTestSamplingStore(store) + test(t, ss) +} diff --git a/plugin/storage/integration/badgerstore_test.go b/plugin/storage/integration/badgerstore_test.go index 778aea2af43..233f4e0b8e4 100644 --- a/plugin/storage/integration/badgerstore_test.go +++ b/plugin/storage/integration/badgerstore_test.go @@ -49,6 +49,9 @@ func (s *BadgerIntegrationStorage) initialize() error { if err != nil { return err } + if s.SamplingStore, err = s.factory.CreateSamplingStore(0); err != nil { + return err + } s.SpanReader = sr s.SpanWriter = sw diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index afb743506c9..9f1cf163376 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -452,6 +452,7 @@ func (s *StorageIntegration) testGetLatestProbability(t *testing.T) { } defer s.cleanUp(t) + s.SamplingStore.InsertProbabilitiesAndQPS("newhostname1", samplemodel.ServiceOperationProbabilities{"new-srv3": {"op": 0.123}}, samplemodel.ServiceOperationQPS{"new-srv2": {"op": 11}}) s.SamplingStore.InsertProbabilitiesAndQPS("dell11eg843d", samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}}) expected := samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}