forked from lateefj/slowgrog
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcmd.go
132 lines (120 loc) · 2.95 KB
/
cmd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package main
import (
"fmt"
"time"
"github.com/garyburd/redigo/redis"
)
// Shared constants for the Redis command helpers in this file.
const (
// MONITOR_BUFFER_SIZE is the capacity of the buffered channel that
// MonitorCmd returns.
MONITOR_BUFFER_SIZE = 1000
// INFO, MONITOR and SLOWLOG are the Redis command names sent by InfoCmd,
// MonitorCmd and SlowlogCmd respectively.
INFO = "INFO"
MONITOR = "MONITOR"
SLOWLOG = "SLOWLOG"
)
// newPool builds a redigo connection pool for the Redis instance at server
// (a "host:port" TCP address).
//
// Each new connection is dialed over TCP and, when password is non-empty,
// authenticated with AUTH; a connection that fails AUTH is closed and the
// error is returned. Connections handed out by the pool are checked with
// PING first. The pool keeps at most 3 idle connections and sets an idle
// timeout of 240 seconds.
func newPool(server, password string) *redis.Pool {
	// dial opens one connection and authenticates it when required.
	dial := func() (redis.Conn, error) {
		conn, dialErr := redis.Dial("tcp", server)
		if dialErr != nil {
			return nil, dialErr
		}
		if password == "" {
			return conn, nil
		}
		if _, authErr := conn.Do("AUTH", password); authErr != nil {
			conn.Close()
			return nil, authErr
		}
		return conn, nil
	}

	// ping checks a connection with PING before the pool hands it out.
	ping := func(conn redis.Conn, _ time.Time) error {
		_, pingErr := conn.Do("PING")
		return pingErr
	}

	pool := &redis.Pool{
		MaxIdle:      3,
		IdleTimeout:  240 * time.Second,
		Dial:         dial,
		TestOnBorrow: ping,
	}
	return pool
}
// DataCmds is the set of data-gathering commands implemented in this file
// by RedisCmds.
type DataCmds interface {
// SlowlogCmd returns slow log entries, or an error.
SlowlogCmd() ([]Slowlog, error)
// InfoCmd returns the INFO output as a string, or an error.
InfoCmd() (string, error)
// MonitorCmd takes a stop channel and returns a channel of monitor lines.
MonitorCmd(chan bool) chan string
}
// RedisCmds implements DataCmds by running commands against a Redis server.
type RedisCmds struct {
// Pool supplies the connections used by every command method.
Pool *redis.Pool
}
// NewRedisCmds returns a RedisCmds whose pool targets the address formed
// from the package-level RedisHost and RedisPort, authenticating with
// RedisPassword when it is non-empty.
func NewRedisCmds() *RedisCmds {
	addr := fmt.Sprintf("%s:%d", RedisHost, RedisPort)
	return &RedisCmds{Pool: newPool(addr, RedisPassword)}
}
// conn fetches a connection from the pool. The caller owns the returned
// connection.
func (rc *RedisCmds) conn() redis.Conn {
	pooled := rc.Pool.Get()
	return pooled
}
// SlowlogCmd runs "SLOWLOG GET" with SlowlogSize as the entry limit and
// hands the raw reply to ParseSlowlogReply.
//
// It returns the parsed entries, or a nil slice and the error when the
// command fails (the failure is also logged).
func (rc *RedisCmds) SlowlogCmd() ([]Slowlog, error) {
	c := rc.conn()
	// Connections taken from the pool must be closed to be returned to it;
	// without this every call leaks one pooled connection.
	defer c.Close()

	entries, err := redis.Values(c.Do(SLOWLOG, "GET", SlowlogSize))
	if err != nil {
		Logger.Errorf("Redis SLOWLOG GET %s", err)
		return nil, err
	}
	Logger.Debugf("Size of entires for slow log is %d", len(entries))
	return ParseSlowlogReply(entries, err)
}
// InfoCmd runs the Redis INFO command and returns its reply as a string.
//
// It returns an empty string and the error when the command fails (the
// failure is also logged), or when the reply cannot be converted to a
// string.
func (rc *RedisCmds) InfoCmd() (string, error) {
	c := rc.conn()
	// Connections taken from the pool must be closed to be returned to it;
	// without this every call leaks one pooled connection.
	defer c.Close()

	// Do performs send, flush and receive in one step, so a write failure
	// is reported instead of being silently dropped.
	reply, err := c.Do(INFO)
	if err != nil {
		Logger.Errorf("Failed trying ot get INFO from redis: %s", err)
		return "", err
	}
	return redis.String(reply, nil)
}
// MonitorCmd issues the Redis MONITOR command on a background goroutine and
// forwards every line it receives on the returned channel.
//
// The returned channel is buffered with capacity MONITOR_BUFFER_SIZE and is
// closed when the goroutine exits. Sending a value on stopper (or closing
// it) stops the goroutine, which then closes its connection. The stop
// request is noticed between receives and while waiting to deliver a line;
// a Receive call that is already blocked on the network is not interrupted.
//
// When a receive fails, the connection is closed and MONITOR is re-issued on
// a fresh connection from the pool.
//
// When MonitorSampleLength is greater than zero it is interpreted as a
// number of microseconds: the loop reads for about that long, then pauses
// for the same duration before starting the next sample window.
func (rc *RedisCmds) MonitorCmd(stopper chan bool) chan string {
	replies := make(chan string, MONITOR_BUFFER_SIZE)

	// start takes a connection from the pool and puts it into MONITOR mode.
	// Send/Flush errors are not checked here; a broken connection is
	// expected to report the error from the next Receive, which triggers
	// the reconnect path below.
	start := func() redis.Conn {
		c := rc.conn()
		c.Send(MONITOR)
		c.Flush()
		return c
	}

	// In background push on the connection
	go func() {
		// Deferred calls run in reverse order: the connection is closed
		// first, then the channel, so readers see a clean end of stream.
		defer close(replies)
		c := start()
		// c is reassigned on reconnect; close whichever one is current.
		defer func() { c.Close() }()

		// End of the current sample window; zero until sampling starts.
		var windowEnd time.Time
		for {
			// Danger performance danger but this setting has to be overwritten by config so YMMV
			if MonitorSampleLength > 0 {
				window := time.Duration(MonitorSampleLength) * time.Microsecond
				switch {
				case windowEnd.IsZero():
					windowEnd = time.Now().Add(window)
				case time.Now().After(windowEnd):
					// Pause on this goroutine so reading actually stops,
					// while still honouring a stop request.
					select {
					case <-stopper:
						return
					case <-time.After(window):
					}
					windowEnd = time.Now().Add(window)
				}
			}

			// Non-blocking stop check. A bare break here would only leave
			// the select, so return to end the goroutine.
			select {
			case <-stopper: // Stops the monitoring!
				return
			default:
			}

			reply, err := c.Receive()
			if err != nil {
				// Try to reconnect on error
				// NOTE(review): there is no backoff, so this loop spins
				// while Redis is unreachable — consider adding a delay.
				Logger.Errorf("Reconnecting to redis after fail %s", err)
				c.Close()
				c = start()
				continue
			}
			r, err := redis.String(reply, nil)
			if err != nil {
				Logger.Errorf("Couldn't convert reply %s", err)
				continue
			}

			// Deliver the line, but do not block forever on a full buffer
			// once a stop has been requested.
			select {
			case replies <- r:
			case <-stopper:
				return
			}
		}
	}()
	return replies
}