Skip to content

Commit

Permalink
feat: support degradation config for Kitex client (#7)
Browse files Browse the repository at this point in the history
* feat: support degradation config for Kitex client

* feat: support degradation config for Kitex client

* feat: add license header

* Add unit test and docs

* typo fix

* fix lint problem and modify pr-check.yml

* delete degradation middleware and use acl middleware

* update returned error

* update test
  • Loading branch information
XiaoYi-byte authored May 30, 2024
1 parent ee1fe89 commit 7fe0870
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
- uses: actions/checkout@v3

- name: Check License Header
uses: apache/skywalking-eyes/header@main
uses: apache/skywalking-eyes/header@v0.4.0
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

Expand Down
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,26 @@ The echo method uses the following configuration (0.3, 100) and other methods us
}
}
```
##### Degradation: Category=degradation

[JSON Schema](https://github.com/cloudwego/kitex/blob/develop/pkg/circuitbreak/item_circuit_breaker.go#L30)

| Variable | Introduction |
|------------|------------------------------------|
| enable | Whether to enable degradation |
| percentage | The percentage of dropped requests |

Example:

> configPath: /KitexConfig/ClientName/ServiceName/degradation
```json
{
"enable": true,
"percentage": 30
}
```
Note: Degradation is not enabled by default.
### More Info

Refer to [example](https://github.com/kitex-contrib/config-etcd/tree/main/example) for more usage.
Expand Down
20 changes: 20 additions & 0 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,26 @@ echo 方法使用下面的配置(0.3、100),其他方法使用全局默认
}
}
```
##### 降级: Category=degradation

[JSON Schema](https://github.com/cloudwego/kitex/blob/develop/pkg/circuitbreak/item_circuit_breaker.go#L30)

| 参数 | 说明 |
|------------|----------|
| enable | 是否开启降级策略 |
| percentage | 丢弃请求的比例 |

例子:

> configPath: /KitexConfig/ClientName/ServiceName/degradation
```json
{
"enable": true,
"percentage": 30
}
```
注:默认不开启降级(enable为false)
### 更多信息

更多示例请参考 [example](https://github.com/kitex-contrib/config-etcd/tree/main/example)
Expand Down
66 changes: 66 additions & 0 deletions client/degradation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"context"

"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/kitex-contrib/config-etcd/etcd"
"github.com/kitex-contrib/config-etcd/pkg/degradation"
"github.com/kitex-contrib/config-etcd/utils"
)

func WithDegradation(dest, src string, etcdClient etcd.Client, uniqueID int64, opts utils.Options) []client.Option {
param, err := etcdClient.ClientConfigParam(&etcd.ConfigParamConfig{
Category: degradationConfigName,
ServerServiceName: dest,
ClientServiceName: src,
})
if err != nil {
panic(err)
}
for _, f := range opts.EtcdCustomFunctions {
f(&param)
}
key := param.Prefix + "/" + param.Path
container := initDegradationOptions(key, dest, uniqueID, etcdClient)
return []client.Option{
client.WithACLRules(container.GetAclRule()),
client.WithCloseCallbacks(func() error {
// cancel the configuration listener when client is closed.
etcdClient.DeregisterConfig(key, uniqueID)
return nil
}),
}
}

func initDegradationOptions(key, dest string, uniqueID int64, etcdClient etcd.Client) *degradation.Container {
container := degradation.NewContainer()
onChangeCallback := func(restoreDefault bool, data string, parser etcd.ConfigParser) {
config := &degradation.Config{}
if !restoreDefault {
err := parser.Decode(data, config)
if err != nil {
klog.Warnf("[etcd] %s server etcd degradation config: unmarshal data %s failed: %s, skip...", key, data, err)
return
}
}
container.NotifyPolicyChange(config)
}
etcdClient.RegisterConfigCallback(context.Background(), key, uniqueID, onChangeCallback)
return container
}
2 changes: 2 additions & 0 deletions client/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
retryConfigName = "retry"
rpcTimeoutConfigName = "rpc_timeout"
circuitBreakerConfigName = "circuit_break"
degradationConfigName = "degradation"
)

