From 0ad5d671405ae7e98d61cd68ea4d45d654ef0509 Mon Sep 17 00:00:00 2001 From: Raj Kamal Singh <1133322+rkssisodiya@users.noreply.github.com> Date: Thu, 5 Oct 2023 14:27:41 +0530 Subject: [PATCH] QS: Collector simulator (#3656) * feat: get collectorsimulator started and add inmemoryreceiver * feat: add collectorsimulator/inmemoryexporter * feat: add collectorsimulator.SimulateLogsProcessing * chore: clean up collector simulator code a little * chore: update go.sum entries for cors * chore: add collectorsimulator tests to make cmd * chore: move to latest dependency version for collectorsimulator * chore: revert to dependency versions matching signoz-otel-col * chore: cleanup: reorganize collectorsimulator logic * chore: some more cleanup * chore: some more cleanup * chore: some more cleanup * chore: redo go.mod --- Makefile | 1 + go.mod | 38 ++- go.sum | 90 +++++++ .../collectorsimulator/collectorsimulator.go | 234 ++++++++++++++++++ .../inmemoryexporter/config.go | 16 ++ .../inmemoryexporter/config_test.go | 48 ++++ .../inmemoryexporter/exporter.go | 86 +++++++ .../inmemoryexporter/exporter_test.go | 67 +++++ .../inmemoryexporter/factory.go | 34 +++ .../inmemoryexporter/factory_test.go | 28 +++ .../inmemoryreceiver/config.go | 16 ++ .../inmemoryreceiver/config_test.go | 48 ++++ .../inmemoryreceiver/factory.go | 41 +++ .../inmemoryreceiver/factory_test.go | 29 +++ .../inmemoryreceiver/receiver.go | 64 +++++ .../inmemoryreceiver/receiver_test.go | 68 +++++ pkg/query-service/collectorsimulator/logs.go | 122 +++++++++ .../collectorsimulator/logs_test.go | 113 +++++++++ 18 files changed, 1139 insertions(+), 4 deletions(-) create mode 100644 pkg/query-service/collectorsimulator/collectorsimulator.go create mode 100644 pkg/query-service/collectorsimulator/inmemoryexporter/config.go create mode 100644 pkg/query-service/collectorsimulator/inmemoryexporter/config_test.go create mode 100644 pkg/query-service/collectorsimulator/inmemoryexporter/exporter.go create mode 100644 pkg/query-service/collectorsimulator/inmemoryexporter/exporter_test.go create mode 100644 pkg/query-service/collectorsimulator/inmemoryexporter/factory.go create mode 100644 pkg/query-service/collectorsimulator/inmemoryexporter/factory_test.go create mode 100644 pkg/query-service/collectorsimulator/inmemoryreceiver/config.go create mode 100644 pkg/query-service/collectorsimulator/inmemoryreceiver/config_test.go create mode 100644 pkg/query-service/collectorsimulator/inmemoryreceiver/factory.go create mode 100644 pkg/query-service/collectorsimulator/inmemoryreceiver/factory_test.go create mode 100644 pkg/query-service/collectorsimulator/inmemoryreceiver/receiver.go create mode 100644 pkg/query-service/collectorsimulator/inmemoryreceiver/receiver_test.go create mode 100644 pkg/query-service/collectorsimulator/logs.go create mode 100644 pkg/query-service/collectorsimulator/logs_test.go diff --git a/Makefile b/Makefile index d4df55c063..0d46bfa161 100644 --- a/Makefile +++ b/Makefile @@ -165,3 +165,4 @@ test: go test ./pkg/query-service/formatter/... go test ./pkg/query-service/tests/integration/... go test ./pkg/query-service/rules/... + go test ./pkg/query-service/collectorsimulator/... diff --git a/go.mod b/go.mod index 2402e92498..9d22d922c6 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/SigNoz/govaluate v0.0.0-20220522085550-d19c08c206cb github.com/SigNoz/zap_otlp/zap_otlp_encoder v0.0.0-20230822164844-1b861a431974 github.com/SigNoz/zap_otlp/zap_otlp_sync v0.0.0-20230822164844-1b861a431974 - github.com/antonmedv/expr v1.12.4 + github.com/antonmedv/expr v1.12.5 github.com/auth0/go-jwt-middleware v1.0.1 github.com/cespare/xxhash v1.1.0 github.com/coreos/go-oidc/v3 v3.4.0 @@ -37,7 +37,7 @@ require ( github.com/posthog/posthog-go v0.0.0-20220817142604-0b0bbf0f9c0f github.com/prometheus/common v0.44.0 github.com/prometheus/prometheus v2.5.0+incompatible - github.com/rs/cors v1.8.2 + github.com/rs/cors v1.9.0 github.com/russellhaering/gosaml2 v0.9.0 github.com/russellhaering/goxmldsig v1.2.0 github.com/samber/lo v1.38.1 @@ -47,7 +47,7 @@ require ( github.com/soheilhy/cmux v0.1.5 github.com/srikanthccv/ClickHouse-go-mock v0.4.0 github.com/stretchr/testify v1.8.4 - go.opentelemetry.io/collector/confmap v0.70.0 + go.opentelemetry.io/collector/confmap v0.79.0 go.opentelemetry.io/otel v1.17.0 go.opentelemetry.io/otel/sdk v1.16.0 go.uber.org/multierr v1.11.0 @@ -65,6 +65,7 @@ require ( ) require ( + contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect github.com/Azure/azure-sdk-for-go/sdk/azcore v1.7.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect @@ -77,6 +78,7 @@ require ( github.com/beevik/etree v1.1.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect + github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dennwc/varint v1.0.0 // indirect @@ -89,8 +91,10 @@ require ( github.com/go-logfmt/logfmt v0.6.0 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-ole/go-ole v1.2.6 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/gopherjs/gopherjs v1.17.2 // indirect @@ -98,6 +102,7 @@ require ( github.com/gosimple/unidecode v1.0.0 // indirect github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/jonboulle/clockwork v0.2.2 // indirect github.com/josharian/intern v1.0.0 // indirect @@ -106,6 +111,7 @@ require ( github.com/klauspost/compress v1.16.7 // indirect github.com/klauspost/cpuid v1.2.3 // indirect github.com/kylelemons/godebug v1.1.0 // indirect + github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/mattermost/xml-roundtrip-validator v0.1.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/minio/md5-simd v1.1.0 // indirect @@ -116,28 +122,51 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect + github.com/observiq/ctimefmt v1.0.0 // indirect github.com/oklog/run v1.1.0 // indirect github.com/oklog/ulid v1.3.1 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.79.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.79.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor v0.79.0 // indirect github.com/paulmach/orb v0.10.0 // indirect github.com/pierrec/lz4/v4 v4.1.18 // indirect github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/prometheus/client_golang v1.16.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common/sigv4 v0.1.0 // indirect github.com/prometheus/procfs v0.11.0 // indirect + github.com/prometheus/statsd_exporter v0.22.7 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/segmentio/asm v1.2.0 // indirect github.com/segmentio/backo-go v1.0.1 // indirect + github.com/shirou/gopsutil/v3 v3.23.4 // indirect + github.com/shoenig/go-m1cpu v0.1.5 // indirect github.com/shopspring/decimal v1.3.1 // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/smarty/assertions v1.15.0 // indirect + github.com/spf13/cobra v1.7.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect + github.com/tklauser/go-sysconf v0.3.11 // indirect + github.com/tklauser/numcpus v0.6.0 // indirect github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c // indirect - go.opentelemetry.io/collector/featuregate v0.70.0 // indirect + github.com/yusufpapurcu/wmi v1.2.2 // indirect + go.opencensus.io v0.24.0 // indirect + go.opentelemetry.io/collector v0.79.0 // indirect + go.opentelemetry.io/collector/component v0.79.0 // indirect + go.opentelemetry.io/collector/consumer v0.79.0 // indirect + go.opentelemetry.io/collector/exporter v0.79.0 // indirect + go.opentelemetry.io/collector/featuregate v1.0.0-rcv0012 // indirect go.opentelemetry.io/collector/pdata v1.0.0-rcv0014 // indirect + go.opentelemetry.io/collector/receiver v0.79.0 // indirect go.opentelemetry.io/collector/semconv v0.81.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 // indirect + go.opentelemetry.io/contrib/propagators/b3 v1.17.0 // indirect + go.opentelemetry.io/otel/bridge/opencensus v0.39.0 // indirect + go.opentelemetry.io/otel/exporters/prometheus v0.39.0 // indirect go.opentelemetry.io/otel/metric v1.17.0 // indirect + go.opentelemetry.io/otel/sdk/metric v0.39.0 // indirect go.opentelemetry.io/otel/trace v1.17.0 // indirect go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.uber.org/atomic v1.11.0 // indirect @@ -146,6 +175,7 @@ require ( golang.org/x/sys v0.11.0 // indirect golang.org/x/text v0.11.0 // indirect golang.org/x/time v0.3.0 // indirect + gonum.org/v1/gonum v0.13.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230717213848-3f92550aa753 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230717213848-3f92550aa753 // indirect diff --git a/go.sum b/go.sum index 6283ae2486..e4fb091cf3 100644 --- a/go.sum +++ b/go.sum @@ -53,6 +53,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.22.1/go.mod h1:S8N1cAStu7BOeFfE8KAQzmyyLkK8p/vmRq6kuBTW58Y= +contrib.go.opencensus.io/exporter/prometheus v0.4.2 h1:sqfsYl5GIY/L570iT+l93ehxaWJs2/OwXtiWwew3oAg= +contrib.go.opencensus.io/exporter/prometheus v0.4.2/go.mod h1:dvEHbiKmgvbr5pjaF9fpw1KeYcjrnC1J8B+JKjsZyRQ= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go v65.0.0+incompatible h1:HzKLt3kIwMm4KeJYTdx9EbjRYTySD/t8i1Ee/W5EGXw= github.com/Azure/azure-sdk-for-go v65.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= @@ -90,6 +92,7 @@ github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20O github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= +github.com/Mottl/ctimefmt v0.0.0-20190803144728-fd2ac23a585a/go.mod h1:eyj2WSIdoPMPs2eNTLpSmM6Nzqo4V80/d6jHpnJ1SAI= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/SigNoz/govaluate v0.0.0-20220522085550-d19c08c206cb h1:bneLSKPf9YUSFmafKx32bynV6QrzViL/s+ZDvQxH1E4= @@ -114,6 +117,8 @@ github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHG github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/antonmedv/expr v1.12.4 h1:YRkeF7r0cejMS47bDYe3Jyes7L9t1AhpunC+Duq+R9k= github.com/antonmedv/expr v1.12.4/go.mod h1:FPC8iWArxls7axbVLsW+kpg1mz29A1b2M6jt+hZfDkU= +github.com/antonmedv/expr v1.12.5 h1:Fq4okale9swwL3OeLLs9WD9H6GbgBLJyN/NUHRv+n0E= +github.com/antonmedv/expr v1.12.5/go.mod h1:FPC8iWArxls7axbVLsW+kpg1mz29A1b2M6jt+hZfDkU= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA= @@ -146,6 +151,8 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= @@ -173,6 +180,7 @@ github.com/coreos/go-oidc/v3 v3.4.0/go.mod h1:eHUXhZtXPQLgEaDrOVTgwbgmz1xGOkJNye github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -244,12 +252,14 @@ github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2 github.com/go-kit/kit v0.13.0 h1:OoneCcHKHQ03LfBpoQCUfCluwd2Vt3ohz+kvbJneZAU= github.com/go-kit/kit v0.13.0/go.mod h1:phqEHMMUbyrCFCTgH48JueqrM3md2HcAZ8N3XE4FKDg= github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= +github.com/go-kit/log v0.2.0/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0= github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0= github.com/go-ldap/ldap v3.0.2+incompatible/go.mod h1:qfd9rJvER9Q0/D/Sqn1DfHRoBp40uXYvFoEVrNEPqRc= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4= github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -258,6 +268,8 @@ github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE= github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs= github.com/go-openapi/jsonreference v0.20.1 h1:FBLnyygC4/IZZr893oiomc9XaghoveYTrLC1F86HID8= @@ -291,6 +303,8 @@ github.com/golang/glog v1.1.0/go.mod h1:pfYeQZ3JWZoXTV5sFc986z3HTpwQs9At6P4ImfuP github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= @@ -469,6 +483,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/ionos-cloud/sdk-go/v6 v6.1.8 h1:493wE/BkZxJf7x79UCE0cYGPZoqQcPiEBALvt7uVGY0= github.com/ionos-cloud/sdk-go/v6 v6.1.8/go.mod h1:EzEgRIDxBELvfoa/uBN0kOQaqovLjUWEB7iW4/Q+t4k= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= @@ -527,6 +543,8 @@ github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/linode/linodego v1.19.0 h1:n4WJrcr9+30e9JGZ6DI0nZbm5SdAj1kSwvvt/998YUw= github.com/linode/linodego v1.19.0/go.mod h1:XZFR+yJ9mm2kwf6itZ6SCpu+6w3KnIevV0Uu5HNWJgQ= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mattermost/xml-roundtrip-validator v0.1.0 h1:RXbVD2UAl7A7nOTR4u7E3ILa4IbtvKBHw64LDsmu9hU= @@ -593,6 +611,8 @@ github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnu github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/observiq/ctimefmt v1.0.0 h1:r7vTJ+Slkrt9fZ67mkf+mA6zAdR5nGIJRMTzkUyvilk= +github.com/observiq/ctimefmt v1.0.0/go.mod h1:mxi62//WbSpG/roCO1c6MqZ7zQTvjVtYheqHN3eOjvc= github.com/oklog/oklog v0.3.2 h1:wVfs8F+in6nTBMkA7CbRw+zZMIB7nNM825cM1wuzoTk= github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= @@ -614,6 +634,12 @@ github.com/onsi/gomega v1.27.4 h1:Z2AnStgsdSayCMDiCU42qIz+HLqEPcgiOCXjAU/w+8E= github.com/onsi/gomega v1.27.4/go.mod h1:riYq/GJKh8hhoM01HN6Vmuy93AarCXCBGpvFDK3q3fQ= github.com/open-telemetry/opamp-go v0.5.0 h1:2YFbb6G4qBkq3yTRdVb5Nfz9hKHW/ldUyex352e1J7g= github.com/open-telemetry/opamp-go v0.5.0/go.mod h1:IMdeuHGVc5CjKSu5/oNV0o+UmiXuahoHvoZ4GOmAI9M= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.79.0 h1:OZPeakqoSZ1yRlmGBlWi9kISx/9PJzlNLGLutFPOQY0= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.79.0/go.mod h1:VOHKYi1wm+/c2wZA3mY1Grd4eYP8uS//EV0yHBbGfGw= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.79.0 h1:o1aUgN0pA5Sc0s2bOUy7vDoNyJ6D6qdHihXk3BKyf58= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.79.0/go.mod h1:t8I2umZdg81AQmncs7fVHw1YMzSol3A7ecsc2lfqgaM= +github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor v0.79.0 h1:EpuwiWvq1hqS4PAp/+kMvWVkM4o+PRGtTGSDLpmIeME= +github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor v0.79.0/go.mod h1:0dccj1BrKVG00hvt2f70tu7Re1YjAl5Jpy2lduSrLnI= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0-rc4 h1:oOxKUJWnFC4YGHCCMNql1x4YaDfYBTS5Y4x/Cgeo1E0= @@ -648,11 +674,16 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/posthog/posthog-go v0.0.0-20220817142604-0b0bbf0f9c0f h1:h0p1aZ9F5d6IXOygysob3g4B07b+HuVUQC0VJKD8wA4= github.com/posthog/posthog-go v0.0.0-20220817142604-0b0bbf0f9c0f/go.mod h1:oa2sAs9tGai3VldabTV0eWejt/O4/OOD7azP8GaikqU= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= +github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY= +github.com/prometheus/client_golang v1.12.2/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY= +github.com/prometheus/client_golang v1.13.0/go.mod h1:vTeo+zgvILHsnnj/39Ou/1fPN5nJFOEMgftOUOmlvYQ= github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8= github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= @@ -665,6 +696,9 @@ github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8 github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= github.com/prometheus/common v0.29.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= +github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= +github.com/prometheus/common v0.35.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA= +github.com/prometheus/common v0.37.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA= github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= github.com/prometheus/common/sigv4 v0.1.0 h1:qoVebwtwwEhS85Czm2dSROY5fTo2PAPEVdDeppTwGX4= @@ -673,8 +707,12 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk= github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= +github.com/prometheus/statsd_exporter v0.22.7 h1:7Pji/i2GuhK6Lu7DHrtTkFmNBCudCPT1pX2CziuyQR0= +github.com/prometheus/statsd_exporter v0.22.7/go.mod h1:N/TevpjkIh9ccs6nuzY3jQn9dFqnUakOjnEuMPJJJnI= github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= @@ -687,11 +725,14 @@ github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjR github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rs/cors v1.8.2 h1:KCooALfAYGs415Cwu5ABvv9n9509fSiG5SQJn/AQo4U= github.com/rs/cors v1.8.2/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= +github.com/rs/cors v1.9.0 h1:l9HGsTsHJcvW14Nk7J9KFz8bzeAWXn3CG6bgt7LsrAE= +github.com/rs/cors v1.9.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/russellhaering/gosaml2 v0.9.0 h1:CNMnH42z/GirrKjdmNrSS6bAAs47F9bPdl4PfRmVOIk= github.com/russellhaering/gosaml2 v0.9.0/go.mod h1:byViER/1YPUa0Puj9ROZblpoq2jsE7h/CJmitzX0geU= github.com/russellhaering/goxmldsig v1.2.0 h1:Y6GTTc9Un5hCxSzVz4UIWQ/zuVwDvzJk80guqzwx6Vg= github.com/russellhaering/goxmldsig v1.2.0/go.mod h1:gM4MDENBQf7M+V824SGfyIUVFWydB7n0KkEubVJl+Tw= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= @@ -706,6 +747,12 @@ github.com/segmentio/backo-go v1.0.1 h1:68RQccglxZeyURy93ASB/2kc9QudzgIDexJ927N+ github.com/segmentio/backo-go v1.0.1/go.mod h1:9/Rh6yILuLysoQnZ2oNooD2g7aBnvM7r/fNVxRNWfBc= github.com/sethvargo/go-password v0.2.0 h1:BTDl4CC/gjf/axHMaDQtw507ogrXLci6XRiLc7i/UHI= github.com/sethvargo/go-password v0.2.0/go.mod h1:Ym4Mr9JXLBycr02MFuVQ/0JHidNetSgbzutTr3zsYXE= +github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= +github.com/shirou/gopsutil/v3 v3.23.4 h1:hZwmDxZs7Ewt75DV81r4pFMqbq+di2cbt9FsQBqLD2o= +github.com/shirou/gopsutil/v3 v3.23.4/go.mod h1:ZcGxyfzAMRevhUR2+cfhXDH6gQdFYE/t8j1nsU4mPI8= +github.com/shoenig/go-m1cpu v0.1.5 h1:LF57Z/Fpb/WdGLjt2HZilNnmZOxg/q2bSKTQhgbrLrQ= +github.com/shoenig/go-m1cpu v0.1.5/go.mod h1:Wwvst4LR89UxjeFtLRMrpgRiyY4xPsejnVZym39dbAQ= +github.com/shoenig/test v0.6.3/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= @@ -729,6 +776,8 @@ github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= +github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/srikanthccv/ClickHouse-go-mock v0.4.0 h1:tLk7qoDLg7Z5YD5mOmNqjRDbsm6ehJVXOFvSnG+gQAg= @@ -746,10 +795,16 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807/go.mod h1:7jxmlfBCDBXRzr0eAQJ48XC1hBu1np4CS5+cHEYfwpc= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM= +github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI= +github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYms= +github.com/tklauser/numcpus v0.6.0/go.mod h1:FEZLMke0lhOUG6w2JadTzp0a+Nl8PF/GFkQ5UVIcaL4= github.com/urfave/cli v1.22.5/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/negroni v1.0.0 h1:kIimOitoypq34K7TG7DUaJ9kq/N4Ofuwi1sjz0KipXc= github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= @@ -767,6 +822,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg= +github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A= go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY= @@ -778,22 +835,46 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= +go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= +go.opentelemetry.io/collector v0.79.0 h1:Lra7U0ilMor1g5WVkO3YZ0kZYsvzAtGN+Uq+CmC96JY= +go.opentelemetry.io/collector v0.79.0/go.mod h1:O2Vfwykphq9VqdATZiAypjnJMS3WFBXwFSe/0ujo38Q= +go.opentelemetry.io/collector/component v0.79.0 h1:ZKLJ4qa0AngmyGp1RQBJgl6OIP6mxdfrVpbz09h/W34= +go.opentelemetry.io/collector/component v0.79.0/go.mod h1:rX0gixMemcXZTZaML5zUiT+5txZUYkWnACscJkFVj18= go.opentelemetry.io/collector/confmap v0.70.0 h1:GJDaM7c3yFyT7Zv6l2/5ahwaqPCvtC92Ii8Bg2AVdjU= go.opentelemetry.io/collector/confmap v0.70.0/go.mod h1:8//JWR2TMChLH35Az0mGFrCskEIP6POgZJK6iRRhzeM= +go.opentelemetry.io/collector/confmap v0.79.0 h1:a4XVde3lLP81BiSbt8AzVD6pvQBX8YkrB9ZtMSHKv1A= +go.opentelemetry.io/collector/confmap v0.79.0/go.mod h1:cKr2c7lVtEJCuMOncUPlcROJBbTFaHiPjYp1Y8RbL+Q= +go.opentelemetry.io/collector/consumer v0.79.0 h1:V/4PCvbTw2Bt+lYb/ogac0g/nCCb3oKnmz+jM3t5Dyk= +go.opentelemetry.io/collector/consumer v0.79.0/go.mod h1:VfqIyUI5K20zXx3mfVN+skmA+V3sV5fNorJ5TaIOj/U= +go.opentelemetry.io/collector/exporter v0.79.0 h1:PxhKgWf1AkZvN1PjiJT5xiO+pKZA9Y4fyuMs5aNFuEA= +go.opentelemetry.io/collector/exporter v0.79.0/go.mod h1:qlXiqnOUeHelpAwk03f8nB5+91UIqlA7udSBsj9bJ3M= go.opentelemetry.io/collector/featuregate v0.70.0 h1:Xr6hrMT/++SjTm06nreex8WlpgFhYJ7S0yRVn1OvVf8= go.opentelemetry.io/collector/featuregate v0.70.0/go.mod h1:ih+oCwrHW3bLac/qnPUzes28yDCDmh8WzsAKKauwCYI= +go.opentelemetry.io/collector/featuregate v1.0.0-rcv0012 h1:pSO81lfikGEgRXHepmOGy2o6WWCly427UJCgMJC5c8g= +go.opentelemetry.io/collector/featuregate v1.0.0-rcv0012/go.mod h1:/kVAsGUCyJXIDSgHftCN63QiwAEVHRLX2Kh/S+dqgHY= go.opentelemetry.io/collector/pdata v1.0.0-rcv0014 h1:iT5qH0NLmkGeIdDtnBogYDx7L58t6CaWGL378DEo2QY= go.opentelemetry.io/collector/pdata v1.0.0-rcv0014/go.mod h1:BRvDrx43kiSoUx3mr7SoA7h9B8+OY99mUK+CZSQFWW4= +go.opentelemetry.io/collector/receiver v0.79.0 h1:Ag4hciAYklQWDpKbnmqhfh9zJlUskWvThpCpphp12b4= +go.opentelemetry.io/collector/receiver v0.79.0/go.mod h1:+/xe0VoYl6Mli+KQTZWBR2apqFsbioAAqu7abzKDskI= go.opentelemetry.io/collector/semconv v0.81.0 h1:lCYNNo3powDvFIaTPP2jDKIrBiV1T92NK4QgL/aHYXw= go.opentelemetry.io/collector/semconv v0.81.0/go.mod h1:TlYPtzvsXyHOgr5eATi43qEMqwSmIziivJB2uctKswo= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 h1:pginetY7+onl4qN1vl0xW/V/v6OBZ0vVdH+esuJgvmM= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0/go.mod h1:XiYsayHc36K3EByOO6nbAXnAWbrUxdjUROCEeeROOH8= +go.opentelemetry.io/contrib/propagators/b3 v1.17.0 h1:ImOVvHnku8jijXqkwCSyYKRDt2YrnGXD4BbhcpfbfJo= +go.opentelemetry.io/contrib/propagators/b3 v1.17.0/go.mod h1:IkfUfMpKWmynvvE0264trz0sf32NRTZL4nuAN9AbWRc= go.opentelemetry.io/otel v1.17.0 h1:MW+phZ6WZ5/uk2nd93ANk/6yJ+dVrvNWUjGhnnFU5jM= go.opentelemetry.io/otel v1.17.0/go.mod h1:I2vmBGtFaODIVMBSTPVDlJSzBDNf93k60E6Ft0nyjo0= +go.opentelemetry.io/otel/bridge/opencensus v0.39.0 h1:YHivttTaDhbZIHuPlg1sWsy2P5gj57vzqPfkHItgbwQ= +go.opentelemetry.io/otel/bridge/opencensus v0.39.0/go.mod h1:vZ4537pNjFDXEx//WldAR6Ro2LC8wwmFC76njAXwNPE= +go.opentelemetry.io/otel/exporters/prometheus v0.39.0 h1:whAaiHxOatgtKd+w0dOi//1KUxj3KoPINZdtDaDj3IA= +go.opentelemetry.io/otel/exporters/prometheus v0.39.0/go.mod h1:4jo5Q4CROlCpSPsXLhymi+LYrDXd2ObU5wbKayfZs7Y= go.opentelemetry.io/otel/metric v1.17.0 h1:iG6LGVz5Gh+IuO0jmgvpTB6YVrCGngi8QGm+pMd8Pdc= go.opentelemetry.io/otel/metric v1.17.0/go.mod h1:h4skoxdZI17AxwITdmdZjjYJQH5nzijUUjm+wtPph5o= go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE= go.opentelemetry.io/otel/sdk v1.16.0/go.mod h1:tMsIuKXuuIWPBAOrH+eHtvhTL+SntFtXF9QD68aP6p4= +go.opentelemetry.io/otel/sdk/metric v0.39.0 h1:Kun8i1eYf48kHH83RucG93ffz0zGV1sh46FAScOTuDI= +go.opentelemetry.io/otel/sdk/metric v0.39.0/go.mod h1:piDIRgjcK7u0HCL5pCA4e74qpK/jk3NiUoAHATVAmiI= go.opentelemetry.io/otel/trace v1.17.0 h1:/SWhSRHmDPOImIAetP1QAeMnZYiQXrTy4fMMYOdSKWQ= go.opentelemetry.io/otel/trace v1.17.0/go.mod h1:I/4vKTgFclIsXRVucpH25X0mpFSczM7aHeaz0ZBLWjY= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= @@ -977,6 +1058,7 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1008,6 +1090,7 @@ golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1034,6 +1117,7 @@ golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -1043,10 +1127,13 @@ golang.org/x/sys v0.0.0-20220502124256-b6088ccd6cba/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220708085239-5a0f0661e09d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -1138,6 +1225,8 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +gonum.org/v1/gonum v0.13.0 h1:a0T3bh+7fhRyqeNbiC3qVHYmkiQgit3wnNan/2c0HMM= +gonum.org/v1/gonum v0.13.0/go.mod h1:/WPYRckkfWrhWefxyYTfrTtQR0KH4iyHNuzxqXAKyAU= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg= @@ -1320,6 +1409,7 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= diff --git a/pkg/query-service/collectorsimulator/collectorsimulator.go b/pkg/query-service/collectorsimulator/collectorsimulator.go new file mode 100644 index 0000000000..c4537cf3ee --- /dev/null +++ b/pkg/query-service/collectorsimulator/collectorsimulator.go @@ -0,0 +1,234 @@ +package collectorsimulator + +import ( + "bytes" + "context" + "fmt" + "strings" + + "github.com/google/uuid" + "github.com/knadh/koanf/parsers/yaml" + "github.com/pkg/errors" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/confmap/provider/yamlprovider" + "go.opentelemetry.io/collector/connector" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/otelcol" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/service" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "go.signoz.io/signoz/pkg/query-service/collectorsimulator/inmemoryexporter" + "go.signoz.io/signoz/pkg/query-service/collectorsimulator/inmemoryreceiver" + "go.signoz.io/signoz/pkg/query-service/model" +) + +// Puts together a collector service with inmemory receiver and exporter +// for simulating processing of signal data through an otel collector +type CollectorSimulator struct { + // collector service to be used for the simulation + collectorSvc *service.Service + + // Buffer where collectorSvc will log errors. + collectorErrorLogsBuffer *bytes.Buffer + + // error channel where collector components will report fatal errors + // Gets passed in as AsyncErrorChannel in service.Settings when creating a collector service. + collectorErrorChannel chan error + + // Unique ids of inmemory receiver and exporter instances that + // will be created by collectorSvc + inMemoryReceiverId string + inMemoryExporterId string +} + +func NewCollectorSimulator( + ctx context.Context, + signalType component.DataType, + processorFactories map[component.Type]processor.Factory, + processorConfigs []ProcessorConfig, +) (*CollectorSimulator, *model.ApiError) { + // Put together collector component factories for use in the simulation + receiverFactories, err := receiver.MakeFactoryMap(inmemoryreceiver.NewFactory()) + if err != nil { + return nil, model.InternalError(errors.Wrap(err, "could not create receiver factories.")) + } + exporterFactories, err := exporter.MakeFactoryMap(inmemoryexporter.NewFactory()) + if err != nil { + return nil, model.InternalError(errors.Wrap(err, "could not create processor factories.")) + } + factories := otelcol.Factories{ + Receivers: receiverFactories, + Processors: processorFactories, + Exporters: exporterFactories, + } + + // Prepare collector config yaml for simulation + inMemoryReceiverId := uuid.NewString() + inMemoryExporterId := uuid.NewString() + + collectorConfYaml, err := generateSimulationConfig( + signalType, inMemoryReceiverId, processorConfigs, inMemoryExporterId, + ) + if err != nil { + return nil, model.BadRequest(errors.Wrap(err, "could not generate collector config")) + } + + // Parse and validate collector config + yamlP := yamlprovider.New() + confProvider, err := otelcol.NewConfigProvider(otelcol.ConfigProviderSettings{ + ResolverSettings: confmap.ResolverSettings{ + URIs: []string{"yaml:" + string(collectorConfYaml)}, + Providers: map[string]confmap.Provider{yamlP.Scheme(): yamlP}, + }, + }) + if err != nil { + return nil, model.BadRequest(errors.Wrap(err, "could not create config provider.")) + } + collectorCfg, err := confProvider.Get(ctx, factories) + if err != nil { + return nil, model.BadRequest(errors.Wrap(err, "failed to parse collector config")) + } + + if err = collectorCfg.Validate(); err != nil { + return nil, model.BadRequest(errors.Wrap(err, "invalid collector config")) + } + + // Build and start collector service. + collectorErrChan := make(chan error) + var collectorErrBuf bytes.Buffer + svcSettings := service.Settings{ + Receivers: receiver.NewBuilder(collectorCfg.Receivers, factories.Receivers), + Processors: processor.NewBuilder(collectorCfg.Processors, factories.Processors), + Exporters: exporter.NewBuilder(collectorCfg.Exporters, factories.Exporters), + Connectors: connector.NewBuilder(collectorCfg.Connectors, factories.Connectors), + Extensions: extension.NewBuilder(collectorCfg.Extensions, factories.Extensions), + AsyncErrorChannel: collectorErrChan, + LoggingOptions: []zap.Option{ + zap.ErrorOutput(zapcore.AddSync(&collectorErrBuf)), + }, + } + + collectorSvc, err := service.New(ctx, svcSettings, collectorCfg.Service) + if err != nil { + return nil, model.InternalError(errors.Wrap(err, "could not instantiate collector service")) + } + + return &CollectorSimulator{ + inMemoryReceiverId: inMemoryReceiverId, + inMemoryExporterId: inMemoryExporterId, + collectorSvc: collectorSvc, + collectorErrorLogsBuffer: &collectorErrBuf, + collectorErrorChannel: collectorErrChan, + }, nil +} + +func (l *CollectorSimulator) Start(ctx context.Context) ( + func(), *model.ApiError, +) { + // Calling collectorSvc.Start below will in turn call Start on + // inmemory receiver and exporter instances created by collectorSvc + // + // inmemory components are indexed in a global map after Start is called + // on them and will have to be cleaned up to ensure there is no memory leak + cleanupFn := func() { + inmemoryreceiver.CleanupInstance(l.inMemoryReceiverId) + inmemoryexporter.CleanupInstance(l.inMemoryExporterId) + } + + err := l.collectorSvc.Start(ctx) + if err != nil { + return cleanupFn, model.InternalError(errors.Wrap(err, "could not start collector service for simulation")) + } + + return cleanupFn, nil +} + +func (l *CollectorSimulator) GetReceiver() *inmemoryreceiver.InMemoryReceiver { + return inmemoryreceiver.GetReceiverInstance(l.inMemoryReceiverId) +} + +func (l *CollectorSimulator) GetExporter() *inmemoryexporter.InMemoryExporter { + return inmemoryexporter.GetExporterInstance(l.inMemoryExporterId) +} + +func (l *CollectorSimulator) Shutdown(ctx context.Context) ( + simulationErrs []string, apiErr *model.ApiError, +) { + shutdownErr := l.collectorSvc.Shutdown(ctx) + + // Collect all errors logged or reported by collectorSvc + simulationErrs = []string{} + close(l.collectorErrorChannel) + for reportedErr := range l.collectorErrorChannel { + simulationErrs = append(simulationErrs, reportedErr.Error()) + } + + if l.collectorErrorLogsBuffer.Len() > 0 { + errBufText := strings.TrimSpace(l.collectorErrorLogsBuffer.String()) + errBufLines := strings.Split(errBufText, "\n") + simulationErrs = append(simulationErrs, errBufLines...) + } + + if shutdownErr != nil { + return simulationErrs, model.InternalError(errors.Wrap( + shutdownErr, "could not shutdown the collector service", + )) + } + return simulationErrs, nil +} + +func generateSimulationConfig( + signalType component.DataType, + receiverId string, + processorConfigs []ProcessorConfig, + exporterId string, +) ([]byte, error) { + baseConf := fmt.Sprintf(` + receivers: + memory: + id: %s + exporters: + memory: + id: %s + service: + telemetry: + metrics: + level: none + logs: + level: error + `, receiverId, exporterId) + + simulationConf, err := yaml.Parser().Unmarshal([]byte(baseConf)) + if err != nil { + return nil, err + } + + processors := map[string]interface{}{} + procNamesInOrder := []string{} + for _, processorConf := range processorConfigs { + processors[processorConf.Name] = processorConf.Config + procNamesInOrder = append(procNamesInOrder, processorConf.Name) + } + simulationConf["processors"] = processors + + svc := simulationConf["service"].(map[string]interface{}) + svc["pipelines"] = map[string]interface{}{ + string(signalType): map[string]interface{}{ + "receivers": []string{"memory"}, + "processors": procNamesInOrder, + "exporters": []string{"memory"}, + }, + } + + simulationConfYaml, err := yaml.Parser().Marshal(simulationConf) + if err != nil { + return nil, err + } + + return simulationConfYaml, nil +} diff --git a/pkg/query-service/collectorsimulator/inmemoryexporter/config.go b/pkg/query-service/collectorsimulator/inmemoryexporter/config.go new file mode 100644 index 0000000000..5b23b041ce --- /dev/null +++ b/pkg/query-service/collectorsimulator/inmemoryexporter/config.go @@ -0,0 +1,16 @@ +package inmemoryexporter + +import "fmt" + +type Config struct { + // Unique id for the exporter. + // Useful for getting a hold of the exporter in code that doesn't control its instantiation. + Id string `mapstructure:"id"` +} + +func (c *Config) Validate() error { + if len(c.Id) < 1 { + return fmt.Errorf("inmemory exporter: id is required") + } + return nil +} diff --git a/pkg/query-service/collectorsimulator/inmemoryexporter/config_test.go b/pkg/query-service/collectorsimulator/inmemoryexporter/config_test.go new file mode 100644 index 0000000000..29749757dc --- /dev/null +++ b/pkg/query-service/collectorsimulator/inmemoryexporter/config_test.go @@ -0,0 +1,48 @@ +package inmemoryexporter + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" +) + +func TestValidate(t *testing.T) { + tests := []struct { + name string + rawConf *confmap.Conf + errorExpected bool + }{ + { + name: "with id", + rawConf: confmap.NewFromStringMap(map[string]interface{}{ + "id": "test_exporter", + }), + errorExpected: false, + }, + { + name: "empty id", + rawConf: confmap.NewFromStringMap(map[string]interface{}{ + "id": "", + }), + errorExpected: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + err := component.UnmarshalConfig(tt.rawConf, cfg) + require.NoError(t, err, "could not UnmarshalConfig") + + err = component.ValidateConfig(cfg) + if tt.errorExpected { + require.NotNilf(t, err, "Invalid config did not return validation error: %v", cfg) + } else { + require.NoErrorf(t, err, "Valid config returned validation error: %v", cfg) + } + }) + } +} diff --git a/pkg/query-service/collectorsimulator/inmemoryexporter/exporter.go b/pkg/query-service/collectorsimulator/inmemoryexporter/exporter.go new file mode 100644 index 0000000000..3cff186016 --- /dev/null +++ b/pkg/query-service/collectorsimulator/inmemoryexporter/exporter.go @@ -0,0 +1,86 @@ +package inmemoryexporter + +import ( + "context" + "fmt" + "sync" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" +) + +// An in-memory exporter for testing and generating previews. +type InMemoryExporter struct { + // Unique identifier for the exporter. + id string + // mu protects the data below + mu sync.Mutex + // slice of pdata.Logs that were received by this exporter. + logs []plog.Logs +} + +// ConsumeLogs implements component.LogsExporter. +func (e *InMemoryExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + e.mu.Lock() + defer e.mu.Unlock() + + e.logs = append(e.logs, ld) + return nil +} + +func (e *InMemoryExporter) GetLogs() []plog.Logs { + e.mu.Lock() + defer e.mu.Unlock() + + return e.logs +} + +func (e *InMemoryExporter) ResetLogs() { + e.mu.Lock() + defer e.mu.Unlock() + + e.logs = nil +} + +func (e *InMemoryExporter) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +// Keep track of all exporter instances in the process. +// Useful for getting a hold of the exporter in scenarios where one doesn't +// create the instances. Eg: bringing up a collector service from collector config +var allExporterInstances map[string]*InMemoryExporter +var allExportersLock sync.Mutex + +func init() { + allExporterInstances = make(map[string]*InMemoryExporter) +} + +func GetExporterInstance(id string) *InMemoryExporter { + return allExporterInstances[id] +} + +func CleanupInstance(exporterId string) { + allExportersLock.Lock() + defer allExportersLock.Unlock() + + delete(allExporterInstances, exporterId) +} + +func (e *InMemoryExporter) Start(ctx context.Context, host component.Host) error { + allExportersLock.Lock() + defer allExportersLock.Unlock() + + if allExporterInstances[e.id] != nil { + return fmt.Errorf("exporter with id %s is already running", e.id) + } + + allExporterInstances[e.id] = e + return nil +} + +func (e *InMemoryExporter) Shutdown(ctx context.Context) error { + CleanupInstance(e.id) + return nil +} diff --git a/pkg/query-service/collectorsimulator/inmemoryexporter/exporter_test.go b/pkg/query-service/collectorsimulator/inmemoryexporter/exporter_test.go new file mode 100644 index 0000000000..4fe4753d72 --- /dev/null +++ b/pkg/query-service/collectorsimulator/inmemoryexporter/exporter_test.go @@ -0,0 +1,67 @@ +package inmemoryexporter + +import ( + "context" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/exporter" +) + +func TestExporterLifecycle(t *testing.T) { + require := require.New(t) + testExporterId := uuid.NewString() + + // Should be able to get a hold of the exporter after starting it. + require.Nil(GetExporterInstance(testExporterId)) + + constructed, err := makeTestExporter(testExporterId) + require.Nil(err, "could not make test exporter") + + err = constructed.Start(context.Background(), componenttest.NewNopHost()) + require.Nil(err, "could not start test exporter") + + testExporter := GetExporterInstance(testExporterId) + require.NotNil(testExporter, "could not get exporter instance by Id") + + // Should not be able to start 2 exporters with the same id + constructed2, err := makeTestExporter(testExporterId) + require.Nil(err, "could not create second exporter with same id") + + err = constructed2.Start(context.Background(), componenttest.NewNopHost()) + require.NotNil(err, "should not be able to start another exporter with same id before shutting down the previous one") + + // Should not be able to get a hold of an exporter after shutdown + testExporter.Shutdown(context.Background()) + require.Nil(GetExporterInstance(testExporterId), "should not be able to find exporter instance after shutdown") + + // Should be able to start a new exporter with same id after shutting down + constructed3, err := makeTestExporter(testExporterId) + require.Nil(err, "could not make exporter with same Id after shutting down previous one") + + err = constructed3.Start(context.Background(), componenttest.NewNopHost()) + require.Nil(err, "should be able to start another exporter with same id after shutting down the previous one") + + testExporter3 := GetExporterInstance(testExporterId) + require.NotNil(testExporter3, "could not get exporter instance by Id") + + testExporter3.Shutdown(context.Background()) + require.Nil(GetExporterInstance(testExporterId)) +} + +func makeTestExporter(exporterId string) (exporter.Logs, error) { + factory := NewFactory() + + cfg := factory.CreateDefaultConfig() + component.UnmarshalConfig(confmap.NewFromStringMap( + map[string]interface{}{"id": exporterId}), cfg, + ) + + return factory.CreateLogsExporter( + context.Background(), exporter.CreateSettings{}, cfg, + ) +} diff --git a/pkg/query-service/collectorsimulator/inmemoryexporter/factory.go b/pkg/query-service/collectorsimulator/inmemoryexporter/factory.go new file mode 100644 index 0000000000..7752693060 --- /dev/null +++ b/pkg/query-service/collectorsimulator/inmemoryexporter/factory.go @@ -0,0 +1,34 @@ +package inmemoryexporter + +import ( + "context" + + "github.com/google/uuid" + "github.com/pkg/errors" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" +) + +func createDefaultConfig() component.Config { + return &Config{ + Id: uuid.NewString(), + } +} + +func createLogsExporter( + _ context.Context, _ exporter.CreateSettings, config component.Config, +) (exporter.Logs, error) { + if err := component.ValidateConfig(config); err != nil { + return nil, errors.Wrap(err, "invalid inmemory exporter config") + } + return &InMemoryExporter{ + id: config.(*Config).Id, + }, nil +} + +func NewFactory() exporter.Factory { + return exporter.NewFactory( + "memory", + createDefaultConfig, + exporter.WithLogs(createLogsExporter, component.StabilityLevelBeta)) +} diff --git a/pkg/query-service/collectorsimulator/inmemoryexporter/factory_test.go b/pkg/query-service/collectorsimulator/inmemoryexporter/factory_test.go new file mode 100644 index 0000000000..1a9481169a --- /dev/null +++ b/pkg/query-service/collectorsimulator/inmemoryexporter/factory_test.go @@ -0,0 +1,28 @@ +package inmemoryexporter + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/exporter" +) + +func TestCreateDefaultConfig(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + assert.NotNil(t, cfg, "failed to create default config") + assert.NoError(t, componenttest.CheckConfigStruct(cfg)) +} + +func TestCreateLogsExporter(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + + te, err := factory.CreateLogsExporter( + context.Background(), exporter.CreateSettings{}, cfg, + ) + assert.NoError(t, err) + assert.NotNil(t, te) +} diff --git a/pkg/query-service/collectorsimulator/inmemoryreceiver/config.go b/pkg/query-service/collectorsimulator/inmemoryreceiver/config.go new file mode 100644 index 0000000000..6df842ce3e --- /dev/null +++ b/pkg/query-service/collectorsimulator/inmemoryreceiver/config.go @@ -0,0 +1,16 @@ +package inmemoryreceiver + +import "fmt" + +type Config struct { + // Unique id for the receiver. + // Useful for getting a hold of the receiver in code that doesn't control its instantiation. + Id string `mapstructure:"id"` +} + +func (c *Config) Validate() error { + if len(c.Id) < 1 { + return fmt.Errorf("inmemory receiver: id is required") + } + return nil +} diff --git a/pkg/query-service/collectorsimulator/inmemoryreceiver/config_test.go b/pkg/query-service/collectorsimulator/inmemoryreceiver/config_test.go new file mode 100644 index 0000000000..a0daf71c45 --- /dev/null +++ b/pkg/query-service/collectorsimulator/inmemoryreceiver/config_test.go @@ -0,0 +1,48 @@ +package inmemoryreceiver + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" +) + +func TestValidate(t *testing.T) { + tests := []struct { + name string + rawConf *confmap.Conf + errorExpected bool + }{ + { + name: "with id", + rawConf: confmap.NewFromStringMap(map[string]interface{}{ + "id": "test_receiver", + }), + errorExpected: false, + }, + { + name: "empty id", + rawConf: confmap.NewFromStringMap(map[string]interface{}{ + "id": "", + }), + errorExpected: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + err := component.UnmarshalConfig(tt.rawConf, cfg) + require.NoError(t, err, "could not UnmarshalConfig") + + err = component.ValidateConfig(cfg) + if tt.errorExpected { + require.NotNilf(t, err, "Invalid config did not return validation error: %v", cfg) + } else { + require.NoErrorf(t, err, "Valid config returned validation error: %v", cfg) + } + }) + } +} diff --git a/pkg/query-service/collectorsimulator/inmemoryreceiver/factory.go b/pkg/query-service/collectorsimulator/inmemoryreceiver/factory.go new file mode 100644 index 0000000000..9db222cc43 --- /dev/null +++ b/pkg/query-service/collectorsimulator/inmemoryreceiver/factory.go @@ -0,0 +1,41 @@ +package inmemoryreceiver + +import ( + "context" + + "github.com/google/uuid" + "github.com/pkg/errors" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/receiver" +) + +func createDefaultConfig() component.Config { + return &Config{ + Id: uuid.NewString(), + } +} + +func createLogsReceiver( + _ context.Context, + _ receiver.CreateSettings, + config component.Config, + consumer consumer.Logs, +) (receiver.Logs, error) { + if err := component.ValidateConfig(config); err != nil { + return nil, errors.Wrap(err, "invalid inmemory receiver config") + } + return &InMemoryReceiver{ + id: config.(*Config).Id, + nextConsumer: consumer, + }, nil + +} + +// NewFactory creates a new OTLP receiver factory. +func NewFactory() receiver.Factory { + return receiver.NewFactory( + "memory", + createDefaultConfig, + receiver.WithLogs(createLogsReceiver, component.StabilityLevelBeta)) +} diff --git a/pkg/query-service/collectorsimulator/inmemoryreceiver/factory_test.go b/pkg/query-service/collectorsimulator/inmemoryreceiver/factory_test.go new file mode 100644 index 0000000000..7bdcd80bee --- /dev/null +++ b/pkg/query-service/collectorsimulator/inmemoryreceiver/factory_test.go @@ -0,0 +1,29 @@ +package inmemoryreceiver + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver" +) + +func TestCreateDefaultConfig(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + assert.NotNil(t, cfg, "failed to create default config") + assert.NoError(t, componenttest.CheckConfigStruct(cfg)) +} + +func TestCreateLogsReceiver(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + + te, err := factory.CreateLogsReceiver( + context.Background(), receiver.CreateSettings{}, cfg, consumertest.NewNop(), + ) + assert.NoError(t, err) + assert.NotNil(t, te) +} diff --git a/pkg/query-service/collectorsimulator/inmemoryreceiver/receiver.go b/pkg/query-service/collectorsimulator/inmemoryreceiver/receiver.go new file mode 100644 index 0000000000..d4b0a2abfe --- /dev/null +++ b/pkg/query-service/collectorsimulator/inmemoryreceiver/receiver.go @@ -0,0 +1,64 @@ +package inmemoryreceiver + +import ( + "context" + "fmt" + "sync" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" +) + +// In memory receiver for testing and simulation +type InMemoryReceiver struct { + // Unique identifier for the receiver. + id string + + nextConsumer consumer.Logs +} + +func (r *InMemoryReceiver) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + return r.nextConsumer.ConsumeLogs(ctx, ld) +} + +func (r *InMemoryReceiver) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +// Keep track of all receiver instances in the process. +// Useful for getting a hold of the receiver in scenarios where one doesn't +// create the instances. Eg: bringing up a collector service from collector config +var allReceiverInstances map[string]*InMemoryReceiver +var allReceiversLock sync.Mutex + +func init() { + allReceiverInstances = make(map[string]*InMemoryReceiver) +} + +func CleanupInstance(receiverId string) { + allReceiversLock.Lock() + defer allReceiversLock.Unlock() + delete(allReceiverInstances, receiverId) +} + +func (r *InMemoryReceiver) Start(ctx context.Context, host component.Host) error { + allReceiversLock.Lock() + defer allReceiversLock.Unlock() + + if allReceiverInstances[r.id] != nil { + return fmt.Errorf("receiver with id %s is already running", r.id) + } + + allReceiverInstances[r.id] = r + return nil +} + +func (r *InMemoryReceiver) Shutdown(ctx context.Context) error { + CleanupInstance(r.id) + return nil +} + +func GetReceiverInstance(id string) *InMemoryReceiver { + return allReceiverInstances[id] +} diff --git a/pkg/query-service/collectorsimulator/inmemoryreceiver/receiver_test.go b/pkg/query-service/collectorsimulator/inmemoryreceiver/receiver_test.go new file mode 100644 index 0000000000..4fe7169cc7 --- /dev/null +++ b/pkg/query-service/collectorsimulator/inmemoryreceiver/receiver_test.go @@ -0,0 +1,68 @@ +package inmemoryreceiver + +import ( + "context" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver" +) + +func TestReceiverLifecycle(t *testing.T) { + require := require.New(t) + testReceiverId := uuid.NewString() + + // Should be able to get a hold of the receiver after starting it. + require.Nil(GetReceiverInstance(testReceiverId), "receiver instance should not exist before Start()") + + constructed, err := makeTestLogReceiver(testReceiverId) + require.Nil(err, "could not make test receiver") + + err = constructed.Start(context.Background(), componenttest.NewNopHost()) + require.Nil(err, "could not start test receiver") + + testReceiver := GetReceiverInstance(testReceiverId) + require.NotNil(testReceiver, "could not get receiver instance by Id") + + // Should not be able to start 2 receivers with the same id + constructed2, err := makeTestLogReceiver(testReceiverId) + require.Nil(err, "could not create second receiver with same id") + + err = constructed2.Start(context.Background(), componenttest.NewNopHost()) + require.NotNil(err, "should not be able to start another receiver with same id before shutting down the previous one") + + // Should not be able to get a hold of an receiver after shutdown + testReceiver.Shutdown(context.Background()) + require.Nil(GetReceiverInstance(testReceiverId), "should not be able to find inmemory receiver after shutdown") + + // Should be able to start a new receiver with same id after shutting down + constructed3, err := makeTestLogReceiver(testReceiverId) + require.Nil(err, "could not make receiver with same Id after shutting down old one") + + err = constructed3.Start(context.Background(), componenttest.NewNopHost()) + require.Nil(err, "should be able to start another receiver with same id after shutting down the previous one") + + testReceiver3 := GetReceiverInstance(testReceiverId) + require.NotNil(testReceiver3, "could not get receiver instance by Id") + + testReceiver3.Shutdown(context.Background()) + require.Nil(GetReceiverInstance(testReceiverId)) +} + +func makeTestLogReceiver(receiverId string) (receiver.Logs, error) { + factory := NewFactory() + + cfg := factory.CreateDefaultConfig() + component.UnmarshalConfig(confmap.NewFromStringMap( + map[string]interface{}{"id": receiverId}), cfg, + ) + + return factory.CreateLogsReceiver( + context.Background(), receiver.CreateSettings{}, cfg, consumertest.NewNop(), + ) +} diff --git a/pkg/query-service/collectorsimulator/logs.go b/pkg/query-service/collectorsimulator/logs.go new file mode 100644 index 0000000000..ab445f79eb --- /dev/null +++ b/pkg/query-service/collectorsimulator/logs.go @@ -0,0 +1,122 @@ +package collectorsimulator + +import ( + "context" + "fmt" + "time" + + "github.com/pkg/errors" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/processor" + "go.signoz.io/signoz/pkg/query-service/model" +) + +type ProcessorConfig struct { + Name string + Config map[string]interface{} +} + +// Simulate processing of logs through the otel collector. +// Useful for testing, validation and generating previews. +func SimulateLogsProcessing( + ctx context.Context, + processorFactories map[component.Type]processor.Factory, + processorConfigs []ProcessorConfig, + logs []plog.Logs, + timeout time.Duration, +) ( + outputLogs []plog.Logs, collectorErrs []string, apiErr *model.ApiError, +) { + // Construct and start a simulator (wraps a collector service) + simulator, apiErr := NewCollectorSimulator( + ctx, component.DataTypeLogs, processorFactories, processorConfigs, + ) + if apiErr != nil { + return nil, nil, model.WrapApiError(apiErr, "could not create logs processing simulator") + } + + simulatorCleanup, apiErr := simulator.Start(ctx) + // We can not rely on collector service to shutdown successfully and cleanup refs to inmemory components. + defer simulatorCleanup() + if apiErr != nil { + return nil, nil, apiErr + } + + // Do the simulation + for _, plog := range logs { + apiErr = SendLogsToSimulator(ctx, simulator, plog) + if apiErr != nil { + return nil, nil, model.WrapApiError(apiErr, "could not consume logs for simulation") + } + } + + result, apiErr := GetProcessedLogsFromSimulator( + simulator, len(logs), timeout, + ) + if apiErr != nil { + return nil, nil, model.InternalError(model.WrapApiError(apiErr, + "could not get processed logs from simulator", + )) + } + + // Shut down the simulator + simulationErrs, apiErr := simulator.Shutdown(ctx) + if apiErr != nil { + return nil, simulationErrs, model.WrapApiError(apiErr, + "could not shutdown logs processing simulator", + ) + } + + return result, simulationErrs, nil +} + +func SendLogsToSimulator( + ctx context.Context, + simulator *CollectorSimulator, + plog plog.Logs, +) *model.ApiError { + receiver := simulator.GetReceiver() + if receiver == nil { + return model.InternalError(fmt.Errorf("could not find in memory receiver for simulator")) + } + if err := receiver.ConsumeLogs(ctx, plog); err != nil { + return model.InternalError(errors.Wrap(err, + "inmemory receiver could not consume logs for simulation", + )) + } + return nil +} + +func GetProcessedLogsFromSimulator( + simulator *CollectorSimulator, + minLogCount int, + timeout time.Duration, +) ( + []plog.Logs, *model.ApiError, +) { + exporter := simulator.GetExporter() + if exporter == nil { + return nil, model.InternalError(fmt.Errorf("could not find in memory exporter for simulator")) + } + + // Must do a time based wait to ensure all logs come through. + // For example, logstransformprocessor does internal batching and it + // takes (processorCount * batchTime) for logs to get through. + startTsMillis := time.Now().UnixMilli() + for { + elapsedMillis := time.Now().UnixMilli() - startTsMillis + if elapsedMillis > timeout.Milliseconds() { + break + } + + exportedLogs := exporter.GetLogs() + if len(exportedLogs) >= minLogCount { + return exportedLogs, nil + } + + time.Sleep(50 * time.Millisecond) + } + + return exporter.GetLogs(), nil +} diff --git a/pkg/query-service/collectorsimulator/logs_test.go b/pkg/query-service/collectorsimulator/logs_test.go new file mode 100644 index 0000000000..796d19f00f --- /dev/null +++ b/pkg/query-service/collectorsimulator/logs_test.go @@ -0,0 +1,113 @@ +package collectorsimulator + +import ( + "context" + "testing" + "time" + + "github.com/knadh/koanf/parsers/yaml" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/processor" +) + +func TestLogsProcessingSimulation(t *testing.T) { + require := require.New(t) + + inputLogs := []plog.Logs{ + makeTestPlog("test log 1", map[string]string{ + "method": "GET", + }), + makeTestPlog("test log 2", map[string]string{ + "method": "POST", + }), + } + + testLogstransformConf1, err := yaml.Parser().Unmarshal([]byte(` + operators: + - type: router + id: router_signoz + routes: + - output: add + expr: attributes.method == "GET" + default: noop + - type: add + id: add + field: attributes.test + value: test-value-get + - type: noop + id: noop + `)) + require.Nil(err, "could not unmarshal test logstransform op config") + testProcessor1 := ProcessorConfig{ + Name: "logstransform/test", + Config: testLogstransformConf1, + } + + testLogstransformConf2, err := yaml.Parser().Unmarshal([]byte(` + operators: + - type: router + id: router_signoz + routes: + - output: add + expr: attributes.method == "POST" + default: noop + - type: add + id: add + field: attributes.test + value: test-value-post + - type: noop + id: noop + `)) + require.Nil(err, "could not unmarshal test logstransform op config") + testProcessor2 := ProcessorConfig{ + Name: "logstransform/test2", + Config: testLogstransformConf2, + } + + processorFactories, err := processor.MakeFactoryMap( + logstransformprocessor.NewFactory(), + ) + require.Nil(err, "could not create processors factory map") + + outputLogs, collectorErrs, apiErr := SimulateLogsProcessing( + context.Background(), + processorFactories, + []ProcessorConfig{testProcessor1, testProcessor2}, + inputLogs, + 300*time.Millisecond, + ) + require.Nil(apiErr, apiErr.ToError().Error()) + require.Equal(len(collectorErrs), 0) + + for _, l := range outputLogs { + rl := l.ResourceLogs().At(0) + sl := rl.ScopeLogs().At(0) + record := sl.LogRecords().At(0) + method, exists := record.Attributes().Get("method") + require.True(exists) + testVal, exists := record.Attributes().Get("test") + require.True(exists) + if method.Str() == "GET" { + require.Equal(testVal.Str(), "test-value-get") + } else { + require.Equal(testVal.Str(), "test-value-post") + } + } +} + +func makeTestPlog(body string, attrsStr map[string]string) plog.Logs { + pl := plog.NewLogs() + rl := pl.ResourceLogs().AppendEmpty() + + scopeLog := rl.ScopeLogs().AppendEmpty() + slRecord := scopeLog.LogRecords().AppendEmpty() + slRecord.Body().SetStr(body) + slAttribs := slRecord.Attributes() + for k, v := range attrsStr { + slAttribs.PutStr(k, v) + } + + return pl +}