Skip to content

Commit

Permalink
fix: deduplicate OpenAPI definitions for GVKParser
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com>
  • Loading branch information
crenshaw-dev committed Jun 25, 2024
1 parent 83ce6ca commit f4b2a47
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 27 deletions.
12 changes: 2 additions & 10 deletions pkg/cache/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,11 +707,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
err = runSynced(&c.lock, func() error {
openAPISchema, gvkParser, err := c.kubectl.LoadOpenAPISchema(c.config)
if err != nil {
e, ok := err.(*kube.CreateGVKParserError)
if !ok {
return err
}
c.log.Info("warning loading openapi schema: %s", e)
return err
}
if gvkParser != nil {
c.gvkParser = gvkParser
Expand Down Expand Up @@ -821,11 +817,7 @@ func (c *clusterCache) sync() error {

openAPISchema, gvkParser, err := c.kubectl.LoadOpenAPISchema(config)
if err != nil {
e, ok := err.(*kube.CreateGVKParserError)
if !ok {
return err
}
c.log.Info("warning loading openapi schema: %s", e.Error())
return err
}

if gvkParser != nil {
Expand Down
25 changes: 8 additions & 17 deletions pkg/utils/kube/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,20 +118,6 @@ func isSupportedVerb(apiResource *metav1.APIResource, verb string) bool {
return false
}

type CreateGVKParserError struct {
err error
}

func NewCreateGVKParserError(err error) *CreateGVKParserError {
return &CreateGVKParserError{
err: err,
}
}

func (e *CreateGVKParserError) Error() string {
return fmt.Sprintf("error creating gvk parser: %s", e.err)
}

// LoadOpenAPISchema will load all existing resource schemas from the cluster
// and return:
// - openapi.Resources: used for getting the proto.Schema from a GVK
Expand All @@ -148,17 +134,17 @@ func (k *KubectlCmd) LoadOpenAPISchema(config *rest.Config) (openapi.Resources,
if err != nil {
return nil, nil, fmt.Errorf("error getting openapi resources: %s", err)
}
gvkParser, err := newGVKParser(oapiGetter)
gvkParser, err := k.newGVKParser(oapiGetter)
if err != nil {
// return a specific error type to allow gracefully handle
// creating GVK Parser bug:
// https://github.com/kubernetes/kubernetes/issues/103597
return oapiResources, nil, NewCreateGVKParserError(err)
return oapiResources, nil, err
}
return oapiResources, gvkParser, nil
}

func newGVKParser(oapiGetter *openapi.CachedOpenAPIGetter) (*managedfields.GvkParser, error) {
func (k *KubectlCmd) newGVKParser(oapiGetter *openapi.CachedOpenAPIGetter) (*managedfields.GvkParser, error) {
doc, err := oapiGetter.OpenAPISchema()
if err != nil {
return nil, fmt.Errorf("error getting openapi schema: %s", err)
Expand All @@ -167,6 +153,11 @@ func newGVKParser(oapiGetter *openapi.CachedOpenAPIGetter) (*managedfields.GvkPa
if err != nil {
return nil, fmt.Errorf("error getting openapi data: %s", err)
}
var warnings []string
models, warnings = newUniqueModels(models)
for _, warning := range warnings {
k.Log.Info(warning)
}
gvkParser, err := managedfields.NewGVKParser(models, false)
if err != nil {
return nil, err
Expand Down
163 changes: 163 additions & 0 deletions pkg/utils/kube/uniqueprotomodels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package kube

import (
"fmt"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/kube-openapi/pkg/util/proto"
"sort"
)

/**
The upstream Kubernetes NewGVKParser method causes problems for Argo CD.
https://github.com/kubernetes/apimachinery/blob/eb26334eeb0f769be8f0c5665ff34713cfdec83e/pkg/util/managedfields/gvkparser.go#L73
The function fails in instances where it is probably more desirable for Argo CD to simply ignore the error and move on.
But since the upstream implementation doesn't offer the option to ignore the error, we have to mutate the input to the
function to completely avoid the case that can produce the error.
When encountering the error from NewGVKParser, we used to just set the internal gvkparser instance to nil, log the
error as info, and move on.
But Argo CD increasingly relies on the gvpparser to produce reliable diffs, especially with server-side diffing. And
we're better off with an incorrectly-initialized gvkparser than no gvkparser at all.
To understand why NewGVKParser fails, we need to understand how Kubernetes constructs its OpenAPI models.
Kubernetes contains a built-in OpenAPI document containing the `definitions` for every built-in Kubernetes API. This
document includes shared structs like APIResourceList.
Aggregated APIs produce their own OpenAPI documents, which are merged with the built-in OpenAPI document. The aggregated
API documents generally include all the definitions of all the structs which are used anywhere by the API. This often
includes some of the same structs as the built-in OpenAPI document.
So when Kubernetes constructs the complete OpenAPI document (the one served at /openapi/v2), it merges the built-in
OpenAPI document with the aggregated API OpenAPI documents.
When the aggregator encounters two different definitions for the same struct (as determined by a deep compare), it
appends a `_vX` suffix to the definition name in the OpenAPI document (where X is the count of the number of times the
aggregator has seen the same definition).
This behavior is fine from the perspective of a typical Kubernetes API user. They download the OpenAPI document, they
see that there are two different "opinions" about the structure of a struct, and they can choose which one they want to
rely on.
But Argo CD has to be generic. We need to take the provided OpenAPI document and use it to construct a GVKParser. And
the GVKParser (reasonably) rejects the OpenAPI document if it contains two definitions for the same struct.
So we have to do some work to make the OpenAPI document palatable to the GVKParser. We have to remove the duplicate
definitions. Specifically, we take the first one and log a warning for each subsequent definition with the same gvk.
In practice, this probably generally appears when a common aggregated API was built at a time significantly before the
current Kubernetes version. The most common case is that the metrics server is built against an older version of the
Kubernetes libraries, using old versions of the structs. When the metrics server is updated to use the latest version of
the Kubernetes libraries, the problems go away, because the aggregated API and Kubernetes agree about the shape of the
struct.
Using the first encountered definition is imperfect and could result in unreliable diffs. But it's better than
constructing completely-wrong diffs due to the lack of a gvkparser.
*/

// uniqueModels is a model provider that ensures that no two models share the same gvk. Use newUniqueModels to
// initialize it and enforce uniqueness.
type uniqueModels struct {
models map[string]proto.Schema
}

// LookupModel is public through the interface of Models. It
// returns a visitable schema from the given model name.
// Copied verbatim from here: https://github.com/kubernetes/kube-openapi/blob/b456828f718bab62dc3013d192665eb3d17f8fe9/pkg/util/proto/document.go#L322-L326
func (d *uniqueModels) LookupModel(model string) proto.Schema {
return d.models[model]
}

// Copied verbatim from here: https://github.com/kubernetes/kube-openapi/blob/b456828f718bab62dc3013d192665eb3d17f8fe9/pkg/util/proto/document.go#L328-L337
func (d *uniqueModels) ListModels() []string {
models := []string{}

for model := range d.models {
models = append(models, model)
}

sort.Strings(models)
return models
}

// newUniqueModels returns a new uniqueModels instance and a list of warnings for models that share the same gvk.
func newUniqueModels(models proto.Models) (proto.Models, []string) {
var warnings []string
gvks := map[schema.GroupVersionKind]string{}
um := &uniqueModels{models: map[string]proto.Schema{}}
for _, modelName := range models.ListModels() {
model := models.LookupModel(modelName)
if model == nil {
panic(fmt.Sprintf("ListModels returns a model that can't be looked-up for: %v", modelName))
}
gvkList := parseGroupVersionKind(model)
for _, gvk := range gvkList {
if len(gvk.Kind) > 0 {
_, ok := gvks[gvk]
if ok {
warnings = append(warnings, fmt.Sprintf("encountered duplicate OpenAPI model %v for GVK %v", modelName, gvk))
}
gvks[gvk] = modelName
um.models[modelName] = model
}
}
}
return um, warnings
}

// groupVersionKindExtensionKey is the key used to lookup the
// GroupVersionKind value for an object definition from the
// definition's "extensions" map.
// Copied verbatim from: https://github.com/kubernetes/apimachinery/blob/eb26334eeb0f769be8f0c5665ff34713cfdec83e/pkg/util/managedfields/gvkparser.go#L29-L32
const groupVersionKindExtensionKey = "x-kubernetes-group-version-kind"

// Get and parse GroupVersionKind from the extension. Returns empty if it doesn't have one.
// Copied verbatim from: https://github.com/kubernetes/apimachinery/blob/eb26334eeb0f769be8f0c5665ff34713cfdec83e/pkg/util/managedfields/gvkparser.go#L82-L128
func parseGroupVersionKind(s proto.Schema) []schema.GroupVersionKind {
extensions := s.GetExtensions()

gvkListResult := []schema.GroupVersionKind{}

// Get the extensions
gvkExtension, ok := extensions[groupVersionKindExtensionKey]
if !ok {
return []schema.GroupVersionKind{}
}

// gvk extension must be a list of at least 1 element.
gvkList, ok := gvkExtension.([]interface{})
if !ok {
return []schema.GroupVersionKind{}
}

for _, gvk := range gvkList {
// gvk extension list must be a map with group, version, and
// kind fields
gvkMap, ok := gvk.(map[interface{}]interface{})
if !ok {
continue
}
group, ok := gvkMap["group"].(string)
if !ok {
continue
}
version, ok := gvkMap["version"].(string)
if !ok {
continue
}
kind, ok := gvkMap["kind"].(string)
if !ok {
continue
}

gvkListResult = append(gvkListResult, schema.GroupVersionKind{
Group: group,
Version: version,
Kind: kind,
})
}

return gvkListResult
}

0 comments on commit f4b2a47

Please sign in to comment.