Skip to content

Commit

Permalink
fix: solved some issues of sampling api
Browse files Browse the repository at this point in the history
  • Loading branch information
mindhash authored and mindhash committed Feb 21, 2023
1 parent afba00e commit 9cd966b
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 11 deletions.
4 changes: 3 additions & 1 deletion ee/query-service/app/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ func (ah *APIHandler) RegisterRoutes(router *mux.Router) {
Methods(http.MethodPost)

router.HandleFunc("/api/v1/samplingRules/{version}",
baseapp.AdminAccess(ah.listSamplingRules)).
// todo(amol): commented for testing
//baseapp.AdminAccess(ah.listSamplingRules)).
ah.listSamplingRules).
Methods(http.MethodGet)

router.HandleFunc("/api/v1/samplingRules",
Expand Down
13 changes: 11 additions & 2 deletions ee/query-service/ingestionRules/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ingestionRules

import (
"context"
"fmt"

"github.com/jmoiron/sqlx"
"go.signoz.io/signoz/ee/query-service/model"
Expand Down Expand Up @@ -113,8 +114,9 @@ func (ic *IngestionController) ApplyDropRules(ctx context.Context, postable []Po

if err != nil {
zap.S().Errorf("failed to insert drop rules into agent config", filterConfig, err)
return response, model.InternalErrorStr("failed to apply drop rules ")
return response, model.InternalErrorStr(fmt.Sprintf("failed to apply drop rules: %s", err.Error()))
}

return response, nil
}

Expand Down Expand Up @@ -143,6 +145,13 @@ func (ic *IngestionController) ApplySamplingRules(ctx context.Context, postable

// scan through postable rules, to select the existing rules or insert missing ones
for _, r := range postable {
if apierr := r.IsValid(); apierr != nil {
return nil, apierr
}

if err := r.Config.SamplingConfig.Valid(); err != nil {
return nil, model.BadRequest(err)
}

// note: we process only new and changed rules here, deleted rules are not expected
// from client. if user deletes a rule, the client should not send that rule in the update.
Expand Down Expand Up @@ -206,7 +215,7 @@ func (ic *IngestionController) ApplySamplingRules(ctx context.Context, postable

if err != nil {
zap.S().Errorf("failed to insert sampling rules into agent config: ", params, err)
return response, model.InternalErrorStr("failed to apply sampling rules ")
return response, model.InternalErrorStr(fmt.Sprintf("failed to apply drop rules: %s", err.Error()))
}
return response, nil
}
10 changes: 9 additions & 1 deletion ee/query-service/ingestionRules/postableRule.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package ingestionRules

import "go.signoz.io/signoz/ee/query-service/model"
import (
"fmt"

"go.signoz.io/signoz/ee/query-service/model"
)

// PostableIngestionRules are a list of user defined ingestion rules
type PostableIngestionRules struct {
Expand Down Expand Up @@ -35,5 +39,9 @@ func (p *PostableIngestionRule) IsValid() *model.ApiError {
return model.BadRequestStr("ingestion source is required")
}

if p.Config == nil {
return model.BadRequestStr(fmt.Sprintf("invalid config found on rule: %s", p.Name))
}

return nil
}
11 changes: 6 additions & 5 deletions ee/query-service/model/ingestionRules.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,12 @@ type SamplingConfig struct {
}

// Valid checks the validity of a sampling rule and its conditions
func (sc *SamplingConfig) Valid() error {
if sc.Root {
if sc.Name == "" {
return fmt.Errorf("name is required for all sampling rules")
}
func (sc SamplingConfig) Valid() error {
if sc.Name == "" {
return fmt.Errorf("name is required for all sampling rules")
}

if sc.Root {
if !sc.Default && len(sc.FilterSet.Items) != 1 {
// non-default rule has no filter set, raise an error
return fmt.Errorf(fmt.Sprintf("invalid filter for sampling rule (%s)", sc.Name))
Expand All @@ -189,6 +189,7 @@ func (sc *SamplingConfig) Valid() error {
}

}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/query-service/agentConf/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func UpsertSamplingProcessor(ctx context.Context, version int, key string, confi
"tail_sampling": config,
}

opamp.AddToTracePipeline("tail_sampling")
configHash, err := opamp.UpsertTraceProcessors(ctx, []interface{}{processorConf}, m.OnConfigUpdate)
if err != nil {
zap.S().Errorf("failed to call agent config update for trace processor:", err)
Expand Down
14 changes: 12 additions & 2 deletions pkg/query-service/app/opamp/opamp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const capabilities = protobufs.ServerCapabilities_ServerCapabilities_AcceptsEffe
protobufs.ServerCapabilities_ServerCapabilities_AcceptsStatus

func InitalizeServer(agents *model.Agents) error {
zap.S().Info("initiated opamp server....")
opAmpServer = &Server{
agents: agents,
}
Expand Down Expand Up @@ -91,15 +92,24 @@ func UpsertTraceProcessors(ctx context.Context, processors []interface{}, callba

fmt.Println("processors:", processors)
fmt.Println("trace pipeline:", tracesPipelinePlan)

confHash := ""

if opAmpServer == nil {
if err := InitalizeServer(&model.AllAgents); err != nil {
return confHash, fmt.Errorf("opamp server is down, unable to push config to agent at this moment")
}
}

agents := opAmpServer.agents.GetAllAgents()
if len(agents) == 0 {
return confHash, fmt.Errorf("no agents available at the moment")
}
x := map[string]interface{}{
"processors": processors,
}

updatedProcessors := confmap.NewFromStringMap(x)

agents := opAmpServer.agents.GetAllAgents()
for _, agent := range agents {
config := agent.EffectiveConfig
c, err := yaml.Parser().Unmarshal([]byte(config))
Expand Down
10 changes: 10 additions & 0 deletions pkg/query-service/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"github.com/soheilhy/cmux"
"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
"go.signoz.io/signoz/pkg/query-service/app/opamp"
opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model"

"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/dao"
"go.signoz.io/signoz/pkg/query-service/featureManager"
Expand Down Expand Up @@ -139,6 +142,11 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {

s.privateHTTP = privateServer

localDB, err = opAmpModel.InitDB(constants.RELATIONAL_DATASOURCE_PATH)
if err != nil {
return nil, err
}

return s, nil
}

Expand Down Expand Up @@ -433,6 +441,8 @@ func (s *Server) Start() error {

}()

opamp.InitalizeServer(&opAmpModel.AllAgents)

return nil
}

Expand Down

0 comments on commit 9cd966b

Please sign in to comment.