Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support app connection labels #754

Merged
merged 3 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion clients/config_client/config_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/stretchr/testify/assert"
)

var serverConfigWithOptions = constant.NewServerConfig("mse-xxx-p.nacos-ans.mse.aliyuncs.com", 8848)
var serverConfigWithOptions = constant.NewServerConfig("127.0.0.1", 8848)

var clientConfigWithOptions = constant.NewClientConfig(
constant.WithTimeoutMs(10*1000),
Expand Down Expand Up @@ -81,6 +81,7 @@ func createConfigClientTestTls() *ConfigClient {
_ = nc.SetClientConfig(*clientTLsConfigWithOptions)
_ = nc.SetHttpAgent(&http_agent.HttpAgent{})
client, _ := NewConfigClient(&nc)
client.configProxy = &MockConfigProxy{}
return client
}

Expand All @@ -90,6 +91,7 @@ func createConfigClientCommon() *ConfigClient {
_ = nc.SetClientConfig(*clientConfigWithOptions)
_ = nc.SetHttpAgent(&http_agent.HttpAgent{})
client, _ := NewConfigClient(&nc)
client.configProxy = &MockConfigProxy{}
return client
}

Expand Down
2 changes: 1 addition & 1 deletion clients/config_client/config_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (cp *ConfigProxy) createRpcClient(ctx context.Context, taskId string, clien
"taskId": taskId,
}

iRpcClient, _ := rpc.CreateClient(ctx, "config-"+taskId+"-"+client.uid, rpc.GRPC, labels, cp.nacosServer, &cp.clientConfig.TLSCfg)
iRpcClient, _ := rpc.CreateClient(ctx, "config-"+taskId+"-"+client.uid, rpc.GRPC, labels, cp.nacosServer, &cp.clientConfig.TLSCfg, cp.clientConfig.AppConnLabels)
rpcClient := iRpcClient.GetRpcClient()
if rpcClient.IsInitialized() {
rpcClient.RegisterServerRequestHandler(func() rpc_request.IRequest {
Expand Down
2 changes: 1 addition & 1 deletion clients/naming_client/naming_grpc/naming_grpc_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func NewNamingGrpcProxy(ctx context.Context, clientCfg constant.ClientConfig, na
constant.LABEL_MODULE: constant.LABEL_MODULE_NAMING,
}

iRpcClient, err := rpc.CreateClient(ctx, uid.String(), rpc.GRPC, labels, srvProxy.nacosServer, &clientCfg.TLSCfg)
iRpcClient, err := rpc.CreateClient(ctx, uid.String(), rpc.GRPC, labels, srvProxy.nacosServer, &clientCfg.TLSCfg, clientCfg.AppConnLabels)
if err != nil {
return nil, err
}
Expand Down
6 changes: 6 additions & 0 deletions common/constant/client_config_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,9 @@ func WithTLS(tlsCfg TLSConfig) ClientOption {
config.TLSCfg = tlsCfg
}
}

func WithAppConnLabels(appConnLabels map[string]string) ClientOption {
return func(config *ClientConfig) {
config.AppConnLabels = appConnLabels
}
}
1 change: 1 addition & 0 deletions common/constant/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type ClientConfig struct {
EndpointContextPath string // the address server endpoint contextPath
EndpointQueryParams string // the address server endpoint query params
ClusterName string // the address server clusterName
AppConnLabels map[string]string // app conn labels
}

type ClientLogSamplingConfig struct {
Expand Down
105 changes: 104 additions & 1 deletion common/remote/rpc/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package rpc

import (
"context"
"fmt"
"math"
"os"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -147,24 +150,124 @@ func getClient(clientName string) IRpcClient {
return clientMap[clientName]
}

func CreateClient(ctx context.Context, clientName string, connectionType ConnectionType, labels map[string]string, nacosServer *nacos_server.NacosServer, tlsConfig *constant.TLSConfig) (IRpcClient, error) {
func CreateClient(ctx context.Context, clientName string, connectionType ConnectionType, labels map[string]string, nacosServer *nacos_server.NacosServer, tlsConfig *constant.TLSConfig, appConnLabels map[string]string) (IRpcClient, error) {
cMux.Lock()
defer cMux.Unlock()
if _, ok := clientMap[clientName]; !ok {
logger.Infof("init rpc client for name ", clientName)
var rpcClient IRpcClient
if GRPC == connectionType {
rpcClient = NewGrpcClient(ctx, clientName, nacosServer, tlsConfig)
}
if rpcClient == nil {
return nil, errors.New("unsupported connection type")
}

logger.Infof("get app conn labels from client config %v ", appConnLabels)
appConnLabelsEnv := getAppLabelsFromEnv()
logger.Infof("get app conn labels from env %v ", appConnLabelsEnv)

appConnLabelsFinal := mergerAppLabels(appConnLabels, appConnLabelsEnv)
logger.Infof("final app conn labels : %v ", appConnLabelsFinal)

appConnLabelsFinal = addPrefixForEachKey(appConnLabelsFinal, "app_")
if len(appConnLabelsFinal) != 0 {
rpcClient.putAllLabels(appConnLabelsFinal)
}

rpcClient.putAllLabels(labels)
clientMap[clientName] = rpcClient
return rpcClient, nil
}
return clientMap[clientName], nil
}

func mergerAppLabels(appLabelsAppointed map[string]string, appLabelsEnv map[string]string) map[string]string {
preferred := strings.ToLower(os.Getenv("nacos_app_conn_labels_preferred"))

var preferFirst bool
if preferred != "env" {
preferFirst = true
} else {
preferFirst = false
}
return mergeMaps(appLabelsAppointed, appLabelsEnv, preferFirst)
}

func mergeMaps(map1, map2 map[string]string, preferFirst bool) map[string]string {
result := make(map[string]string, 8)

for k, v := range map1 {
result[k] = v
}

for k, v := range map2 {
_, ok := map1[k]
if preferFirst && ok {
continue
}
result[k] = v
}

return result
}

func getAppLabelsFromEnv() map[string]string {
configMap := make(map[string]string, 8)

// nacos_config_gray_label
grayLabel := os.Getenv("nacos_config_gray_label")
if grayLabel != "" {
configMap["nacos_config_gray_label"] = grayLabel
}

// nacos_app_conn_labels
connLabels := os.Getenv("nacos_app_conn_labels")
if connLabels != "" {
labelsMap := parseLabels(connLabels)
for k, v := range labelsMap {
configMap[k] = v
}
}

return configMap
}

func parseLabels(rawLabels string) map[string]string {
if strings.TrimSpace(rawLabels) == "" {
return make(map[string]string, 2)
}

resultMap := make(map[string]string, 2)
labels := strings.Split(rawLabels, ",")
for _, label := range labels {
if strings.TrimSpace(label) != "" {
kv := strings.Split(label, "=")
if len(kv) == 2 {
resultMap[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1])
} else {
fmt.Println("unknown label format:", label)
}
}
}
return resultMap
}

func addPrefixForEachKey(m map[string]string, prefix string) map[string]string {
if len(m) == 0 {
return m
}

newMap := make(map[string]string, len(m))
for k, v := range m {
if strings.TrimSpace(k) != "" {
newKey := prefix + k
newMap[newKey] = v
}
}
return newMap
}

func (r *RpcClient) Start() {
if ok := atomic.CompareAndSwapInt32((*int32)(&r.rpcClientStatus), (int32)(INITIALIZED), (int32)(STARTING)); !ok {
return
Expand Down
Loading