Graceful shutdown (#26)
* Graceful shutdown

* Added graceful shutdown

* Graceful shutdown
npololnskii authored Oct 6, 2021
1 parent d8bc10a commit 99022f5
Showing 3 changed files with 78 additions and 10 deletions.
22 changes: 20 additions & 2 deletions main.go
@@ -1,18 +1,22 @@
 package main
 
 import (
+	"context"
 	"flag"
 	"fmt"
-	"github.com/anodot/anodot-common/pkg/metrics3"
 	"net/http"
 	_ "net/http/pprof"
 	"net/url"
 	"os"
+	"os/signal"
 	"runtime"
 	"strconv"
 	"strings"
+	"syscall"
 	"time"
 
+	"github.com/anodot/anodot-common/pkg/metrics3"
+
 	metrics2 "github.com/anodot/anodot-common/pkg/metrics"
 	anodotPrometheus "github.com/anodot/anodot-remote-write/pkg/prometheus"
 	"github.com/anodot/anodot-remote-write/pkg/relabling"
@@ -213,7 +217,21 @@ func main() {
 		anodotPrometheus.SendAgentStatusToBC(client, sendToBCPeriod)
 	}
 
-	s.InitHttp(allWorkers)
+	c := make(chan os.Signal, 2)
+	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	go func() {
+		oscall := <-c
+		log.Infof("system call:%+v", oscall)
+		cancel()
+
+		oscall = <-c
+		log.Fatalf("failed to finish gracefully. system call:%+v", oscall)
+	}()
+
+	s.InitHttp(ctx, allWorkers)
 }
 
 func tags(envVar string) map[string]string {
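The main.go change implements a two-stage signal handler: the first SIGINT or SIGTERM cancels a context so the server can drain in-flight work, while a second signal aborts the process immediately. Below is a minimal, self-contained sketch of that pattern (the run function and log messages are illustrative stand-ins, not the project's code):

```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	// Buffer of 2 so neither the first nor the second signal is dropped.
	sigs := make(chan os.Signal, 2)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		s := <-sigs
		log.Printf("received %v, starting graceful shutdown", s)
		cancel() // first signal: ask the server to drain and exit

		s = <-sigs
		log.Fatalf("received %v again, exiting immediately", s) // second signal: force quit
	}()

	run(ctx)
}

// run stands in for the blocking server loop; it returns once ctx is cancelled.
func run(ctx context.Context) {
	<-ctx.Done()
	log.Print("shutdown complete")
}
```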
41 changes: 39 additions & 2 deletions pkg/prometheus/server.go
@@ -1,8 +1,10 @@
 package prometheus
 
 import (
+	"context"
 	"fmt"
 	"os"
+	"sync"
 	"time"
 
 	"github.com/anodot/anodot-remote-write/utils"
@@ -22,6 +24,8 @@ import (
 	"github.com/prometheus/prometheus/prompb"
 )
 
+const GRACEFUL_TIMEOUT_SECONDS int = 5
+
 type Receiver struct {
 	Port   int
 	Parser *AnodotParser
@@ -66,7 +70,9 @@ func (rc *Receiver) protoToSamples(req *prompb.WriteRequest) model.Samples {
 	return samples
 }
 
-func (rc *Receiver) InitHttp(workers []*remote.Worker) {
+func (rc *Receiver) InitHttp(ctx context.Context, workers []*remote.Worker) {
+	var srv http.Server
+
 	log.V(2).Infof("Initializing %d remote write config(s): %s", len(workers), workers)
 
 	if os.Getenv("ANODOT_PUSH_METRICS_ENABLED") == "true" {
@@ -135,5 +141,36 @@ func (rc *Receiver) InitHttp(workers []*remote.Worker) {
 	log.V(2).Infof("Application metrics available at '*:%d/metrics' ", rc.Port)
 
 	versionInfo.With(prometheus.Labels{"version": version.VERSION, "git_sha1": version.REVISION}).Inc()
-	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", rc.Port), nil))
+
+	go func() {
+		log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", rc.Port), nil))
+	}()
+
+	// Do graceful shutdown
+	<-ctx.Done()
+
+	log.Info("start shutdown")
+
+	ctxShutDown, cancel := context.WithTimeout(context.Background(), time.Duration(GRACEFUL_TIMEOUT_SECONDS)*time.Second)
+
+	defer func() {
+		cancel()
+	}()
+
+	if err := srv.Shutdown(ctxShutDown); err != nil {
+		log.Fatalf("Server Shutdown Failed:%+s", err)
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(len(workers))
+
+	for i := 0; i < len(workers); i++ {
+		workers[i].SetStopWg(&wg)
+
+		workers[i].FlushBuffer <- true
+		workers[i].Done <- true
+	}
+
+	wg.Wait()
+	log.Info("Server exited properly")
 }
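For http.Server.Shutdown to take effect, the handler must be served through that same http.Server value: the package-level http.ListenAndServe constructs its own internal server, which a separate srv.Shutdown cannot reach. A hedged sketch of how the shutdown half of InitHttp is typically wired (the port, timeout, and log messages mirror the diff above; serve and its wiring are illustrative assumptions):

```go
package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"time"
)

func serve(ctx context.Context, port int) {
	// Serve through a concrete http.Server so Shutdown can stop the listener.
	srv := &http.Server{Addr: fmt.Sprintf(":%d", port)} // nil Handler => http.DefaultServeMux

	go func() {
		// ErrServerClosed is the expected error after Shutdown; anything else is fatal.
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("listen failed: %v", err)
		}
	}()

	<-ctx.Done() // unblocked by the cancel() that the first signal triggers
	log.Print("start shutdown")

	// Give in-flight requests up to 5 seconds to finish.
	ctxShutDown, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Shutdown stops accepting new connections and waits for active requests.
	if err := srv.Shutdown(ctxShutDown); err != nil {
		log.Fatalf("server shutdown failed: %v", err)
	}
	log.Print("server exited properly")
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go func() { time.Sleep(2 * time.Second); cancel() }() // stand-in for the signal handler
	serve(ctx, 1234)
}
```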
25 changes: 19 additions & 6 deletions pkg/remote/workers.go
@@ -25,9 +25,11 @@ type Worker struct {
 	mu            sync.RWMutex
 	MetricsBuffer []metrics.Anodot20Metric
 
-	flushBuffer chan bool
+	FlushBuffer chan bool
 
 	*WorkerConfig
+	stopWg *sync.WaitGroup
+	Done   chan bool
 }
 
 type WorkerConfig struct {
@@ -59,6 +61,10 @@ func NewWorkerConfig() (*WorkerConfig, error) {
 	return config, err
 }
 
+func (w *Worker) SetStopWg(wg *sync.WaitGroup) {
+	w.stopWg = wg
+}
+
 func (w *Worker) String() string {
 	return fmt.Sprintf("Anodot URL='%s'", w.metricsSubmitter.AnodotURL().Host)
 }
@@ -158,7 +164,7 @@ func NewWorker(metricsSubmitter metrics.Submitter, config *WorkerConfig) (*Worker, error) {
 	}
 	maxEPSLimit.Set(float64(maxAllowedEps))
 
-	worker := &Worker{metricsSubmitter: metricsSubmitter, WorkerConfig: config, MetricsBuffer: make([]metrics.Anodot20Metric, 0, 100000), flushBuffer: make(chan bool, 4*config.MaxWorkers)}
+	worker := &Worker{metricsSubmitter: metricsSubmitter, WorkerConfig: config, MetricsBuffer: make([]metrics.Anodot20Metric, 0, 100000), FlushBuffer: make(chan bool, 4*config.MaxWorkers), Done: make(chan bool)}
 	log.V(4).Infof("Metrics per request size is : %d", worker.MetricsPerRequestSize)
 	log.V(4).Infof("Metrics buffer size is : %d", len(worker.MetricsBuffer))
 
@@ -182,14 +188,14 @@ func NewWorker(metricsSubmitter metrics.Submitter, config *WorkerConfig) (*Worker, error) {
 			}
 			if time.Since(*timestamp) > w.BatchSendDeadline {
 				log.V(4).Infof("reached BatchSendDeadline of '%s'. Flushing metrics buffer", w.BatchSendDeadline.String())
-				w.flushBuffer <- true
+				w.FlushBuffer <- true
 			}
 		}
 	}(worker)
 
 	go func(w *Worker) {
 		for {
-			<-w.flushBuffer
+			<-w.FlushBuffer
 			bufferedMetrics.WithLabelValues(w.metricsSubmitter.AnodotURL().Host).Set(float64(w.BufferSize()))
 
 			var chunkSize int
@@ -203,7 +209,7 @@ func NewWorker(metricsSubmitter metrics.Submitter, config *WorkerConfig) (*Worker, error) {
 			}
 
 			select {
-			case <-w.flushBuffer:
+			case <-w.FlushBuffer:
 			default:
 			}
 
@@ -231,6 +237,13 @@ func NewWorker(metricsSubmitter metrics.Submitter, config *WorkerConfig) (*Worker, error) {
 				}()
 			}
 		}
+		select {
+		case <-w.Done:
+			log.Info("Stop worker")
+			w.stopWg.Done()
+			return
+		default:
+		}
 		concurrentWorkers.WithLabelValues(w.metricsSubmitter.AnodotURL().Host).Set(float64(atomic.LoadInt64(&w.currentWorkers)))
 	}
 }(worker)
@@ -256,7 +269,7 @@ func (w *Worker) Do(data []metrics.Anodot20Metric) {
 	w.mu.Unlock()
 
 	if w.BufferSize() >= w.MetricsPerRequestSize {
-		w.flushBuffer <- true
+		w.FlushBuffer <- true
 	}
 }
 
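The worker changes complete the handshake: the server hands every worker a shared sync.WaitGroup via SetStopWg, pushes a final flush, signals Done, and then wg.Wait() blocks until each worker has confirmed its exit. A condensed sketch of that protocol (the Worker here is a simplified stand-in, and the final flush is folded into the stop case so the example stays race-free):

```go
package main

import (
	"fmt"
	"sync"
)

// Worker mirrors the shape above: a flush trigger, a stop signal, and a
// WaitGroup that the worker marks done when it exits.
type Worker struct {
	FlushBuffer chan bool
	Done        chan bool
	stopWg      *sync.WaitGroup
}

func (w *Worker) SetStopWg(wg *sync.WaitGroup) { w.stopWg = wg }

func (w *Worker) loop(id int) {
	for {
		select {
		case <-w.FlushBuffer:
			fmt.Printf("worker %d: flushing buffered metrics\n", id)
		case <-w.Done:
			fmt.Printf("worker %d: final flush, then stop\n", id)
			w.stopWg.Done()
			return
		}
	}
}

func main() {
	workers := make([]*Worker, 3)
	var wg sync.WaitGroup
	wg.Add(len(workers))

	for i := range workers {
		workers[i] = &Worker{FlushBuffer: make(chan bool, 4), Done: make(chan bool)}
		workers[i].SetStopWg(&wg)
		go workers[i].loop(i)
	}

	// Shutdown: signal every worker, then wait until all of them have exited.
	for _, w := range workers {
		w.Done <- true
	}
	wg.Wait()
	fmt.Println("all workers drained")
}
```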
