Skip to content

Commit

Permalink
xds framework util
Browse files Browse the repository at this point in the history
Signed-off-by: daizhenyu <1449308021@qq.com>
  • Loading branch information
daizhenyu committed Aug 30, 2024
1 parent 378b8e7 commit ecb7a61
Show file tree
Hide file tree
Showing 10 changed files with 1,053 additions and 94 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.implement.service.xds.utils;

import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy;
import io.sermant.core.service.xds.entity.XdsCluster;
import io.sermant.core.service.xds.entity.XdsLbPolicy;
import io.sermant.core.service.xds.entity.XdsServiceCluster;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Convert cds protocol data to Sermant data model
*
* @author daizhenyu
* @since 2024-05-10
**/
public class CdsProtocolTransformer {
private static final int CLUSTER_SUBSET_INDEX = 2;

private static final String VERTICAL_LINE_SEPARATOR = "\\|";

private CdsProtocolTransformer() {
}

/**
* get the mapping between service name of k8s and cluster of istio
*
* @param clusters clusters
* @return XdsServiceCluster map
*/
public static Map<String, XdsServiceCluster> getServiceClusters(List<Cluster> clusters) {
Map<String, Set<XdsCluster>> xdsClusters = clusters.stream()
.filter(Objects::nonNull)
.map(cluster -> parseCluster(cluster))
.filter(xdsCluster -> xdsCluster.getServiceName() != null)
.collect(Collectors.groupingBy(
XdsCluster::getServiceName,
Collectors.toSet()
));
Map<String, XdsServiceCluster> xdsServiceClusterMap = new HashMap<>();
for (Entry<String, Set<XdsCluster>> clusterEntry : xdsClusters.entrySet()) {
XdsServiceCluster serviceCluster = new XdsServiceCluster();
serviceCluster.setBaseClusterName(getServiceBaseClusterName(clusterEntry.getValue()));
Map<String, XdsCluster> clusterMap = clusterEntry.getValue().stream()
.collect(Collectors.toMap(
XdsCluster::getClusterName,
xdsCluster -> xdsCluster
));
serviceCluster.setClusters(clusterMap);
xdsServiceClusterMap.put(clusterEntry.getKey(), serviceCluster);
}
return xdsServiceClusterMap;
}

private static XdsCluster parseCluster(Cluster cluster) {
XdsCluster xdsCluster = new XdsCluster();
Optional<String> serviceNameFromCluster = XdsCommonUtils.getServiceNameFromCluster(cluster.getName());
if (!serviceNameFromCluster.isPresent()) {
return xdsCluster;
}
xdsCluster.setClusterName(cluster.getName());
xdsCluster.setServiceName(serviceNameFromCluster.get());
xdsCluster.setLocalityLb(cluster.getCommonLbConfig().hasLocalityWeightedLbConfig());
xdsCluster.setLbPolicy(parseClusterLbPolicy(cluster.getLbPolicy()));
return xdsCluster;
}

private static String getServiceBaseClusterName(Set<XdsCluster> xdsClusters) {
for (XdsCluster cluster : xdsClusters) {
String clusterName = cluster.getClusterName();
String[] splitCluster = clusterName.split(VERTICAL_LINE_SEPARATOR);
if (splitCluster[CLUSTER_SUBSET_INDEX].equals("")) {
return clusterName;
}
}
return "";
}

private static XdsLbPolicy parseClusterLbPolicy(LbPolicy lbPolicy) {
if (lbPolicy == LbPolicy.RANDOM) {
return XdsLbPolicy.RANDOM;
}
if (lbPolicy == LbPolicy.ROUND_ROBIN) {
return XdsLbPolicy.ROUND_ROBIN;
}
if (lbPolicy == LbPolicy.LEAST_REQUEST) {
return XdsLbPolicy.LEAST_REQUEST;
}
if (lbPolicy == LbPolicy.RING_HASH) {
return XdsLbPolicy.RING_HASH;
}
if (lbPolicy == LbPolicy.MAGLEV) {
return XdsLbPolicy.MAGLEV;
}
return XdsLbPolicy.UNRECOGNIZED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,112 +16,104 @@

package io.sermant.implement.service.xds.utils;

import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.core.v3.HealthStatus;
import io.envoyproxy.envoy.config.core.v3.Locality;
import io.envoyproxy.envoy.config.core.v3.SocketAddress;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint;
import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints;
import io.sermant.core.service.xds.entity.ServiceInstance;
import io.sermant.core.service.xds.entity.XdsClusterLoadAssigment;
import io.sermant.core.service.xds.entity.XdsLocality;
import io.sermant.core.service.xds.entity.XdsServiceClusterLoadAssigment;
import io.sermant.core.utils.CollectionUtils;
import io.sermant.core.utils.StringUtils;
import io.sermant.implement.service.xds.entity.XdsServiceInstance;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Convert xDS protocol data to Sermant data model
* Convert eds protocol data to Sermant data model
*
* @author daizhenyu
* @since 2024-05-10
**/
public class XdsProtocolTransformer {
private static final int SERVICE_HOST_INDEX = 3;
public class EdsProtocolTransformer {
private static final int CLUSTER_SUBSET_INDEX = 2;

private static final int SERVICE_NAME_INDEX = 0;
private static final String VERTICAL_LINE_SEPARATOR = "\\|";

private static final int EXPECT_LENGTH = 4;

private XdsProtocolTransformer() {
}

/**
* get the mapping between service name of k8s and cluster of istio
*
* @param clusters clusters
* @return mapping
*/
public static Map<String, Set<String>> getService2ClusterMapping(List<Cluster> clusters) {
Map<String, Set<String>> nameMapping = new HashMap<>();
for (Cluster cluster : clusters) {
if (cluster == null) {
continue;
}
Optional<String> serviceNameFromCluster = getServiceNameFromCluster(cluster.getName());
if (!serviceNameFromCluster.isPresent()) {
continue;
}
String serviceName = serviceNameFromCluster.get();
nameMapping.computeIfAbsent(serviceName, key -> new HashSet<>()).add(cluster.getName());
}
return nameMapping;
private EdsProtocolTransformer() {
}

/**
* get the instance of one service by xds protocol
* get service instances by xds protocol
*
* @param loadAssignments eds data
* @return instance of service
* @return instances of service
*/
public static Set<ServiceInstance> getServiceInstances(
public static XdsServiceClusterLoadAssigment getServiceInstances(
List<ClusterLoadAssignment> loadAssignments) {
return loadAssignments.stream()
XdsServiceClusterLoadAssigment serviceClusterLoadAssigment = new XdsServiceClusterLoadAssigment();

Map<String, XdsClusterLoadAssigment> clusterLoadAssigmentMap = loadAssignments.stream()
.filter(Objects::nonNull)
.flatMap(loadAssignment -> getServiceInstancesFromLoadAssignment(loadAssignment).stream())
.collect(Collectors.toSet());
.map(loadAssignment -> parseClusterLoadAssignment(loadAssignment))
.filter(clusterInstance -> clusterInstance.getClusterName() != null)
.collect(Collectors.toMap(XdsClusterLoadAssigment::getClusterName,
clusterInstance -> clusterInstance));
serviceClusterLoadAssigment.setClusterLoadAssigments(clusterLoadAssigmentMap);
serviceClusterLoadAssigment.setBaseClusterName(getServiceBaseClusterName(clusterLoadAssigmentMap));
return serviceClusterLoadAssigment;
}

private static Set<ServiceInstance> getServiceInstancesFromLoadAssignment(ClusterLoadAssignment loadAssignment) {
private static XdsClusterLoadAssigment parseClusterLoadAssignment(ClusterLoadAssignment loadAssignment) {
String clusterName = loadAssignment.getClusterName();
Optional<String> serviceNameOptional = getServiceNameFromCluster(clusterName);
Optional<String> serviceNameOptional = XdsCommonUtils.getServiceNameFromCluster(clusterName);
XdsClusterLoadAssigment xdsClusterLoadAssigment = new XdsClusterLoadAssigment();
if (!serviceNameOptional.isPresent()) {
return Collections.EMPTY_SET;
return xdsClusterLoadAssigment;
}
String serviceName = serviceNameOptional.get();
return processClusterLoadAssignment(loadAssignment, serviceName, clusterName);
xdsClusterLoadAssigment.setClusterName(clusterName);
xdsClusterLoadAssigment.setServiceName(serviceName);
xdsClusterLoadAssigment
.setLocalityInstances(parseLocalityLbEndpointsList(loadAssignment, serviceName, clusterName));
return xdsClusterLoadAssigment;
}

private static Set<ServiceInstance> processClusterLoadAssignment(ClusterLoadAssignment loadAssignment,
private static Map<XdsLocality, Set<ServiceInstance>> parseLocalityLbEndpointsList(
ClusterLoadAssignment loadAssignment,
String serviceName, String clusterName) {
List<LocalityLbEndpoints> localityLbEndpointList = loadAssignment.getEndpointsList();
if (CollectionUtils.isEmpty(localityLbEndpointList)) {
return Collections.EMPTY_SET;
return Collections.EMPTY_MAP;
}
return localityLbEndpointList.stream()
.filter(Objects::nonNull)
.flatMap(localityLbEndpoints -> processLocalityLbEndpoints(localityLbEndpoints, serviceName,
clusterName).stream())
.collect(Collectors.toSet());
.collect(Collectors.toMap(
localityLbEndpoints ->
parseLocality(localityLbEndpoints),
localityLbEndpoints -> parseLocalityLbEndpoints(localityLbEndpoints, serviceName, clusterName)
));
}

private static Set<ServiceInstance> processLocalityLbEndpoints(LocalityLbEndpoints localityLbEndpoints,
private static Set<ServiceInstance> parseLocalityLbEndpoints(LocalityLbEndpoints localityLbEndpoints,
String serviceName, String clusterName) {
List<LbEndpoint> lbEndpointsList = localityLbEndpoints.getLbEndpointsList();
if (CollectionUtils.isEmpty(lbEndpointsList)) {
return Collections.EMPTY_SET;
}
return lbEndpointsList.stream()
.filter(Objects::nonNull)
.map(lbEndpoint -> transformEndpoint2Instance(lbEndpoint, serviceName, clusterName,
.map(lbEndpoint -> parseLbEndpoint(lbEndpoint, serviceName, clusterName,
getInitializedMetadata(localityLbEndpoints)))
.collect(Collectors.toSet());
}
Expand All @@ -137,7 +129,7 @@ private static Map<String, String> getInitializedMetadata(LocalityLbEndpoints lo
return metadata;
}

private static ServiceInstance transformEndpoint2Instance(LbEndpoint endpoint, String serviceName,
private static ServiceInstance parseLbEndpoint(LbEndpoint endpoint, String serviceName,
String clusterName, Map<String, String> metadata) {
XdsServiceInstance instance = new XdsServiceInstance();
SocketAddress socketAddress = endpoint.getEndpoint().getAddress().getSocketAddress();
Expand All @@ -158,16 +150,24 @@ private static ServiceInstance transformEndpoint2Instance(LbEndpoint endpoint, S
return instance;
}

private static Optional<String> getServiceNameFromCluster(String clusterName) {
if (StringUtils.isEmpty(clusterName)) {
return Optional.empty();
private static String getServiceBaseClusterName(Map<String, XdsClusterLoadAssigment> instanceMap) {
for (Entry<String, XdsClusterLoadAssigment> instanceEntry : instanceMap.entrySet()) {
String[] splitCluster = instanceEntry.getKey().split(VERTICAL_LINE_SEPARATOR);
if (splitCluster[CLUSTER_SUBSET_INDEX].equals("")) {
return instanceEntry.getKey();
}
}
return "";
}

// cluster name format: "outbound|8080||xds-service.default.svc.cluster.local", xds-service is service name
String[] clusterSplit = clusterName.split("\\|");
if (clusterSplit.length != EXPECT_LENGTH) {
return Optional.empty();
}
return Optional.of(clusterSplit[SERVICE_HOST_INDEX].split("\\.")[SERVICE_NAME_INDEX]);
private static XdsLocality parseLocality(LocalityLbEndpoints localityLbEndpoints) {
XdsLocality xdsLocality = new XdsLocality();
Locality locality = localityLbEndpoints.getLocality();
xdsLocality.setRegion(locality.getRegion());
xdsLocality.setZone(locality.getZone());
xdsLocality.setSubZone(locality.getSubZone());
xdsLocality.setLocalityPriority(localityLbEndpoints.getPriority());
xdsLocality.setLoadBalanceWeight(localityLbEndpoints.getLoadBalancingWeight().getValue());
return xdsLocality;
}
}
Loading

0 comments on commit ecb7a61

Please sign in to comment.