Skip to content

Commit

Permalink
Feat: add api /api/v1/helper for debugging (#291)
Browse files Browse the repository at this point in the history
  • Loading branch information
ethfoo authored Sep 1, 2022
1 parent 9637d74 commit d0c5305
Show file tree
Hide file tree
Showing 10 changed files with 515 additions and 104 deletions.
7 changes: 5 additions & 2 deletions cmd/loggie/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/loggie-io/loggie/pkg/discovery/kubernetes"
"github.com/loggie-io/loggie/pkg/eventbus"
_ "github.com/loggie-io/loggie/pkg/include"
"github.com/loggie-io/loggie/pkg/ops/helper"
"github.com/loggie-io/loggie/pkg/util/yaml"
"github.com/pkg/errors"
"go.uber.org/automaxprocs/maxprocs"
Expand Down Expand Up @@ -105,8 +106,7 @@ func main() {

if syscfg.Loggie.Reload.Enabled {
syscfg.Loggie.Reload.ConfigPath = pipelineConfigPath
rld := reloader.NewReloader(controller, &syscfg.Loggie.Reload)
go rld.Run(stopCh)
reloader.Setup(stopCh, controller, &syscfg.Loggie.Reload)
}

if syscfg.Loggie.Discovery.Enabled {
Expand All @@ -118,6 +118,9 @@ func main() {
go k8sDiscovery.Start(stopCh)
}

// api for debugging
helper.Setup(controller)

if syscfg.Loggie.Http.Enabled {
go func() {
if err = http.ListenAndServe(fmt.Sprintf("%s:%d", syscfg.Loggie.Http.Host, syscfg.Loggie.Http.Port), nil); err != nil {
Expand Down
21 changes: 0 additions & 21 deletions pkg/control/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package control

import (
"github.com/loggie-io/loggie/pkg/util/yaml"
"net/http"
_ "net/http/pprof"
"time"

Expand All @@ -28,12 +26,6 @@ import (
"github.com/loggie-io/loggie/pkg/pipeline"
)

const handleCurrentPipelines = "/api/v1/controller/pipelines"

func (c *Controller) initHttp() {
http.HandleFunc(handleCurrentPipelines, c.currentPipelinesHandler)
}

type Controller struct {
CurrentConfig *PipelineConfig
pipelineRunner map[string]*pipeline.Pipeline
Expand Down Expand Up @@ -144,16 +136,3 @@ func (c *Controller) reportMetric(p pipeline.Config, eventType eventbus.Componen
ComponentConfigs: componentConfigs,
})
}

func (c *Controller) currentPipelinesHandler(writer http.ResponseWriter, request *http.Request) {
data, err := yaml.Marshal(c.CurrentConfig)
if err != nil {
log.Warn("marshal current pipeline config err: %v", err)
writer.WriteHeader(http.StatusInternalServerError)
writer.Write([]byte(err.Error()))
return
}

writer.WriteHeader(http.StatusOK)
writer.Write(data)
}
42 changes: 42 additions & 0 deletions pkg/control/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
Copyright 2022 Loggie Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package control

import (
"github.com/loggie-io/loggie/pkg/core/log"
"github.com/loggie-io/loggie/pkg/util/yaml"
"net/http"
)

const HandleCurrentPipelines = "/api/v1/controller/pipelines"

func (c *Controller) initHttp() {
http.HandleFunc(HandleCurrentPipelines, c.currentPipelinesHandler)
}

func (c *Controller) currentPipelinesHandler(writer http.ResponseWriter, request *http.Request) {
data, err := yaml.Marshal(c.CurrentConfig)
if err != nil {
log.Warn("marshal current pipeline config err: %v", err)
writer.WriteHeader(http.StatusInternalServerError)
writer.Write([]byte(err.Error()))
return
}

writer.WriteHeader(http.StatusOK)
writer.Write(data)
}
8 changes: 4 additions & 4 deletions pkg/core/reloader/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import (
"github.com/loggie-io/loggie/pkg/core/log"
)

const handleReadPipelineConfigPath = "/api/v1/reload/config"
const HandleReadPipelineConfigPath = "/api/v1/reload/config"

func (r *Reloader) initHttp() {
http.HandleFunc(handleReadPipelineConfigPath, r.readPipelineConfigHandler)
func (r *reloader) initHttp() {
http.HandleFunc(HandleReadPipelineConfigPath, r.readPipelineConfigHandler)
}

func (r *Reloader) readPipelineConfigHandler(writer http.ResponseWriter, request *http.Request) {
func (r *reloader) readPipelineConfigHandler(writer http.ResponseWriter, request *http.Request) {
matches, err := filepath.Glob(r.config.ConfigPath)
if err != nil {
log.Info("glob match path %s error: %v", r.config.ConfigPath, err)
Expand Down
74 changes: 42 additions & 32 deletions pkg/core/reloader/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ import (
"github.com/loggie-io/loggie/pkg/pipeline"
)

type Reloader struct {
var globalReloader *reloader

type reloader struct {
controller *control.Controller

config *ReloadConfig
Expand All @@ -45,16 +47,17 @@ type ReloadConfig struct {
ReloadPeriod time.Duration `yaml:"period" default:"10s"`
}

func NewReloader(controller *control.Controller, config *ReloadConfig) *Reloader {
reload := &Reloader{
func Setup(stopCh <-chan struct{}, controller *control.Controller, config *ReloadConfig) {
reload := &reloader{
controller: controller,
config: config,
}
reload.initHttp()
return reload
globalReloader = reload
go globalReloader.Run(stopCh)
}

func (r *Reloader) Run(stopCh <-chan struct{}) {
func (r *reloader) Run(stopCh <-chan struct{}) {
log.Info("reloader starting...")
t := time.NewTicker(r.config.ReloadPeriod)
defer t.Stop()
Expand All @@ -69,28 +72,12 @@ func (r *Reloader) Run(stopCh <-chan struct{}) {
// If there is at least one pipeline not running, we will not ignore the configuration, and always try to restart the pipeline
r.controller.RetryNotRunningPipeline()

// read and validate config files
newConfig, err := control.ReadPipelineConfigFromFile(r.config.ConfigPath, func(s os.FileInfo) bool {
newConfig, _, stopList, startList := DiffPipelineConfigs(func(s os.FileInfo) bool {
if time.Since(s.ModTime()) > 6*r.config.ReloadPeriod {
return true
}
return false
})
if err != nil && !os.IsNotExist(err) {
if errors.Is(err, control.ErrIgnoreAllFile) {
continue
}

log.Error("read pipeline config file error: %v", err)
continue
}

if newConfig == nil {
newConfig = &control.PipelineConfig{}
}

// diff config
stopList, startList := diffConfig(newConfig, r.controller.CurrentConfig)

if len(stopList) > 0 || len(startList) > 0 {
log.Info("loggie is reloading..")
Expand Down Expand Up @@ -123,7 +110,29 @@ func (r *Reloader) Run(stopCh <-chan struct{}) {
}
}

func diffConfig(newConfig *control.PipelineConfig, oldConfig *control.PipelineConfig) (stopComponentList []pipeline.Config, startComponentList []pipeline.Config) {
func DiffPipelineConfigs(ignoreFunc control.FileIgnore) (newCfg *control.PipelineConfig, diffPipes []string, stopComponentList []pipeline.Config, startComponentList []pipeline.Config) {
// read and validate config files
newConfig, err := control.ReadPipelineConfigFromFile(globalReloader.config.ConfigPath, ignoreFunc)
if err != nil && !os.IsNotExist(err) {
if errors.Is(err, control.ErrIgnoreAllFile) {
return nil, nil, nil, nil
}

log.Error("read pipeline config file error: %v", err)
return nil, nil, nil, nil
}

// Empty configuration cannot be ignored, because if it is empty configuration, it may be necessary to stop the current pipeline
if newConfig == nil {
newConfig = &control.PipelineConfig{}
}

// diff config
diffs, stopList, startList := diffConfig(newConfig, globalReloader.controller.CurrentConfig)
return newConfig, diffs, stopList, startList
}

func diffConfig(newConfig *control.PipelineConfig, oldConfig *control.PipelineConfig) (diffList []string, stopComponentList []pipeline.Config, startComponentList []pipeline.Config) {
oldPipeIndex := make(map[string]pipeline.Config)
for _, p := range oldConfig.Pipelines {
oldPipeIndex[p.Name] = p
Expand All @@ -150,6 +159,7 @@ func diffConfig(newConfig *control.PipelineConfig, oldConfig *control.PipelineCo
}))
})

var diffs []string
for _, newPipe := range newConfig.Pipelines {
oldPipe, ok := oldPipeIndex[newPipe.Name]
if !ok {
Expand All @@ -160,21 +170,21 @@ func diffConfig(newConfig *control.PipelineConfig, oldConfig *control.PipelineCo
delete(oldPipeIndex, oldPipe.Name)

// diff
equal := cmp.Equal(oldPipe, newPipe, sourceComparer, interceptorComparer)
if !equal {
startList = append(startList, newPipe)
stopList = append(stopList, oldPipe)
}
if !equal && log.IsDebugLevel() {
diff := cmp.Diff(oldPipe, newPipe, sourceComparer, interceptorComparer)
log.Debug("diff pipeline config: \n%s", diff)
if cmp.Equal(oldPipe, newPipe, sourceComparer, interceptorComparer) {
continue
}

startList = append(startList, newPipe)
stopList = append(stopList, oldPipe)
diff := cmp.Diff(oldPipe, newPipe, sourceComparer, interceptorComparer)
log.Info("diff pipeline %s: \n%s", newPipe.Name, diff)
diffs = append(diffs, diff)
}

// add old pipelines to stopList
for k := range oldPipeIndex {
stopList = append(stopList, oldPipeIndex[k])
}

return stopList, startList
return diffs, stopList, startList
}
2 changes: 1 addition & 1 deletion pkg/eventbus/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type ErrorMetricData struct {
type WatchMetricData struct {
BaseMetric
FileInfos []FileInfo
TotalFileCount int
ActiveFileCount int
InactiveFdCount int
SourceFields map[string]interface{}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/eventbus/listener/filewatcher/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type data struct {

FileInfo []*fileInfo `json:"info,omitempty"` // key=fileName

TotalFileCount int `json:"total"`
ActiveFileCount int `json:"active"`
InactiveFdCount int `json:"inactive"`
SourceFields map[string]interface{} `json:"sourceFields,omitempty"`
}
Expand Down Expand Up @@ -118,11 +118,11 @@ func (l *Listener) exportPrometheus() {
m1 := promeExporter.ExportedMetrics{
{
Desc: prometheus.NewDesc(
buildFQName("total_file_count"),
"file count total",
buildFQName("active_file_count"),
"active file count total",
nil, labels,
),
Eval: float64(d.TotalFileCount),
Eval: float64(d.ActiveFileCount),
ValType: prometheus.GaugeValue,
},
{
Expand Down Expand Up @@ -241,7 +241,7 @@ func (l *Listener) consumer(e eventbus.WatchMetricData) {
files = append(files, f)
}
m.FileInfo = files
m.TotalFileCount = e.TotalFileCount
m.ActiveFileCount = e.ActiveFileCount
m.InactiveFdCount = e.InactiveFdCount

l.data[key] = m
Expand Down
Loading

0 comments on commit d0c5305

Please sign in to comment.