diff --git a/cmd/driplane/main.go b/cmd/driplane/main.go index 2944379..6eca5c5 100644 --- a/cmd/driplane/main.go +++ b/cmd/driplane/main.go @@ -123,7 +123,7 @@ func main() { os.Exit(0) } - go Signal(&o) + go Signal(o) log.Debug("Trying to start orchestrator") o.StartFeeders() @@ -131,6 +131,4 @@ func main() { log.Debug("Stopping") o.StopFeeders() - - return } diff --git a/core/orchestrator.go b/core/orchestrator.go index 96f0902..fe91d9a 100644 --- a/core/orchestrator.go +++ b/core/orchestrator.go @@ -22,10 +22,10 @@ type Orchestrator struct { } // NewOrchestrator create a new instance of the Orchestrator -func NewOrchestrator(config *Configuration) (Orchestrator, error) { - o := Orchestrator{ +func NewOrchestrator(config *Configuration) (*Orchestrator, error) { + o := &Orchestrator{ config: config, - asts: make(map[string]*AST), + asts: make(map[string]*AST), } parser, _ := NewParser() @@ -108,4 +108,4 @@ func (o *Orchestrator) StopFeeders() { // sending a shutdown event on the bus rs.bus.Publish(data.EventTopicName, &data.Event{Type: "shutdown"}) rs.bus.WaitAsync() -} \ No newline at end of file +} diff --git a/filters/ratelimit.go b/filters/ratelimit.go new file mode 100644 index 0000000..db4bfbd --- /dev/null +++ b/filters/ratelimit.go @@ -0,0 +1,71 @@ +package filters + +import ( + "context" + "strconv" + + "github.com/Matrix86/driplane/data" + "github.com/evilsocket/islazy/log" + "golang.org/x/time/rate" +) + +// RateLimit is a Filter to create a RateLimit number +type RateLimit struct { + Base + + objects int64 + seconds int64 + limiter *rate.Limiter + context context.Context + cancelContext context.CancelFunc + + params map[string]string +} + +// NewRateLimitFilter is the registered method to instantiate a RateLimit Filter +func NewRateLimitFilter(p map[string]string) (Filter, error) { + ctx, cancel := context.WithCancel(context.Background()) + f := &RateLimit{ + params: p, + seconds: 1, + context: ctx, + cancelContext: cancel, + } + f.cbFilter = f.DoFilter + + if v, ok := p["rate"]; ok { + i, err := strconv.ParseInt(v, 10, 64) + if err != nil { + return nil, err + } + f.objects = i + } + + f.limiter = rate.NewLimiter(rate.Limit(f.objects), int(f.seconds)) + + return f, nil +} + +// DoFilter is the mandatory method used to "filter" the input data.Message +func (f *RateLimit) DoFilter(msg *data.Message) (bool, error) { + if f.objects > 0 { + if err := f.limiter.Wait(f.context); err != nil { + return false, nil + } + } + + return true, nil +} + +// OnEvent is called when an event occurs +func (f *RateLimit) OnEvent(event *data.Event) { + if event.Type == "shutdown" { + log.Debug("shutdown event received") + f.cancelContext() + } +} + +// Set the name of the filter +func init() { + register("ratelimit", NewRateLimitFilter) +} diff --git a/go.mod b/go.mod index bde1453..9f8565c 100644 --- a/go.mod +++ b/go.mod @@ -97,6 +97,7 @@ require ( golang.org/x/oauth2 v0.15.0 // indirect golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect + golang.org/x/time v0.5.0 golang.org/x/tools v0.16.1 // indirect google.golang.org/api v0.154.0 // indirect google.golang.org/appengine v1.6.8 // indirect diff --git a/go.sum b/go.sum index cdb28e7..ed13e7a 100644 --- a/go.sum +++ b/go.sum @@ -452,6 +452,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/src_docs/content/doc/filters/ratelimit.md b/src_docs/content/doc/filters/ratelimit.md new file mode 100644 index 0000000..a37cb82 --- /dev/null +++ b/src_docs/content/doc/filters/ratelimit.md @@ -0,0 +1,30 @@ +--- +title: "RateLimit" +date: 2023-01-31T18:50:23+02:00 +draft: false +--- + +## RateLimit + +This filter allows you to set a rate limit on the messages that can go through it. So for example if we don't want to limit the number of messages in a pipe to 5 messages per second we just to set the parameter `rate` to 5. + +### Parameters + +| Parameter | Type | Default | Description | +|--------------|----------|---------|--------------------------------------------------------------------------------------------------| +| **rate** | _STRING_ | "0" | how many event per second you want to have as rate limiter | + + +{{< notice info "Example" >}} +`... | ratelimit(rate="5") | ...` +{{< /notice >}} + +### Output + +The filter will slow down the rate of the messages as specified on the parameter `rate`. + +### Examples + +{{< alert theme="warning" >}} +Soon... +{{< /alert >}} \ No newline at end of file