From baf35e9e2cf4b4138450ddb743f45af2763f0889 Mon Sep 17 00:00:00 2001 From: Mmx Date: Mon, 28 Oct 2024 17:12:36 +0800 Subject: [PATCH 1/2] feat(local): allow to migrate static token buckets --- drivers/local/driver.go | 2 +- drivers/local/token_bucket.go | 27 +++++++++++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/drivers/local/driver.go b/drivers/local/driver.go index bf993e5d5f8..86980943ef5 100644 --- a/drivers/local/driver.go +++ b/drivers/local/driver.go @@ -76,7 +76,7 @@ func (d *Local) Init(ctx context.Context) error { if d.thumbConcurrency == 0 { d.thumbTokenBucket = NewNopTokenBucket() } else { - d.thumbTokenBucket = NewStaticTokenBucket(d.thumbConcurrency) + d.thumbTokenBucket = NewStaticTokenBucketWithMigration(d.thumbTokenBucket, d.thumbConcurrency) } return nil } diff --git a/drivers/local/token_bucket.go b/drivers/local/token_bucket.go index 38fbe73fc9b..32adfbc5a23 100644 --- a/drivers/local/token_bucket.go +++ b/drivers/local/token_bucket.go @@ -23,6 +23,33 @@ func NewStaticTokenBucket(size int) StaticTokenBucket { return StaticTokenBucket{bucket: bucket} } +func NewStaticTokenBucketWithMigration(oldBucket TokenBucket, size int) StaticTokenBucket { + if oldBucket != nil { + oldStaticBucket, ok := oldBucket.(StaticTokenBucket) + if ok { + oldSize := cap(oldStaticBucket.bucket) + migrateSize := oldSize + if size < migrateSize { + migrateSize = size + } + + bucket := make(chan struct{}, size) + for range size - migrateSize { + bucket <- struct{}{} + } + + go func() { + for range migrateSize { + <-oldStaticBucket.bucket + bucket <- struct{}{} + } + }() + return StaticTokenBucket{bucket: bucket} + } + } + return NewStaticTokenBucket(size) +} + func (b StaticTokenBucket) Take() <-chan struct{} { return b.bucket } From 425e09d8e18e85f37e75539762bd4e84b9c3a59d Mon Sep 17 00:00:00 2001 From: Mmx Date: Mon, 28 Oct 2024 18:11:19 +0800 Subject: [PATCH 2/2] improve(local): token bucket migration boundary handling --- drivers/local/token_bucket.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/drivers/local/token_bucket.go b/drivers/local/token_bucket.go index 32adfbc5a23..23c6ebd63b7 100644 --- a/drivers/local/token_bucket.go +++ b/drivers/local/token_bucket.go @@ -38,18 +38,23 @@ func NewStaticTokenBucketWithMigration(oldBucket TokenBucket, size int) StaticTo bucket <- struct{}{} } - go func() { - for range migrateSize { - <-oldStaticBucket.bucket - bucket <- struct{}{} - } - }() + if migrateSize != 0 { + go func() { + for range migrateSize { + <-oldStaticBucket.bucket + bucket <- struct{}{} + } + close(oldStaticBucket.bucket) + }() + } return StaticTokenBucket{bucket: bucket} } } return NewStaticTokenBucket(size) } +// Take channel maybe closed when local driver is modified. +// don't call Put method after the channel is closed. func (b StaticTokenBucket) Take() <-chan struct{} { return b.bucket } @@ -62,8 +67,10 @@ func (b StaticTokenBucket) Do(ctx context.Context, f func() error) error { select { case <-ctx.Done(): return ctx.Err() - case <-b.bucket: - defer b.Put() + case _, ok := <-b.Take(): + if ok { + defer b.Put() + } } return f() }