diff --git a/pkg/query-service/app/logparsingpipeline/collector_config.go b/pkg/query-service/app/logparsingpipeline/collector_config.go index 5a810f6a8da..a954b5dab5f 100644 --- a/pkg/query-service/app/logparsingpipeline/collector_config.go +++ b/pkg/query-service/app/logparsingpipeline/collector_config.go @@ -19,21 +19,21 @@ var lockLogsPipelineSpec sync.RWMutex // check if the processors already exist // if yes then update the processor. // if something doesn't exists then remove it. -func buildLogParsingProcessors(agentConf, parsingProcessors map[string]interface{}) error { +func buildLogParsingProcessors(agentConf, pipelineProcessors map[string]interface{}) error { agentProcessors := map[string]interface{}{} if agentConf["processors"] != nil { agentProcessors = (agentConf["processors"]).(map[string]interface{}) } exists := map[string]struct{}{} - for key, params := range parsingProcessors { + for key, params := range pipelineProcessors { agentProcessors[key] = params exists[key] = struct{}{} } - // remove the old unwanted processors + // remove the old unwanted pipeline processors for k := range agentProcessors { - isPipelineProcessor := (strings.HasPrefix(k, constants.LogsPPLPfx) || strings.HasPrefix(k, constants.OldLogsPPLPfx)) - if _, ok := exists[k]; !ok && isPipelineProcessor { + _, existsInNewPipelineProcs := exists[k] + if !existsInNewPipelineProcs && isPipelineProcessor(k) { delete(agentProcessors, k) } } @@ -77,10 +77,10 @@ func buildLogsProcessors(current []string, logsParserPipeline []string) ([]strin // removed the old processors which are not used var pipeline []string - for _, v := range current { - k := v - if _, ok := exists[k]; ok || !strings.HasPrefix(k, constants.LogsPPLPfx) || strings.HasPrefix(k, constants.OldLogsPPLPfx) { - pipeline = append(pipeline, v) + for _, procName := range current { + _, isInDesiredPipelineProcs := exists[procName] + if isInDesiredPipelineProcs || !isPipelineProcessor(procName) { + pipeline = append(pipeline, procName) } } @@ -111,8 +111,7 @@ func buildLogsProcessors(current []string, logsParserPipeline []string) ([]strin m := logsParserPipeline[i] if loc, ok := specVsExistingMap[i]; ok { for j := lastMatched; j < loc; j++ { - isPipelineProcessor := strings.HasPrefix(pipeline[j], constants.LogsPPLPfx) || strings.HasPrefix(pipeline[j], constants.OldLogsPPLPfx) - if isPipelineProcessor { + if isPipelineProcessor(pipeline[j]) { delete(specVsExistingMap, existingVsSpec[j]) } else { newPipeline = append(newPipeline, pipeline[j]) @@ -226,3 +225,7 @@ func GenerateCollectorConfigWithPipelines( return updatedConf, nil } + +func isPipelineProcessor(procName string) bool { + return strings.HasPrefix(procName, constants.LogsPPLPfx) || strings.HasPrefix(procName, constants.OldLogsPPLPfx) +}