Skip to content

Commit

Permalink
Support multiple streams per node to comply with A71 proposal
Browse files Browse the repository at this point in the history
Each node can have multiple streams per target listener now. We need to
make sure we keep track of all of them and update the snapshot accordingly,
while we keep supporting multiple targets per stream to be backwards
compatible.
  • Loading branch information
ffilippopoulos committed Sep 17, 2024
1 parent 93e8c21 commit 8903541
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 103 deletions.
259 changes: 170 additions & 89 deletions xds/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,41 +63,43 @@ type Snapshotter struct {

// Node keeps the info for a node
type Node struct {
address string
resources *NodeSnapshotResources
}

// NodeSnapshot keeps resources and versions to help snapshotting per node
type NodeSnapshotResources struct {
services map[string][]types.Resource
servicesNames map[string][]string
address string
resources map[int64]*NodeSnapshotResources // map of node resources to snap per stream
serviceSnapVersion int32
endpoints map[string][]types.Resource
endpointsNames map[string][]string
endpointsSnapVersion int32
}

// Deep copy function for NodeSnapshotResources
func deepCopyNodeSnapshotResources(src *NodeSnapshotResources) *NodeSnapshotResources {
dst := &NodeSnapshotResources{
services: make(map[string][]types.Resource),
servicesNames: make(map[string][]string),
serviceSnapVersion: src.serviceSnapVersion,
endpoints: make(map[string][]types.Resource),
endpointsNames: make(map[string][]string),
endpointsSnapVersion: src.endpointsSnapVersion,
}
for k, v := range src.services {
dst.services[k] = append(dst.services[k], v...)
}
for k, v := range src.servicesNames {
dst.servicesNames[k] = append(dst.servicesNames[k], v...)
}
for k, v := range src.endpoints {
dst.endpoints[k] = append(dst.endpoints[k], v...)
}
for k, v := range src.endpointsNames {
dst.endpointsNames[k] = append(dst.endpointsNames[k], v...)
// NodeSnapshotResources keeps the resources, and the names of those resources,
// that are tracked for a node in order to build its snapshots. Every map is
// keyed by resource type URL.
type NodeSnapshotResources struct {
// services holds the service resources to snapshot, per type URL.
services map[string][]types.Resource
// servicesNames holds the requested service resource names, per type URL.
servicesNames map[string][]string
// endpoints holds the endpoint resources to snapshot, per type URL.
endpoints map[string][]types.Resource
// endpointsNames holds the requested endpoint resource names, per type URL.
endpointsNames map[string][]string
}

// deepCopyNodeResources returns a copy of src in which every stream entry is a
// newly allocated NodeSnapshotResources with its own maps and slices, so the
// result can be modified without touching the maps and slices held by src.
// The elements stored in the slices are copied by assignment, not cloned.
func deepCopyNodeResources(src map[int64]*NodeSnapshotResources) map[int64]*NodeSnapshotResources {
	out := make(map[int64]*NodeSnapshotResources)
	for streamID, orig := range src {
		clone := &NodeSnapshotResources{
			services:       map[string][]types.Resource{},
			servicesNames:  map[string][]string{},
			endpoints:      map[string][]types.Resource{},
			endpointsNames: map[string][]string{},
		}
		// Appending to a nil slice allocates a new backing array per type URL.
		for typeURL, list := range orig.services {
			clone.services[typeURL] = append([]types.Resource(nil), list...)
		}
		for typeURL, names := range orig.servicesNames {
			clone.servicesNames[typeURL] = append([]string(nil), names...)
		}
		for typeURL, list := range orig.endpoints {
			clone.endpoints[typeURL] = append([]types.Resource(nil), list...)
		}
		for typeURL, names := range orig.endpointsNames {
			clone.endpointsNames[typeURL] = append([]string(nil), names...)
		}
		out[streamID] = clone
	}
	return out
}
Expand Down Expand Up @@ -202,9 +204,11 @@ func (s *Snapshotter) SnapServices(serviceStore XdsServiceStore) error {
s.nodes.Range(func(nID, n interface{}) bool {
nodeID := nID.(string)
node := n.(*Node)
for typeURL, resources := range node.resources.servicesNames {
if err := s.updateNodeServiceSnapshotResources(nodeID, typeURL, resources); err != nil {
log.Logger.Error("Failed to update service resources before snapping", "type", typeURL, "node", nodeID, "resources", resources, "error", err)
for sID, res := range node.resources {
for typeURL, resources := range res.servicesNames {
if err := s.updateNodeServiceSnapshotResources(nodeID, typeURL, sID, resources); err != nil {
log.Logger.Error("Failed to update service resources before snapping", "type", typeURL, "node", nodeID, "resources", resources, "error", err)
}
}
}
if err := s.nodeServiceSnapshot(nodeID); err != nil {
Expand Down Expand Up @@ -247,9 +251,11 @@ func (s *Snapshotter) SnapEndpoints(endpointStore XdsEndpointStore) error {
s.nodes.Range(func(nID, n interface{}) bool {
nodeID := nID.(string)
node := n.(*Node)
for typeURL, resources := range node.resources.endpointsNames {
if err := s.updateNodeEndpointsSnapshotResources(nodeID, typeURL, resources); err != nil {
log.Logger.Error("Failed to update endpoints resources before snapping", "type", typeURL, "node", nodeID, "resources", resources, "error", err)
for sID, res := range node.resources {
for typeURL, resources := range res.endpointsNames {
if err := s.updateNodeEndpointsSnapshotResources(nodeID, typeURL, sID, resources); err != nil {
log.Logger.Error("Failed to update endpoints resources before snapping", "type", typeURL, "node", nodeID, "resources", resources, "error", err)
}
}
}
if err := s.nodeEndpointsSnapshot(nodeID); err != nil {
Expand Down Expand Up @@ -277,7 +283,7 @@ func (s *Snapshotter) OnStreamOpen(ctx context.Context, id int64, typ string) er
func (s *Snapshotter) OnStreamClosed(id int64, node *core.Node) {
log.Logger.Info("OnStreamClosed", "id", id, "node", node)
s.streams.Delete(id)
go s.deleteNode(node.GetId())
s.deleteNodeStream(node.GetId(), id)
metricOnStreamClosedInc()
}

Expand Down Expand Up @@ -310,11 +316,10 @@ func (s *Snapshotter) OnStreamRequest(id int64, r *discovery.DiscoveryRequest) e
log.Logger.Info("Client using empty string as node id", "client", stream.peerAddress)
return nil
}
// Trim federation prefix for Listener requests

s.addNewNode(r.GetNode().GetId(), stream.peerAddress)
if s.needToUpdateSnapshot(r.GetNode().GetId(), r.GetTypeUrl(), r.GetResourceNames()) {
if err := s.updateNodeSnapshot(r.GetNode().GetId(), r.GetTypeUrl(), r.GetResourceNames()); err != nil {
s.addOrUpdateNode(r.GetNode().GetId(), stream.peerAddress, id)
if s.needToUpdateSnapshot(r.GetNode().GetId(), r.GetTypeUrl(), id, r.GetResourceNames()) {
if err := s.updateStreamNodeResources(r.GetNode().GetId(), r.GetTypeUrl(), id, r.GetResourceNames()); err != nil {
return err
}
}
Expand Down Expand Up @@ -358,38 +363,87 @@ func (s *Snapshotter) OnFetchResponse(req *discovery.DiscoveryRequest, resp *dis
log.Logger.Info("OnFetchResponse")
}

// addNewNode adds a new node with empty resources to the nodes map. It uses the
// nodeID to determine whether a new addition is needed. Will do nothing for existing nodes
func (s *Snapshotter) addNewNode(nodeID, address string) {
if _, ok := s.nodes.Load(nodeID); !ok {
// makeEmptyNodeResources builds a NodeSnapshotResources whose maps already
// contain an empty, non-nil slice for each type URL used here: the cluster,
// listener and route types for services, and the endpoint type for endpoints.
func makeEmptyNodeResources() *NodeSnapshotResources {
	return &NodeSnapshotResources{
		services: map[string][]types.Resource{
			resource.ClusterType:  {},
			resource.ListenerType: {},
			resource.RouteType:    {},
		},
		servicesNames: map[string][]string{
			resource.ClusterType:  {},
			resource.ListenerType: {},
			resource.RouteType:    {},
		},
		endpoints: map[string][]types.Resource{
			resource.EndpointType: {},
		},
		endpointsNames: map[string][]string{
			resource.EndpointType: {},
		},
	}
}

// addOrUpdateNode will add a new node if not present, or update the node's per
// stream resources
func (s *Snapshotter) addOrUpdateNode(nodeID, address string, streamID int64) {
n, ok := s.nodes.Load(nodeID)
if !ok {
// Node not found, add a new one without any resources
log.Logger.Info("Node cache not found, initialising", "node", nodeID)
services := map[string][]types.Resource{
resource.ClusterType: []types.Resource{},
resource.ListenerType: []types.Resource{},
resource.RouteType: []types.Resource{},
}
servicesNames := map[string][]string{
resource.ClusterType: []string{},
resource.ListenerType: []string{},
resource.RouteType: []string{},
}
enspointsResources := map[string][]types.Resource{
resource.EndpointType: []types.Resource{},
}
endpointsNames := map[string][]string{
resource.EndpointType: []string{},
}
s.nodes.Store(nodeID, &Node{
address: address,
resources: &NodeSnapshotResources{
services: services,
servicesNames: servicesNames,
endpoints: enspointsResources,
endpointsNames: endpointsNames,
resources: map[int64]*NodeSnapshotResources{
streamID: makeEmptyNodeResources(),
},
})
return
}
node := n.(*Node)
nodeResources := deepCopyNodeResources(node.resources)
for sID, _ := range nodeResources {
if sID == streamID {
return // Stream already know for node
}
}
log.Logger.Info("New stream for node", "node", nodeID, "streamID", streamID)
nodeResources[streamID] = makeEmptyNodeResources()
updatedNode := &Node{
address: node.address,
resources: nodeResources,
}
s.nodes.Store(nodeID, updatedNode)
}

// deleteNodeStream stops tracking the resources requested through stream
// streamID for node nodeID. If no streams remain for the node afterwards, the
// node is removed via deleteNode; otherwise the node is stored again with the
// remaining streams. Calls for EmptyNodeID or for a node that is not in the
// nodes map are ignored (the latter is logged as a warning).
func (s *Snapshotter) deleteNodeStream(nodeID string, streamID int64) {
	if nodeID == EmptyNodeID {
		return
	}
	n, ok := s.nodes.Load(nodeID)
	if !ok {
		log.Logger.Warn("Tried to delete stream for non existing node", "node", nodeID, "streamID", streamID)
		return
	}
	node := n.(*Node)
	// Work on a copy so the Node value currently held in the map is never
	// modified in place.
	nodeResources := deepCopyNodeResources(node.resources)
	delete(nodeResources, streamID)
	// if no more streams are open, delete the node
	if len(nodeResources) == 0 {
		s.deleteNode(nodeID)
		return
	}
	// else just update the node. The snapshot version counters are carried
	// over: a Node built without them would restart counting from zero, and the
	// next snapshot could reuse a version string that was already used.
	updatedNode := &Node{
		address:              node.address,
		resources:            nodeResources,
		serviceSnapVersion:   atomic.LoadInt32(&node.serviceSnapVersion),
		endpointsSnapVersion: atomic.LoadInt32(&node.endpointsSnapVersion),
	}
	s.nodes.Store(nodeID, updatedNode)
}

// deleteNode removes a node from nodes map and clears existing snapshots for the node
Expand Down Expand Up @@ -441,21 +495,24 @@ func (s *Snapshotter) getResourcesFromCache(typeURL string, resources []string)

// updateNodeServiceSnapshotResources goes through the full snapshot and copies resources in the respective
// node resources struct
func (s *Snapshotter) updateNodeServiceSnapshotResources(nodeID, typeURL string, resources []string) error {
func (s *Snapshotter) updateNodeServiceSnapshotResources(nodeID, typeURL string, streamID int64, resources []string) error {
n, ok := s.nodes.Load(nodeID)
if !ok {
return fmt.Errorf("Cannot update service snapshot resources, node: %s not found", nodeID)
}
node := n.(*Node)
nodeResources := deepCopyNodeSnapshotResources(node.resources)
nodeResources := deepCopyNodeResources(node.resources)
if _, ok := nodeResources[streamID]; !ok {
return fmt.Errorf("Cannot find service resources to update for node: %s in stream: %d context", nodeID, streamID)
}

newSnapResources, err := s.getResourcesFromCache(typeURL, resources)
if err != nil {
return fmt.Errorf("Cannot get resources from cache: %s", err)
}

nodeResources.services[typeURL] = newSnapResources
nodeResources.servicesNames[typeURL] = resources
nodeResources[streamID].services[typeURL] = newSnapResources
nodeResources[streamID].servicesNames[typeURL] = resources
updatedNode := &Node{
address: node.address,
resources: nodeResources,
Expand All @@ -466,21 +523,24 @@ func (s *Snapshotter) updateNodeServiceSnapshotResources(nodeID, typeURL string,

// updateNodeEndpointsSnapshotResources goes through the full snapshot and copies resources in the respective
// node resources struct
func (s *Snapshotter) updateNodeEndpointsSnapshotResources(nodeID, typeURL string, resources []string) error {
func (s *Snapshotter) updateNodeEndpointsSnapshotResources(nodeID, typeURL string, streamID int64, resources []string) error {
n, ok := s.nodes.Load(nodeID)
if !ok {
return fmt.Errorf("Cannot update service snapshot resources, node: %s not found", nodeID)
return fmt.Errorf("Cannot update endpoint snapshot resources, node: %s not found", nodeID)
}
node := n.(*Node)
nodeResources := deepCopyNodeSnapshotResources(node.resources)
nodeResources := deepCopyNodeResources(node.resources)
if _, ok := nodeResources[streamID]; !ok {
return fmt.Errorf("Cannot find endpoint resources to update for node: %s in stream: %d context", nodeID, streamID)
}

newSnapResources, err := s.getResourcesFromCache(typeURL, resources)
if err != nil {
return fmt.Errorf("Cannot get resources from cache: %s", err)
}

nodeResources.endpoints[typeURL] = newSnapResources
nodeResources.endpointsNames[typeURL] = resources
nodeResources[streamID].endpoints[typeURL] = newSnapResources
nodeResources[streamID].endpointsNames[typeURL] = resources
updatedNode := &Node{
address: node.address,
resources: nodeResources,
Expand All @@ -497,8 +557,16 @@ func (s *Snapshotter) nodeServiceSnapshot(nodeID string) error {
return fmt.Errorf("Cannot create a new snapshot, node: %s not found", nodeID)
}
node := n.(*Node)
atomic.AddInt32(&node.resources.serviceSnapVersion, 1)
snapshot, err := cache.NewSnapshot(fmt.Sprint(node.resources.serviceSnapVersion), node.resources.services)
atomic.AddInt32(&node.serviceSnapVersion, 1)
aggrServices := map[string][]types.Resource{}
for _, r := range node.resources {
for typeUrl, resources := range r.services {
for _, resource := range resources {
aggrServices[typeUrl] = append(aggrServices[typeUrl], resource)
}
}
}
snapshot, err := cache.NewSnapshot(fmt.Sprint(node.serviceSnapVersion), aggrServices)
if err != nil {
return err
}
Expand All @@ -513,25 +581,33 @@ func (s *Snapshotter) nodeEndpointsSnapshot(nodeID string) error {
return fmt.Errorf("Cannot create a new snapshot, node: %s not found", nodeID)
}
node := n.(*Node)
atomic.AddInt32(&node.resources.endpointsSnapVersion, 1)
snapshot, err := cache.NewSnapshot(fmt.Sprint(node.resources.endpointsSnapVersion), node.resources.endpoints)
atomic.AddInt32(&node.endpointsSnapVersion, 1)
aggrEndpoints := map[string][]types.Resource{}
for _, r := range node.resources {
for typeUrl, resources := range r.endpoints {
for _, resource := range resources {
aggrEndpoints[typeUrl] = append(aggrEndpoints[typeUrl], resource)
}
}
}
snapshot, err := cache.NewSnapshot(fmt.Sprint(node.endpointsSnapVersion), aggrEndpoints)
if err != nil {
return err
}
return s.endpointsCache.SetSnapshot(ctx, nodeID, snapshot)
}

// updateNodeSnapshot will update the node snapshot for the requested resources type based on
// data found in the full resources snapshot
func (s *Snapshotter) updateNodeSnapshot(nodeID, typeURL string, resources []string) error {
// updateStreamNodeResources will update the resources tracked for the node inside a streams context and
// trigger a new snapshot
func (s *Snapshotter) updateStreamNodeResources(nodeID, typeURL string, streamID int64, resources []string) error {
if mapTypeURL(typeURL) == "services" {
if err := s.updateNodeServiceSnapshotResources(nodeID, typeURL, resources); err != nil {
if err := s.updateNodeServiceSnapshotResources(nodeID, typeURL, streamID, resources); err != nil {
return err
}
return s.nodeServiceSnapshot(nodeID)
}
if mapTypeURL(typeURL) == "endpoints" {
if err := s.updateNodeEndpointsSnapshotResources(nodeID, typeURL, resources); err != nil {
if err := s.updateNodeEndpointsSnapshotResources(nodeID, typeURL, streamID, resources); err != nil {
return err
}
return s.nodeEndpointsSnapshot(nodeID)
Expand All @@ -540,18 +616,23 @@ func (s *Snapshotter) updateNodeSnapshot(nodeID, typeURL string, resources []str
}

// needToUpdateSnapshot checks if a node snapshot needs updating based on the requested resources
// from the client
func (s *Snapshotter) needToUpdateSnapshot(nodeID, typeURL string, resources []string) bool {
// from the client inside a streams context
func (s *Snapshotter) needToUpdateSnapshot(nodeID, typeURL string, streamID int64, resources []string) bool {
n, ok := s.nodes.Load(nodeID)
if !ok {
return false
}
node := n.(*Node)
sNodeResources, ok := node.resources[streamID]
if !ok {
log.Logger.Warn("Cannot check if snapshot needs updating, strema not found", "id", streamID)
return false
}
if mapTypeURL(typeURL) == "services" {
return !resourcesMatch(node.resources.servicesNames[typeURL], resources)
return !resourcesMatch(sNodeResources.servicesNames[typeURL], resources)
}
if mapTypeURL(typeURL) == "endpoints" {
return !resourcesMatch(node.resources.endpointsNames[typeURL], resources)
return !resourcesMatch(sNodeResources.endpointsNames[typeURL], resources)
}
return false
}
Expand Down
Loading

0 comments on commit 8903541

Please sign in to comment.