Skip to content

Commit

Permalink
🔨 refactor the golim logics
Browse files Browse the repository at this point in the history
  • Loading branch information
khalil committed Mar 30, 2024
1 parent ae2e9fb commit 0f3fe31
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 94 deletions.
70 changes: 48 additions & 22 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,73 +3,99 @@ package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"time"

"github.com/khalil-farashiani/golim/role"
"github.com/pingcap/log"
"github.com/redis/go-redis/v9"
)

// cache struct to handle redis operations
type cache struct {
*redis.Client
}

// initRedis initializes redis connection
func initRedis() *cache {
url := os.Getenv("REDIS_URI")
opts, err := redis.ParseURL(url)
if err != nil {
panic(err)
log.Fatalf("Error parsing Redis URL: %v", err)
}
return &cache{
redis.NewClient(opts),
}
}

// getAllUserLimitersKeys retrieves all user limiter keys from cache
func (c *cache) getAllUserLimitersKeys(ctx context.Context) []string {
res, err := c.Keys(ctx, "*GLOLIM_KEY*").Result()
res, err := c.Keys(ctx, limiterCacheRegexPatternKey).Result()
if err != nil {
log.Fatal(err.Error())
log.Printf("Error retrieving keys: %v", err)
return nil
}
return res
}

// increaseCap increases the capacity in cache for a given key
func (c *cache) increaseCap(ctx context.Context, key string, rl *limiterRole) {
c.IncrBy(ctx, key, rl.addToken)
if err := c.IncrBy(ctx, key, rl.addToken).Err(); err != nil {
log.Printf("Error increasing capacity: %v", err)
}
}

// decreaseCap decreases the capacity in cache for a given key
func (c *cache) decreaseCap(ctx context.Context, userIP string, rl *limiterRole) {
key := userIP + "GLOLIM_KEY" + rl.operation + " " + rl.endPoint
c.Decr(ctx, key)
key := fmt.Sprintf("%s%s%s %s", userIP, limiterCacheMainKey, rl.operation, rl.endPoint)
if err := c.Decr(ctx, key).Err(); err != nil {
log.Printf("Error decreasing capacity: %v", err)
}
}

