Skip to content

Commit

Permalink
feat: 调整节点查询时过滤位置
Browse files Browse the repository at this point in the history
  • Loading branch information
如漫 committed Apr 11, 2024
1 parent 24e81b8 commit 7287cf8
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 42 deletions.
40 changes: 40 additions & 0 deletions clients/naming_client/naming_cache/service_info_holder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package naming_cache

import (
"fmt"
"os"
"reflect"
"sort"
Expand Down Expand Up @@ -83,6 +84,45 @@ func (s *ServiceInfoHolder) ProcessService(service *model.Service) {
}
}

// 灰度节点过滤
//过滤不符合当前节点标签的实例
tag := os.Getenv("ALICLOUD_SERVICE_TAG")
if tag == "" {
tag = "base"
}
fmt.Printf("[NamingGrpcProxy.Subscribe] instance tag: %v\n", tag)
tagMapList := make([]model.Instance, 0) //标签节点列表
backUpMapList := make([]model.Instance, 0) //普通节点列表

for _, host := range service.Hosts {
// 如果没有metadata, 认为是普通实例
if host.Metadata == nil {
backUpMapList = append(backUpMapList, host)
continue
}

instanceTag, ok := host.Metadata["alicloud.service.tag"]
fmt.Printf("[NamingGrpcProxy.Subscribe] host: %v, metadata : %v\n", host, instanceTag)
if !ok || instanceTag == "base" || instanceTag == "" { //普通节点,加入到backUp列表中
backUpMapList = append(backUpMapList, host)
continue
}

if instanceTag == tag {
tagMapList = append(tagMapList, host)
}
}

if tag != "base" && len(tagMapList) != 0 {
fmt.Printf("[NamingGrpcProxy.Subscribe] change host list, tag: %v, list: %v\n", tag, tagMapList)
service.Hosts = tagMapList
}
if (tag == "base" || tag == "") && len(backUpMapList) != 0 {
fmt.Printf("[NamingGrpcProxy.Subscribe] change host list, tag: %v, list: %v\n", tag, backUpMapList)
service.Hosts = backUpMapList
}

//继续后续处理
cacheKey := util.GetServiceCacheKey(util.GetGroupName(service.Name, service.GroupName), service.Clusters)
oldDomain, ok := s.ServiceInfoMap.Load(cacheKey)
if ok && oldDomain.(model.Service).LastRefTime >= service.LastRefTime {
Expand Down
42 changes: 0 additions & 42 deletions clients/naming_client/naming_grpc/naming_grpc_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package naming_grpc

import (
"context"
"fmt"
"os"
"time"

Expand Down Expand Up @@ -218,47 +217,6 @@ func (proxy *NamingGrpcProxy) Subscribe(serviceName, groupName string, clusters
return model.Service{}, err
}
subscribeServiceResponse := response.(*rpc_response.SubscribeServiceResponse)

//过滤不符合当前节点标签的实例
if len(subscribeServiceResponse.ServiceInfo.Hosts) != 0 {
tag := os.Getenv("ALICLOUD_SERVICE_TAG")
if tag == "" {
tag = "base"
}
fmt.Printf("[NamingGrpcProxy.Subscribe] instance tag: %v\n", tag)
tagMapList := make([]model.Instance, 0) //标签节点列表
backUpMapList := make([]model.Instance, 0) //普通节点列表

for _, host := range subscribeServiceResponse.ServiceInfo.Hosts {
// 如果没有metadata, 认为是普通实例
if host.Metadata == nil {
backUpMapList = append(backUpMapList, host)
continue
}

instanceTag, ok := host.Metadata["alicloud.service.tag"]
fmt.Printf("[NamingGrpcProxy.Subscribe] host: %v, metadata : %v\n", host, instanceTag)
if !ok || instanceTag == "base" || instanceTag == "" { //普通节点,加入到backUp列表中
backUpMapList = append(backUpMapList, host)
continue
}

if instanceTag == tag {
tagMapList = append(tagMapList, host)
}
}

if tag != "base" && len(tagMapList) != 0 {
fmt.Printf("[NamingGrpcProxy.Subscribe] change host list, tag: %v, list: %v\n", tag, tagMapList)
subscribeServiceResponse.ServiceInfo.Hosts = tagMapList
}
if (tag == "base" || tag == "") && len(backUpMapList) != 0 {
fmt.Printf("[NamingGrpcProxy.Subscribe] change host list, tag: %v, list: %v\n", tag, tagMapList)
subscribeServiceResponse.ServiceInfo.Hosts = backUpMapList
}
}

fmt.Printf("[NamingGrpcProxy.Subscribe] final service info: %v\n", subscribeServiceResponse.ServiceInfo)
return subscribeServiceResponse.ServiceInfo, nil
}

Expand Down
49 changes: 49 additions & 0 deletions example/test_client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package main

import (
"fmt"
"github.com/nacos-group/nacos-sdk-go/v2/clients"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
"time"
)

func main() {
clientConfig := constant.ClientConfig{
NamespaceId: "public", // 当存在多个 Namespace 时填写对应 Namespace ID,否则使用 public
TimeoutMs: 5000,
NotLoadCacheAtStart: true,
LogDir: "data/nacos/log",
CacheDir: "data/nacos/cache",
LogLevel: "debug",
}

serverConfig := []constant.ServerConfig{
{
IpAddr: "mse-a49bd920-p.nacos-ans.mse.aliyuncs.com", // Nacos 服务的IP地址
Port: 8848,
},
}

var err error
namingClient, err := clients.NewNamingClient(
vo.NacosClientParam{
ServerConfigs: serverConfig,
ClientConfig: &clientConfig})
if err != nil {
panic(err)
}

for {
instance, err := namingClient.SelectOneHealthyInstance(vo.SelectOneHealthInstanceParam{
ServiceName: "gin-server-a.default.svc.cluster.local",
})
if err != nil {
fmt.Printf("select one healthy instance err: %v\n", err)
} else {
fmt.Printf("selected instance info: %v\n", instance)
}

time.Sleep(time.Second * 5)
}
}

0 comments on commit 7287cf8

Please sign in to comment.