Skip to content

Commit

Permalink
new: added ratelimit filter
Browse files Browse the repository at this point in the history
  • Loading branch information
glcbrg committed Jan 31, 2024
1 parent a8db8bd commit 83b8e06
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 7 deletions.
4 changes: 1 addition & 3 deletions cmd/driplane/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,12 @@ func main() {
os.Exit(0)
}

go Signal(&o)
go Signal(o)

log.Debug("Trying to start orchestrator")
o.StartFeeders()
o.WaitFeeders()

log.Debug("Stopping")
o.StopFeeders()

return
}
8 changes: 4 additions & 4 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ type Orchestrator struct {
}

// NewOrchestrator create a new instance of the Orchestrator
func NewOrchestrator(config *Configuration) (Orchestrator, error) {
o := Orchestrator{
func NewOrchestrator(config *Configuration) (*Orchestrator, error) {
o := &Orchestrator{

Check warning on line 26 in core/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

core/orchestrator.go#L25-L26

Added lines #L25 - L26 were not covered by tests
config: config,
asts: make(map[string]*AST),
asts: make(map[string]*AST),

Check warning on line 28 in core/orchestrator.go

View check run for this annotation

Codecov / codecov/patch

core/orchestrator.go#L28

Added line #L28 was not covered by tests
}

parser, _ := NewParser()
Expand Down Expand Up @@ -108,4 +108,4 @@ func (o *Orchestrator) StopFeeders() {
// sending a shutdown event on the bus
rs.bus.Publish(data.EventTopicName, &data.Event{Type: "shutdown"})
rs.bus.WaitAsync()
}
}
71 changes: 71 additions & 0 deletions filters/ratelimit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package filters

import (
"context"
"strconv"

"github.com/Matrix86/driplane/data"
"github.com/evilsocket/islazy/log"
"golang.org/x/time/rate"
)

// RateLimit is a Filter to create a RateLimit number
type RateLimit struct {
Base

objects int64
seconds int64
limiter *rate.Limiter
context context.Context
cancelContext context.CancelFunc

params map[string]string
}

// NewRateLimitFilter is the registered method to instantiate a RateLimit Filter
func NewRateLimitFilter(p map[string]string) (Filter, error) {
ctx, cancel := context.WithCancel(context.Background())
f := &RateLimit{
params: p,
seconds: 1,
context: ctx,
cancelContext: cancel,

Check warning on line 32 in filters/ratelimit.go

View check run for this annotation

Codecov / codecov/patch

filters/ratelimit.go#L26-L32

Added lines #L26 - L32 were not covered by tests
}
f.cbFilter = f.DoFilter

Check warning on line 34 in filters/ratelimit.go

View check run for this annotation

Codecov / codecov/patch

filters/ratelimit.go#L34

Added line #L34 was not covered by tests

if v, ok := p["rate"]; ok {
i, err := strconv.ParseInt(v, 10, 64)
if err != nil {
return nil, err

Check warning on line 39 in filters/ratelimit.go

View check run for this annotation

Codecov / codecov/patch

filters/ratelimit.go#L36-L39

Added lines #L36 - L39 were not covered by tests
}
f.objects = i

Check warning on line 41 in filters/ratelimit.go

View check run for this annotation

Codecov / codecov/patch

filters/ratelimit.go#L41

Added line #L41 was not covered by tests
}

f.limiter = rate.NewLimiter(rate.Limit(f.objects), int(f.seconds))

Check warning on line 44 in filters/ratelimit.go

View check run for this annotation

Codecov / codecov/patch

filters/ratelimit.go#L44

Added line #L44 was not covered by tests

return f, nil

Check warning on line 46 in filters/ratelimit.go

View check run for this annotation

Codecov / codecov/patch

filters/ratelimit.go#L46

Added line #L46 was not covered by tests
}

// DoFilter is the mandatory method used to "filter" the input data.Message
func (f *RateLimit) DoFilter(msg *data.Message) (bool, error) {
if f.objects > 0 {
if err := f.limiter.Wait(f.context); err != nil {
return false, nil

Check warning on line 53 in filters/ratelimit.go

View check run for this annotation

Codecov / codecov/patch

filters/ratelimit.go#L50-L53

Added lines #L50 - L53 were not covered by tests
}
}

return true, nil

Check warning on line 57 in filters/ratelimit.go

View check run for this annotation

Codecov / codecov/patch

filters/ratelimit.go#L57

Added line #L57 was not covered by tests
}

// OnEvent is called when an event occurs
func (f *RateLimit) OnEvent(event *data.Event) {
if event.Type == "shutdown" {
log.Debug("shutdown event received")
f.cancelContext()

Check warning on line 64 in filters/ratelimit.go

View check run for this annotation

Codecov / codecov/patch

filters/ratelimit.go#L61-L64

Added lines #L61 - L64 were not covered by tests
}
}

// Set the name of the filter
func init() {
register("ratelimit", NewRateLimitFilter)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ require (
golang.org/x/oauth2 v0.15.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0
golang.org/x/tools v0.16.1 // indirect
google.golang.org/api v0.154.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
Expand Down
30 changes: 30 additions & 0 deletions src_docs/content/doc/filters/ratelimit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
---
title: "RateLimit"
date: 2023-01-31T18:50:23+02:00
draft: false
---

## RateLimit

This filter allows you to set a rate limit on the messages that can go through it. So for example if we don't want to limit the number of messages in a pipe to 5 messages per second we just to set the parameter `rate` to 5.

### Parameters

| Parameter | Type | Default | Description |
|--------------|----------|---------|--------------------------------------------------------------------------------------------------|
| **rate** | _STRING_ | "0" | how many event per second you want to have as rate limiter |


{{< notice info "Example" >}}
`... | ratelimit(rate="5") | ...`
{{< /notice >}}

### Output

The filter will slow down the rate of the messages as specified on the parameter `rate`.

### Examples

{{< alert theme="warning" >}}
Soon...
{{< /alert >}}

0 comments on commit 83b8e06

Please sign in to comment.