Skip to content

Commit

Permalink
monitor pool size closely, to avoid too many connections (#29)
Browse files Browse the repository at this point in the history
fix default logging level
  • Loading branch information
Roey Prat authored Dec 25, 2018
1 parent 7b166b3 commit 8033654
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 6 deletions.
22 changes: 19 additions & 3 deletions cmd/redis-ts-adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ type config struct {
remoteTimeout time.Duration
listenAddr string
logLevel string
PoolSize int
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
WriteTimeout time.Duration
}

var cfg = &config{}
Expand Down Expand Up @@ -53,9 +57,17 @@ func parseFlags() {
flag.StringVar(&cfg.listenAddr, "web.listen-address", "127.0.0.1:9201",
"Address to listen on for web endpoints.",
)
flag.StringVar(&cfg.logLevel, "log.level", "debug",
flag.StringVar(&cfg.logLevel, "log.level", "info",
"Only log messages with the given severity or above. One of: [debug, info, warn, error]",
)
flag.IntVar(&cfg.PoolSize, "redis-pool-size", 500,
"Maximum number of socket connections.")
flag.DurationVar(&cfg.IdleTimeout, "redis-idle-timeout", 10*time.Minute,
"Amount of time after which client closes idle connections.")
flag.DurationVar(&cfg.IdleCheckFrequency, "redis-idle-check-frequency", 30*time.Second,
"Frequency of idle checks made by client.")
flag.DurationVar(&cfg.WriteTimeout, "redis-write-timeout", 1*time.Minute,
"Redis write timeout.")

flag.Parse()
validateConfiguration()
Expand Down Expand Up @@ -101,8 +113,12 @@ func buildClients(cfg *config) ([]writer, []reader) {
if cfg.redisSentinelAddress != "" {
log.WithFields(log.Fields{"sentinel_address": cfg.redisSentinelAddress}).Info("Creating redis sentinel client")
client := redis_ts.NewFailoverClient(&redis.FailoverOptions{
MasterName: cfg.redisSentinelMasterName,
SentinelAddrs: []string{cfg.redisSentinelAddress},
MasterName: cfg.redisSentinelMasterName,
SentinelAddrs: []string{cfg.redisSentinelAddress},
PoolSize: cfg.PoolSize,
IdleTimeout: cfg.IdleTimeout,
IdleCheckFrequency: cfg.IdleCheckFrequency,
WriteTimeout: cfg.WriteTimeout,
})
readers = append(readers, client)
writers = append(writers, client)
Expand Down
17 changes: 14 additions & 3 deletions internal/redis_ts/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,14 @@ func add(key string, labels []string, timestamp int64, value float64) redis.Cmde
}

// Write sends a batch of samples to RedisTS via its HTTP API.
func (c *Client) Write(samples model.Samples) error {
func (c *Client) Write(samples model.Samples) (returnErr error) {
pipe := (*redis.Client)(c).Pipeline()

defer func() {
err := pipe.Close()
if err != nil {
returnErr = err
}
}()
for _, s := range samples {
_, exists := s.Metric[model.MetricNameLabel]
if !exists {
Expand Down Expand Up @@ -80,10 +85,16 @@ func metricToLabels(m model.Metric) (labels []string, keyName string) {
return labels, strings.Join(labels, ",")
}

func (c *Client) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) {
func (c *Client) Read(req *prompb.ReadRequest) (returnVal *prompb.ReadResponse, returnErr error) {
var timeSeries []*prompb.TimeSeries
results := make([]*prompb.QueryResult, 0, len(req.Queries))
pipe := (*redis.Client)(c).Pipeline()
defer func() {
err := pipe.Close()
if err != nil {
returnErr = err
}
}()

commands := make([]*redis.SliceCmd, 0, len(req.Queries))
for _, q := range req.Queries {
Expand Down

0 comments on commit 8033654

Please sign in to comment.