Skip to content

Commit

Permalink
[feat:filebeat/input/redis/slowlog] Add Redis replication role to slo…
Browse files Browse the repository at this point in the history
…wlogs (#40578)

Add Redis replication role to slowlogs

---------

Co-authored-by: Mauri de Souza Meneguzzo <mauri870@gmail.com>
  • Loading branch information
jdheyburn and mauri870 authored Sep 18, 2024
1 parent 8feb181 commit bfed178
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Journald: removed configuration options `include_matches.or`, `include_matches.and`, `backoff`, `max_backoff`, `cursor_seek_fallback`. {pull}40061[40061]
- Journald: `include_matches.match` now behaves in the same way as matchers in `journalctl`. Users should carefully update their input configuration. {pull}40061[40061]
- Journald: `seek` and `since` behaviour have been simplified, if there is a cursor (state) `seek` and `since` are ignored and the cursor is used. {pull}40061[40061]
- Redis: Added replication role as a field to submitted slowlogs
- Added `container.image.name` to `journald` Filebeat input's Docker-specific translated fields. {pull}40450[40450]


Expand Down
18 changes: 14 additions & 4 deletions filebeat/input/redis/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,29 +81,38 @@ func (h *Harvester) Run() error {
return nil
default:
}
// Writes Slowlog get and slowlog reset both to the buffer so they are executed together
// Writes Slowlog get, slowlog reset, and role to the buffer so they are executed together
if err := h.conn.Send("SLOWLOG", "GET"); err != nil {
return fmt.Errorf("error sending slowlog get: %w", err)
}
if err := h.conn.Send("SLOWLOG", "RESET"); err != nil {
return fmt.Errorf("error sending slowlog reset: %w", err)
}
if err := h.conn.Send("ROLE"); err != nil {
return fmt.Errorf("error sending role: %w", err)
}

// Flush the buffer to execute both commands and receive the reply from SLOWLOG GET
// Flush the buffer to execute all commands and receive the replies
h.conn.Flush()

// Receives first reply from redis which is the one from GET
// Receives first reply from redis which is the one from SLOWLOG GET
logs, err := rd.Values(h.conn.Receive())
if err != nil {
return fmt.Errorf("error receiving slowlog data: %w", err)
}

// Read reply from RESET
// Read reply from SLOWLOG RESET
_, err = h.conn.Receive()
if err != nil {
return fmt.Errorf("error receiving reset data: %w", err)
}

// Read reply from ROLE
role, err := h.conn.Receive()
if err != nil {
return fmt.Errorf("error receiving replication role: %w", err)
}

for _, item := range logs {
// Stopping here means some of the slowlog events are lost!
select {
Expand Down Expand Up @@ -146,6 +155,7 @@ func (h *Harvester) Run() error {
"duration": mapstr.M{
"us": log.duration,
},
"role": role,
}

if log.args != nil {
Expand Down
1 change: 1 addition & 0 deletions filebeat/tests/system/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def test_input(self):

assert output["input.type"] == "redis"
assert "redis.slowlog.cmd" in output
assert "redis.slowlog.role" in output

def get_host(self):
return os.getenv('REDIS_HOST', 'localhost')
Expand Down

0 comments on commit bfed178

Please sign in to comment.