Skip to content

Commit

Permalink
chore: some cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
raj-k-singh committed Sep 26, 2024
1 parent 3fe8ab8 commit 99fbd1e
Showing 1 changed file with 14 additions and 11 deletions.
25 changes: 14 additions & 11 deletions pkg/query-service/app/logparsingpipeline/collector_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@ var lockLogsPipelineSpec sync.RWMutex
// check if the processors already exist
// if yes then update the processor.
// if something doesn't exists then remove it.
func buildLogParsingProcessors(agentConf, parsingProcessors map[string]interface{}) error {
func buildLogParsingProcessors(agentConf, pipelineProcessors map[string]interface{}) error {
agentProcessors := map[string]interface{}{}
if agentConf["processors"] != nil {
agentProcessors = (agentConf["processors"]).(map[string]interface{})
}

exists := map[string]struct{}{}
for key, params := range parsingProcessors {
for key, params := range pipelineProcessors {
agentProcessors[key] = params
exists[key] = struct{}{}
}
// remove the old unwanted processors
// remove the old unwanted pipeline processors
for k := range agentProcessors {
isPipelineProcessor := (strings.HasPrefix(k, constants.LogsPPLPfx) || strings.HasPrefix(k, constants.OldLogsPPLPfx))
if _, ok := exists[k]; !ok && isPipelineProcessor {
_, existsInNewPipelineProcs := exists[k]
if !existsInNewPipelineProcs && isPipelineProcessor(k) {
delete(agentProcessors, k)
}
}
Expand Down Expand Up @@ -77,10 +77,10 @@ func buildLogsProcessors(current []string, logsParserPipeline []string) ([]strin

// removed the old processors which are not used
var pipeline []string
for _, v := range current {
k := v
if _, ok := exists[k]; ok || !strings.HasPrefix(k, constants.LogsPPLPfx) || strings.HasPrefix(k, constants.OldLogsPPLPfx) {
pipeline = append(pipeline, v)
for _, procName := range current {
_, isInDesiredPipelineProcs := exists[procName]
if isInDesiredPipelineProcs || !isPipelineProcessor(procName) {
pipeline = append(pipeline, procName)
}
}

Expand Down Expand Up @@ -111,8 +111,7 @@ func buildLogsProcessors(current []string, logsParserPipeline []string) ([]strin
m := logsParserPipeline[i]
if loc, ok := specVsExistingMap[i]; ok {
for j := lastMatched; j < loc; j++ {
isPipelineProcessor := strings.HasPrefix(pipeline[j], constants.LogsPPLPfx) || strings.HasPrefix(pipeline[j], constants.OldLogsPPLPfx)
if isPipelineProcessor {
if isPipelineProcessor(pipeline[j]) {
delete(specVsExistingMap, existingVsSpec[j])
} else {
newPipeline = append(newPipeline, pipeline[j])
Expand Down Expand Up @@ -226,3 +225,7 @@ func GenerateCollectorConfigWithPipelines(

return updatedConf, nil
}

func isPipelineProcessor(procName string) bool {
return strings.HasPrefix(procName, constants.LogsPPLPfx) || strings.HasPrefix(procName, constants.OldLogsPPLPfx)
}

0 comments on commit 99fbd1e

Please sign in to comment.