Skip to content

Commit

Permalink
Fix script router dead lock (#2716)
Browse files Browse the repository at this point in the history
  • Loading branch information
YarBor authored Jul 29, 2024
1 parent ee3d011 commit 8747aa9
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 20 deletions.
4 changes: 2 additions & 2 deletions cluster/router/script/instance/instances_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func init() {

type ScriptInstances interface {
Run(rawScript string, invokers []protocol.Invoker, invocation protocol.Invocation) ([]protocol.Invoker, error)
Compile(name, rawScript string) error
Destroy(name, rawScript string)
Compile(rawScript string) error
Destroy(rawScript string)
}

var factory map[string]ScriptInstances
Expand Down
7 changes: 4 additions & 3 deletions cluster/router/script/instance/js_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (p *program) addCount(i int) int {

func newJsInstances() *jsInstances {
return &jsInstances{
program: map[string]*program{},
insPool: &sync.Pool{New: func() any {
return newJsInstance()
}},
Expand Down Expand Up @@ -113,7 +114,7 @@ func (i *jsInstances) Run(rawScript string, invokers []protocol.Invoker, invocat
return result, nil
}

func (i *jsInstances) Compile(key, rawScript string) error {
func (i *jsInstances) Compile(rawScript string) error {
var (
ok bool
pg *program
Expand All @@ -134,7 +135,7 @@ func (i *jsInstances) Compile(key, rawScript string) error {
if pg, ok = i.program[rawScript]; ok {
pg.addCount(1)
} else {
newPg, err := goja.Compile(key+`_jsScriptRoute`, jsScriptPrefix+rawScript, true)
newPg, err := goja.Compile("", jsScriptPrefix+rawScript, true)
if err != nil {
return err
}
Expand All @@ -144,7 +145,7 @@ func (i *jsInstances) Compile(key, rawScript string) error {
}
}

func (i *jsInstances) Destroy(_, rawScript string) {
func (i *jsInstances) Destroy(rawScript string) {
i.pgLock.Lock()
if pg, ok := i.program[rawScript]; ok {
if pg.addCount(-1) == 0 {
Expand Down
34 changes: 19 additions & 15 deletions cluster/router/script/route.go → cluster/router/script/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ import (

// ScriptRouter only takes effect on consumers and only supports application granular management.
type ScriptRouter struct {
mu sync.RWMutex
scriptType string
applicationName string // applicationName to application - name
enabled bool // enabled
rawScript string
applicationName string

mu sync.RWMutex
enabled bool
scriptType string
rawScript string
}

func NewScriptRouter() *ScriptRouter {
Expand Down Expand Up @@ -86,8 +87,9 @@ func (s *ScriptRouter) Process(event *config_center.ConfigChangeEvent) {
in, err := ins.GetInstances(s.scriptType)
if err != nil {
logger.Errorf("GetInstances failed to Destroy: %v", err)
} else {
in.Destroy(s.rawScript)
}
in.Destroy(s.applicationName, s.rawScript)
}
// check new config
if "" == cfg.ScriptType {
Expand All @@ -106,17 +108,19 @@ func (s *ScriptRouter) Process(event *config_center.ConfigChangeEvent) {
logger.Infof("`enabled` field equiles false, this rule will be ignored :%s", cfg.Script)
}
// rewrite to ScriptRouter
s.enabled = *cfg.Enabled
s.rawScript = cfg.Script
s.scriptType = cfg.ScriptType
s.enabled = *cfg.Enabled

// compile script
in, err := ins.GetInstances(s.scriptType)
if err != nil {
logger.Errorf("GetInstances failed: %v", err)
s.enabled = false
return
}
if s.enabled {
err = in.Compile(s.applicationName, cfg.Script)
err = in.Compile(s.rawScript)
// fail, disable rule
if err != nil {
s.enabled = false
Expand All @@ -128,7 +132,7 @@ func (s *ScriptRouter) Process(event *config_center.ConfigChangeEvent) {
in, _ := ins.GetInstances(s.scriptType)

if in != nil && s.enabled {
in.Destroy(s.applicationName, s.rawScript)
in.Destroy(s.rawScript)
}
s.enabled = false
s.rawScript = ""
Expand Down Expand Up @@ -195,21 +199,21 @@ func (s *ScriptRouter) Notify(invokers []protocol.Invoker) {
return
}

s.mu.Lock()
defer s.mu.Unlock()

var (
listenTarget, value string
err error
)
if providerApplication != s.applicationName {
if s.applicationName != "" {
dynamicConfiguration.RemoveListener(strings.Join([]string{s.applicationName, constant.ScriptRouterRuleSuffix}, ""), s)
}

listenTarget := strings.Join([]string{providerApplication, constant.ScriptRouterRuleSuffix}, "")
listenTarget = strings.Join([]string{providerApplication, constant.ScriptRouterRuleSuffix}, "")
dynamicConfiguration.AddListener(listenTarget, s)
s.applicationName = providerApplication
value, err := dynamicConfiguration.GetRule(listenTarget)
value, err = dynamicConfiguration.GetRule(listenTarget)
if err != nil {
logger.Errorf("Failed to query Script rule, applicationName=%s, listening=%s, err=%v", s.applicationName, listenTarget, err)
return
}
s.Process(&config_center.ConfigChangeEvent{Key: listenTarget, Value: value, ConfigType: remoting.EventTypeUpdate})
}
Expand Down
237 changes: 237 additions & 0 deletions cluster/router/script/router_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package script

import (
"context"
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
"dubbo.apache.org/dubbo-go/v3/remoting"
"github.com/stretchr/testify/assert"
"testing"
)

var url1 = func() *common.URL {
i, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245")
return i
}
var url2 = func() *common.URL {
u, _ := common.NewURL("dubbo://127.0.0.1:20001/com.ikurento.user.UserProvider?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1448&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797246")
return u
}
var url3 = func() *common.URL {
i, _ := common.NewURL("dubbo://127.0.0.1:20002/com.ikurento.user.UserProvider?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1449&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797247")
return i
}

func getRouteCheckArgs() ([]protocol.Invoker, protocol.Invocation, context.Context) {
return []protocol.Invoker{
protocol.NewBaseInvoker(url1()), protocol.NewBaseInvoker(url2()), protocol.NewBaseInvoker(url3()),
}, invocation.NewRPCInvocation("GetUser", nil, map[string]interface{}{
"attachmentKey": []string{"attachmentValue"},
}),
context.TODO()
}

func TestScriptRouter_Route(t *testing.T) {
type fields struct {
cfgContent string
}
type args struct {
invokers []protocol.Invoker
invocation protocol.Invocation
}
tests := []struct {
name string
fields fields
args args
want func([]protocol.Invoker) bool
}{
{
name: "base test",
fields: fields{cfgContent: `configVersion: v3.0
key: dubbo.io
type: javascript
enabled: true
script: |
(function route(invokers,invocation,context) {
var result = [];
for (var i = 0; i < invokers.length; i++) {
invokers[i].GetURL().Port = "20001"
result.push(invokers[i]);
}
return result;
}(invokers,invocation,context));
`},
args: func() args {
res := args{}
res.invokers, res.invocation, _ = getRouteCheckArgs()
return res
}(),
want: func(invokers []protocol.Invoker) bool {
for _, invoker := range invokers {
if invoker.GetURL().Port != "20001" {
return false
}
}
return true
},
}, {
name: "disable test",
fields: fields{cfgContent: `configVersion: v3.0
key: dubbo.io
type: javascript
enabled: false
script: |
(function route(invokers,invocation,context) {
var result = [];
for (var i = 0; i < invokers.length; i++) {
invokers[i].GetURL().Port = "20001"
result.push(invokers[i]);
}
return result;
}(invokers,invocation,context));
`},
args: func() args {
res := args{}
res.invokers, res.invocation, _ = getRouteCheckArgs()
return res
}(),
want: func(invokers []protocol.Invoker) bool {
expect_invokers, _, _ := getRouteCheckArgs()
return checkInvokersSame(invokers, expect_invokers)
},
}, {
name: "bad input",
fields: fields{cfgContent: `configVersion: v3.0
key: dubbo.io
type: javascript
enabled: true
script: |
badInPut
(().Port = "20001"
result.push(invokers[i]);
}
}
}
return result;
}(invokers,invocation,context));
`},
args: func() args {
res := args{}
res.invokers, res.invocation, _ = getRouteCheckArgs()
return res
}(),
want: func(invokers []protocol.Invoker) bool {
expect_invokers, _, _ := getRouteCheckArgs()
return checkInvokersSame(invokers, expect_invokers)
},
}, {
name: "bad call and recover",
fields: fields{cfgContent: `configVersion: v3.0
key: dubbo.io
type: javascript
enabled: true
script: |
(function route(invokers, invocation, context) {
var result = [];
for (var i = 0; i < invokers.length; i++) {
if ("127.0.0.1" === invokers[i].GetURL().Ip) {
if (invokers[i].GetURLS(" Err Here ").Port !== "20000") {
invokers[i].GetURLS().Ip = "anotherIP"
result.push(invokers[i]);
}
}
}
return result;
}(invokers, invocation, context));
`},
args: func() args {
res := args{}
res.invokers, res.invocation, _ = getRouteCheckArgs()
return res
}(),
want: func(invokers []protocol.Invoker) bool {
expect_invokers, _, _ := getRouteCheckArgs()
return checkInvokersSame(invokers, expect_invokers)
},
}, {
name: "bad type",
fields: fields{cfgContent: `configVersion: v3.0
key: dubbo.io
type: errorType # <---
enabled: true
script: |
(function route(invokers,invocation,context) {
var result = [];
for (var i = 0; i < invokers.length; i++) {
invokers[i].GetURL().Port = "20001"
result.push(invokers[i]);
}
return result;
}(invokers,invocation,context));
`},
args: func() args {
res := args{}
res.invokers, res.invocation, _ = getRouteCheckArgs()
return res
}(),
want: func(invokers []protocol.Invoker) bool {
expect_invokers, _, _ := getRouteCheckArgs()
return checkInvokersSame(invokers, expect_invokers)
},
},
}
for _, tt := range tests {
s := &ScriptRouter{}
t.Run(tt.name, func(t *testing.T) {
s.Process(&config_center.ConfigChangeEvent{Key: "", Value: tt.fields.cfgContent, ConfigType: remoting.EventTypeUpdate})
got := s.Route(tt.args.invokers, nil, tt.args.invocation)
assert.True(t, tt.want(got))
})
}
}

func checkInvokersSame(invokers []protocol.Invoker, otherInvokers []protocol.Invoker) bool {
k := map[string]struct{}{}
for _, invoker := range otherInvokers {
k[invoker.GetURL().String()] = struct{}{}
}
for _, invoker := range invokers {
_, ok := k[invoker.GetURL().String()]
if !ok {
return false
}
delete(k, invoker.GetURL().String())
}
return len(k) == 0
}

0 comments on commit 8747aa9

Please sign in to comment.