// EtcdClientSuite etcd client config suite, configure retry timeout limit and circuitbreak dynamically from etcd.
Expand Down Expand Up @@ -58,5 +59,6 @@ func (s *EtcdClientSuite) Options() []client.Option {
opts = append(opts, WithRetryPolicy(s.service, s.client, s.etcdClient, s.uid, s.opts)...)
opts = append(opts, WithRPCTimeout(s.service, s.client, s.etcdClient, s.uid, s.opts)...)
opts = append(opts, WithCircuitBreaker(s.service, s.client, s.etcdClient, s.uid, s.opts)...)
opts = append(opts, WithDegradation(s.service, s.client, s.etcdClient, s.uid, s.opts)...)
return opts
}
81 changes: 81 additions & 0 deletions pkg/degradation/item_degradation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package degradation

import (
"context"
"errors"
"sync/atomic"

"github.com/bytedance/gopkg/lang/fastrand"
"github.com/cloudwego/configmanager/iface"
"github.com/cloudwego/kitex/pkg/acl"
)

var errRejected = errors.New("rejected by client degradation config")

var defaultConfig = &Config{
Enable: false,
Percentage: 0,
}

type Config struct {
Enable bool `json:"enable"`
Percentage int `json:"percentage"`
}

// DeepCopy returns a copy of the current Config
func (c *Config) DeepCopy() iface.ConfigValueItem {
result := &Config{
Enable: c.Enable,
Percentage: c.Percentage,
}
return result
}

// EqualsTo returns true if the current Config equals to the other Config
func (c *Config) EqualsTo(other iface.ConfigValueItem) bool {
o := other.(*Config)
return c.Enable == o.Enable && c.Percentage == o.Percentage
}

// Container is a wrapper for Config
type Container struct {
config atomic.Value
}

func NewContainer() *Container {
c := &Container{}
c.config.Store(defaultConfig)
return c
}

// NotifyPolicyChange to receive policy when it changes
func (c *Container) NotifyPolicyChange(cfg *Config) {
c.config.Store(cfg)
}

func (c *Container) GetAclRule() acl.RejectFunc {
return func(ctx context.Context, request interface{}) (reason error) {
cfg := c.config.Load().(*Config)
if !cfg.Enable {
return nil
}
if fastrand.Intn(100) < cfg.Percentage {
return errRejected
}
return nil
}
}
40 changes: 40 additions & 0 deletions pkg/degradation/item_degradation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package degradation

import (
"context"
"errors"
"testing"

"github.com/cloudwego/kitex/pkg/acl"
"github.com/cloudwego/thriftgo/pkg/test"
)

var errFake = errors.New("fake error")

func invoke(ctx context.Context, request, response interface{}) error {
return errFake
}

func TestNewContainer(t *testing.T) {
container := NewContainer()
aclMiddleware := acl.NewACLMiddleware([]acl.RejectFunc{container.GetAclRule()})
test.Assert(t, errors.Is(aclMiddleware(invoke)(context.Background(), nil, nil), errFake))
container.NotifyPolicyChange(&Config{Enable: false, Percentage: 100})
test.Assert(t, errors.Is(aclMiddleware(invoke)(context.Background(), nil, nil), errFake))
container.NotifyPolicyChange(&Config{Enable: true, Percentage: 100})
test.Assert(t, errors.Is(aclMiddleware(invoke)(context.Background(), nil, nil), errRejected))
}
2 changes: 1 addition & 1 deletion server/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewSuite(service string, cli etcd.Client,
return su
}

// Options return a list client.Option
// Options return a list server.Option
func (s *EtcdServerSuite) Options() []server.Option {
opts := make([]server.Option, 0, 2)
opts = append(opts, WithLimiter(s.service, s.etcdClient, s.uid, s.opts))
Expand Down

0 comments on commit 7fe0870

Please sign in to comment.