// setLimiter sets a limiter in cache based on parameters
func (c *cache) setLimiter(ctx context.Context, params *role.GetRoleParams, val *role.GetRoleRow) {
key := params.Operation + " " + params.Endpoint
err := c.Set(ctx, key, val, time.Minute*60).Err()
if err != nil {
panic(err)
key := fmt.Sprintf("%s %s", params.Operation, params.Endpoint)
if err := c.Set(ctx, key, *val, time.Minute*60).Err(); err != nil {
log.Printf("Error setting limiter: %v", err)
}
}

// getLimiter retrieves a limiter from cache based on parameters
func (c *cache) getLimiter(ctx context.Context, params role.GetRoleParams) *role.GetRoleRow {
var res role.GetRoleRow
var key = params.Operation + " " + params.Endpoint
key := fmt.Sprintf("%s %s", params.Operation, params.Endpoint)
val, err := c.Get(ctx, key).Result()
if err != nil && err != redis.Nil {
panic(err)
if err != nil {
if err != redis.Nil {
log.Printf("Error getting limiter: %v", err)
}
return nil
}
json.Unmarshal([]byte(val), &res)
return &res
}

func (c *cache) getUserRequestCap(ctx context.Context, ipAddr string, rl *limiterRole) int64 {
key := ipAddr + rl.endPoint + rl.endPoint
var res = new(int64)
val, err := c.Get(ctx, key).Result()
// setUserRequestCap sets user request capacity in cache
func (c *cache) setUserRequestCap(ctx context.Context, key string, role role.GetRoleRow) {
if err := c.Set(ctx, key, role.InitialTokens, time.Hour).Err(); err != nil {
log.Printf("Error setting user request capacity: %v", err)
}
}

if err != nil && err != redis.Nil {
panic(err)
// getUserRequestCap retrieves user request capacity from cache
func (c *cache) getUserRequestCap(ctx context.Context, ipAddr string, g *golim, role role.GetRoleRow) int64 {
key := fmt.Sprintf("%s%s%s %s", ipAddr, limiterCacheMainKey, g.limiterRole.operation, g.limiterRole.endPoint)
val, err := c.Get(ctx, key).Result()
if err != nil {
if err != redis.Nil {
go c.setUserRequestCap(ctx, key, role)
log.Printf("Error getting user request capacity: %v", err)
}
return 0
}
json.Unmarshal([]byte(val), res)
return *res
var res int64
json.Unmarshal([]byte(val), &res)
return res
}
26 changes: 17 additions & 9 deletions const.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,34 @@ const (
)

const (
addRoleOperation = "add"
removeRoleOperationID = "remove"
getRolesOperationID = "getRoles"
addRoleOperation = "add"
removeRoleOperation = "remove"
getRolesOperation = "getRoles"
)

const (
OperationGet = "GET"
OperationPost = "POST"
OperationPut = "PUT"
OperationPatch = "PATCH"
OperationDelete = "DELETE"
limiterCacheMainKey = "GOLIM_KEY"
limiterCacheRegexPatternKey = "*GOLIM_KEY*"
)

const (
unknownLimiterRoleError = "unknown limiter role operation"
unknownLimiterError = "unknown limiter operation"
unsupportedOperationError = "unsupported operation"
requiredNameDestinationError = "name and destination is required"
requiredLimiterIDError = "limiter id is required"
createProxyError = "Error creating proxy request"
sendingProxyError = "Error sending proxy request"
slowDownError = "slow down"
notFoundSqlError = "sql: no rows in result set"
)

const (
helpMessageUsage = `
Golim help:
- golim run -p{--port} <port> [run in the specific port default is 8080]
- golim get -l{--limiter} <limiter id> [get roles of a rate limiter]
- golim init -n{--name} foo -d{--destination} 8.8.8.8 [initial new rate limiter]
- golim add -l{--limiter} <limiter id> -e{--endpoint} <endpoint> -b{--bsize} <bucket size> -a{--add_token} <add_token per minute> -i{--initial_token} <initial tokens> [add specific role to limiter]
- golim remove -i{--id} <role id> [remove specific role]
- golim remove-limiter -l{--limiter} <limiter id> [remove specific limiter]`
)
2 changes: 1 addition & 1 deletion cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
)

func scheduleIncreaseCap(ctx context.Context, g *golim) {
userKeys := g.cache.getAllUserLimitersKeys(ctx)
cr := cron.New()
_, err := cr.AddFunc("@every 1m", func() {
userKeys := g.cache.getAllUserLimitersKeys(ctx)
fmt.Println("Running tasks")
for _, key := range userKeys {
g.cache.increaseCap(ctx, key, g.limiterRole)
Expand Down
71 changes: 37 additions & 34 deletions golim.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"database/sql"
"errors"
"fmt"
"strings"

"github.com/khalil-farashiani/golim/role"
"github.com/peterbourgon/ff/v4"
Expand Down Expand Up @@ -38,20 +40,28 @@ type golim struct {
Store
}

func (g *golim) getRole(ctx context.Context) (role.GetRoleRow, error) {
func (g *golim) getRole(ctx context.Context) (role.GetRoleRow, bool, error) {
params := toGetRole(g)
data := g.cache.getLimiter(ctx, params)
if data != nil {
return *data, nil
return *data, false, nil
}

row, err := g.db.GetRole(ctx, params)
if err != nil {
return role.GetRoleRow{}, err
if strings.Contains(err.Error(), notFoundSqlError) {
return role.GetRoleRow{}, false, nil
}
return role.GetRoleRow{}, false, err
}

if row.Endpoint == "" {
return role.GetRoleRow{}, false, nil
}
go func() {
g.cache.setLimiter(ctx, &params, &row)
}()
return row, nil

g.cache.setLimiter(ctx, &params, &row)

return row, true, nil
}

func (g *golim) getRoles(ctx context.Context) ([]role.GetRolesRow, error) {
Expand All @@ -75,12 +85,13 @@ func (g *golim) createRateLimiter(ctx context.Context) error {
}

func (g *golim) removeRateLimiter(ctx context.Context) error {
return g.db.DeleteRateLimiter(ctx, g.limiter.id.(int64))
return g.db.DeleteRateLimiter(ctx, int64(g.limiter.id.(int)))
}

func (g *golim) ExecCMD(ctx context.Context) (interface{}, error) {

if g.port != 0 {
go scheduleIncreaseCap(ctx, g)
return startServer(g)
}
if g.limiter != nil {
Expand All @@ -89,7 +100,7 @@ func (g *golim) ExecCMD(ctx context.Context) (interface{}, error) {
if g.limiterRole != nil {
return handleLimiterRoleOperation(g, ctx)
}
return nil, errors.New(unsupportedOperationError)
return nil, nil
}

func handleLimiterOperation(g *golim, ctx context.Context) (interface{}, error) {
Expand All @@ -106,9 +117,9 @@ func handleLimiterRoleOperation(g *golim, ctx context.Context) (interface{}, err
switch g.limiterRole.operation {
case addRoleOperation:
return nil, g.addRole(ctx)
case removeRoleOperationID:
case removeRoleOperation:
return nil, g.removeRole(ctx)
case getRolesOperationID:
case getRolesOperation:
return g.getRoles(ctx)
}
return nil, errors.New(unknownLimiterRoleError)
Expand All @@ -124,12 +135,14 @@ func newLimiter(db *sql.DB, cache *cache) *golim {
}

func (g *golim) createHelpCMD() *ff.Command {
helpFlags := ff.NewFlagSet("help")
return &ff.Command{
Name: "help",
Usage: "golim help",
ShortHelp: "Displays help information for golim",
Flags: ff.NewFlagSet("help"),
Flags: helpFlags,
Exec: func(ctx context.Context, args []string) error {
fmt.Println(helpMessageUsage)
return nil
},
}
Expand Down Expand Up @@ -183,13 +196,13 @@ func (g *golim) createInitCMD() *ff.Command {
}

func (g *golim) addRemoveLimiterCMD() *ff.Command {
initFlags := ff.NewFlagSet("remove-limiter")
limiterID := initFlags.Int('l', "limiter", 0, "The name of the golim to initialize")
removeFlags := ff.NewFlagSet("removel")
limiterID := removeFlags.Int('l', "limiter", 0, "The name of the golim to initialize")
return &ff.Command{
Name: "init",
Usage: "golim init -n <limiter_name>",
Name: "removel",
Usage: "golim removel -l <limiter_id>",
ShortHelp: "Initializes a standalone rate golim",
Flags: initFlags,
Flags: removeFlags,
Exec: func(ctx context.Context, args []string) error {
if g.skip {
return nil
Expand Down Expand Up @@ -241,21 +254,21 @@ func (g *golim) createAddCMD() *ff.Command {

func (g *golim) createRemoveCMD() *ff.Command {
removeFlags := ff.NewFlagSet("remove")
limiterID := removeFlags.Int('l', "limiter", 0, "The limiter id")
roleID := removeFlags.Int('i', "role_id", 0, "the role id")

return &ff.Command{
Name: "add",
Usage: "golim add -e <endpoint> -b <bsize> -a <add_token>",
Name: "remove",
Usage: "golim remove -i <role id>",
ShortHelp: "Adds a new golim with the specified configuration",
Flags: removeFlags,
Exec: func(ctx context.Context, args []string) error {
if g.skip {
return nil
}
if *limiterID == 0 {
if *roleID != 0 {
g.limiterRole = &limiterRole{
operation: removeRoleOperationID,
limiterID: *limiterID,
operation: removeRoleOperation,
limiterID: *roleID,
}
}
g.skip = true
Expand All @@ -279,7 +292,7 @@ func (g *golim) createGetRolesCMD() *ff.Command {
}
if *limiterID != 0 {
g.limiterRole = &limiterRole{
operation: getRolesOperationID,
operation: getRolesOperation,
limiterID: *limiterID,
}
} else {
Expand Down Expand Up @@ -315,13 +328,3 @@ func toGetRole(g *golim) role.GetRoleParams {
Operation: g.limiterRole.operation,
}
}

func toRole(row role.GetRoleRow) role.Role {
return role.Role{
Endpoint: row.Endpoint,
Operation: row.Operation,
BucketSize: row.BucketSize,
AddTokenPerMin: row.AddTokenPerMin,
InitialTokens: row.InitialTokens,
}
}
Loading

0 comments on commit 0f3fe31

Please sign in to comment.