From 920908f1891d2c905963f78e7ce74e539157def1 Mon Sep 17 00:00:00 2001 From: Mog Liang Date: Tue, 5 Dec 2023 02:32:21 +0000 Subject: [PATCH 01/15] deploy etcd CA cert if k3s uses embedded etcd --- bootstrap/api/v1beta1/kthreesconfig_types.go | 6 ++++++ bootstrap/controllers/kthreesconfig_controller.go | 2 +- .../controllers/kthreescontrolplane_controller.go | 2 +- pkg/secret/certificates.go | 11 ++++++++++- 4 files changed, 18 insertions(+), 3 deletions(-) diff --git a/bootstrap/api/v1beta1/kthreesconfig_types.go b/bootstrap/api/v1beta1/kthreesconfig_types.go index 59df6c19..022d9016 100644 --- a/bootstrap/api/v1beta1/kthreesconfig_types.go +++ b/bootstrap/api/v1beta1/kthreesconfig_types.go @@ -51,6 +51,12 @@ type KThreesConfigSpec struct { Version string `json:"version,omitempty"` } +// TODO +// Will need to extend this func when implementing other k3s database options. +func (c *KThreesConfigSpec) IsEtcdManaged() bool { + return true +} + type KThreesServerConfig struct { // KubeAPIServerArgs is a customized flag for kube-apiserver process // +optional diff --git a/bootstrap/controllers/kthreesconfig_controller.go b/bootstrap/controllers/kthreesconfig_controller.go index b506af50..808f402b 100644 --- a/bootstrap/controllers/kthreesconfig_controller.go +++ b/bootstrap/controllers/kthreesconfig_controller.go @@ -411,7 +411,7 @@ func (r *KThreesConfigReconciler) handleClusterNotInitialized(ctx context.Contex // injects into config.ClusterConfiguration values from top level object r.reconcileTopLevelObjectSettings(scope.Cluster, machine, scope.Config) - certificates := secret.NewCertificatesForInitialControlPlane() + certificates := secret.NewCertificatesForInitialControlPlane(&scope.Config.Spec) err := certificates.LookupOrGenerate( ctx, r.Client, diff --git a/controlplane/controllers/kthreescontrolplane_controller.go b/controlplane/controllers/kthreescontrolplane_controller.go index a880a425..ac13fe93 100644 --- 
a/controlplane/controllers/kthreescontrolplane_controller.go +++ b/controlplane/controllers/kthreescontrolplane_controller.go @@ -402,7 +402,7 @@ func (r *KThreesControlPlaneReconciler) reconcile(ctx context.Context, cluster * return reconcile.Result{}, err } - certificates := secret.NewCertificatesForInitialControlPlane() + certificates := secret.NewCertificatesForInitialControlPlane(&kcp.Spec.KThreesConfigSpec) controllerRef := metav1.NewControllerRef(kcp, controlplanev1.GroupVersion.WithKind("KThreesControlPlane")) if err := certificates.LookupOrGenerate(ctx, r.Client, util.ObjectKey(cluster), *controllerRef); err != nil { logger.Error(err, "unable to lookup or create cluster certificates") diff --git a/pkg/secret/certificates.go b/pkg/secret/certificates.go index 91b0aaf3..7deebf39 100644 --- a/pkg/secret/certificates.go +++ b/pkg/secret/certificates.go @@ -65,7 +65,7 @@ var ( type Certificates []*Certificate // NewCertificatesForInitialControlPlane returns a list of certificates configured for a control plane node. 
-func NewCertificatesForInitialControlPlane() Certificates { +func NewCertificatesForInitialControlPlane(config *bootstrapv1.KThreesConfigSpec) Certificates { certificatesDir := DefaultCertificatesDir certificates := Certificates{ @@ -81,6 +81,15 @@ func NewCertificatesForInitialControlPlane() Certificates { }, } + if config.IsEtcdManaged() { + etcdCert := &Certificate{ + Purpose: EtcdCA, + CertFile: filepath.Join(certificatesDir, "etcd", "server-ca.crt"), + KeyFile: filepath.Join(certificatesDir, "etcd", "server-ca.key"), + } + certificates = append(certificates, etcdCert) + } + return certificates } From 387183d54d0f4ae931a6daded2e0ba634c63f3ac Mon Sep 17 00:00:00 2001 From: Mog Liang Date: Mon, 4 Dec 2023 03:04:02 +0000 Subject: [PATCH 02/15] copy proxy package from kubeadm cp provider --- go.mod | 1 + go.sum | 2 + pkg/proxy/addr.go | 59 +++++++++++++++++ pkg/proxy/conn.go | 87 +++++++++++++++++++++++++ pkg/proxy/dial.go | 158 +++++++++++++++++++++++++++++++++++++++++++++ pkg/proxy/proxy.go | 46 +++++++++++++ 6 files changed, 353 insertions(+) create mode 100644 pkg/proxy/addr.go create mode 100644 pkg/proxy/conn.go create mode 100644 pkg/proxy/dial.go create mode 100644 pkg/proxy/proxy.go diff --git a/go.mod b/go.mod index 8834db04..4f087e3d 100644 --- a/go.mod +++ b/go.mod @@ -46,6 +46,7 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2 // indirect + github.com/moby/spdystream v0.2.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect diff --git a/go.sum b/go.sum index 1f3833a0..a194e48a 100644 --- a/go.sum +++ b/go.sum @@ -297,6 +297,8 @@ github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:F github.com/mitchellh/mapstructure v1.1.2/go.mod 
h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= +github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8= +github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= diff --git a/pkg/proxy/addr.go b/pkg/proxy/addr.go new file mode 100644 index 00000000..5d228e27 --- /dev/null +++ b/pkg/proxy/addr.go @@ -0,0 +1,59 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package proxy implements kubeadm proxy functionality. +package proxy + +import ( + "fmt" + "net" + + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/portforward" +) + +const scheme string = "proxy" + +// Addr defines a proxy net/addr format. +type Addr struct { + net.Addr + port string + identifier uint32 +} + +// Network returns a fake network. +func (a Addr) Network() string { + return portforward.PortForwardProtocolV1Name +} + +// String returns encoded information about the connection. 
+func (a Addr) String() string { + return fmt.Sprintf( + "%s://%d.%s.local:%s", + scheme, + a.identifier, + portforward.PortForwardProtocolV1Name, + a.port, + ) +} + +// NewAddrFromConn creates an Addr from the given connection. +func NewAddrFromConn(c *Conn) Addr { + return Addr{ + port: c.stream.Headers().Get(corev1.PortHeader), + identifier: c.stream.Identifier(), + } +} diff --git a/pkg/proxy/conn.go b/pkg/proxy/conn.go new file mode 100644 index 00000000..7fe2596c --- /dev/null +++ b/pkg/proxy/conn.go @@ -0,0 +1,87 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "net" + "time" + + kerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/httpstream" +) + +// Conn is a Kubernetes API server proxied type of net/conn. +type Conn struct { + connection httpstream.Connection + stream httpstream.Stream + readDeadline time.Time + writeDeadline time.Time +} + +// Read from the connection. +func (c *Conn) Read(b []byte) (n int, err error) { + return c.stream.Read(b) +} + +// Close the underlying proxied connection. +func (c *Conn) Close() error { + return kerrors.NewAggregate([]error{c.stream.Close(), c.connection.Close()}) +} + +// Write to the connection. +func (c *Conn) Write(b []byte) (n int, err error) { + return c.stream.Write(b) +} + +// LocalAddr returns a fake address representing the proxied connection. 
+func (c *Conn) LocalAddr() net.Addr { + return NewAddrFromConn(c) +} + +// RemoteAddr returns a fake address representing the proxied connection. +func (c *Conn) RemoteAddr() net.Addr { + return NewAddrFromConn(c) +} + +// SetDeadline sets the read and write deadlines to the specified time. +func (c *Conn) SetDeadline(t time.Time) error { + // TODO: Handle deadlines + c.readDeadline = t + c.writeDeadline = t + return nil +} + +// SetWriteDeadline sets the write deadline to the specified time. +func (c *Conn) SetWriteDeadline(t time.Time) error { + c.writeDeadline = t + return nil +} + +// SetReadDeadline sets the read deadline to the specified time. +func (c *Conn) SetReadDeadline(t time.Time) error { + c.readDeadline = t + return nil +} + +// NewConn creates a new net/conn interface based on an underlying Kubernetes +// API server proxy connection. +func NewConn(connection httpstream.Connection, stream httpstream.Stream) *Conn { + return &Conn{ + connection: connection, + stream: stream, + } +} diff --git a/pkg/proxy/dial.go b/pkg/proxy/dial.go new file mode 100644 index 00000000..e4c1024f --- /dev/null +++ b/pkg/proxy/dial.go @@ -0,0 +1,158 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package proxy + +import ( + "context" + "fmt" + "net" + "net/http" + "time" + + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/portforward" + "k8s.io/client-go/transport/spdy" +) + +const defaultTimeout = 10 * time.Second + +// Dialer creates connections using Kubernetes API Server port-forwarding. +type Dialer struct { + proxy Proxy + clientset *kubernetes.Clientset + proxyTransport http.RoundTripper + upgrader spdy.Upgrader + timeout time.Duration +} + +// NewDialer creates a new dialer for a given API server scope. +func NewDialer(p Proxy, options ...func(*Dialer) error) (*Dialer, error) { + if p.Port == 0 { + return nil, errors.New("port required") + } + + dialer := &Dialer{ + proxy: p, + } + + for _, option := range options { + err := option(dialer) + if err != nil { + return nil, err + } + } + + if dialer.timeout == 0 { + dialer.timeout = defaultTimeout + } + p.KubeConfig.Timeout = dialer.timeout + clientset, err := kubernetes.NewForConfig(p.KubeConfig) + if err != nil { + return nil, err + } + proxyTransport, upgrader, err := spdy.RoundTripperFor(p.KubeConfig) + if err != nil { + return nil, err + } + dialer.proxyTransport = proxyTransport + dialer.upgrader = upgrader + dialer.clientset = clientset + return dialer, nil +} + +// DialContextWithAddr is a GO grpc compliant dialer construct. +func (d *Dialer) DialContextWithAddr(ctx context.Context, addr string) (net.Conn, error) { + return d.DialContext(ctx, scheme, addr) +} + +// DialContext creates proxied port-forwarded connections. +// ctx is currently unused, but fulfils the type signature used by GRPC. +func (d *Dialer) DialContext(_ context.Context, _ string, addr string) (net.Conn, error) { + req := d.clientset.CoreV1().RESTClient(). + Post(). + Resource(d.proxy.Kind). + Namespace(d.proxy.Namespace). + Name(addr). 
+ SubResource("portforward") + + dialer := spdy.NewDialer(d.upgrader, &http.Client{Transport: d.proxyTransport}, "POST", req.URL()) + + // Create a new connection from the dialer. + // + // Warning: Any early return should close this connection, otherwise we're going to leak them. + connection, _, err := dialer.Dial(portforward.PortForwardProtocolV1Name) + if err != nil { + return nil, errors.Wrap(err, "error upgrading connection") + } + + // Create the headers. + headers := http.Header{} + + // Set the header port number to match the proxy one. + headers.Set(corev1.PortHeader, fmt.Sprintf("%d", d.proxy.Port)) + + // We only create a single stream over the connection + headers.Set(corev1.PortForwardRequestIDHeader, "0") + + // Create the error stream. + headers.Set(corev1.StreamType, corev1.StreamTypeError) + errorStream, err := connection.CreateStream(headers) + if err != nil { + return nil, kerrors.NewAggregate([]error{ + err, + connection.Close(), + }) + } + // Close the error stream right away, we're not writing to it. + if err := errorStream.Close(); err != nil { + return nil, kerrors.NewAggregate([]error{ + err, + connection.Close(), + }) + } + + // Create the data stream. + // + // NOTE: Given that we're reusing the headers, + // we need to overwrite the stream type before creating it. + headers.Set(corev1.StreamType, corev1.StreamTypeData) + dataStream, err := connection.CreateStream(headers) + if err != nil { + return nil, kerrors.NewAggregate([]error{ + errors.Wrap(err, "error creating forwarding stream"), + connection.Close(), + }) + } + + // Create the net.Conn and return. + return NewConn(connection, dataStream), nil +} + +// DialTimeout sets the timeout. 
+func DialTimeout(duration time.Duration) func(*Dialer) error { + return func(d *Dialer) error { + return d.setTimeout(duration) + } +} + +func (d *Dialer) setTimeout(duration time.Duration) error { + d.timeout = duration + return nil +} diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go new file mode 100644 index 00000000..381f7780 --- /dev/null +++ b/pkg/proxy/proxy.go @@ -0,0 +1,46 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxy + +import ( + "time" + + "k8s.io/client-go/rest" +) + +// Proxy defines the API server port-forwarded proxy. 
+type Proxy struct { + + // Kind is the kind of Kubernetes resource + Kind string + + // Namespace is the namespace in which the Kubernetes resource exists + Namespace string + + // ResourceName is the name of the Kubernetes resource + ResourceName string + + // KubeConfig is the config to connect to the API server + KubeConfig *rest.Config + + // KeepAlive specifies how often a keep alive message is sent to hold + // the connection open + KeepAlive *time.Duration + + // Port is the port to be forwarded from the relevant resource + Port int +} From 45dddcc03fe52d1963131c58f103297ae5bc3277 Mon Sep 17 00:00:00 2001 From: Mog Liang Date: Mon, 4 Dec 2023 05:01:43 +0000 Subject: [PATCH 03/15] copy proxy & etcd packages from kubeadm cp provider --- .golangci.yml | 1 + controlplane/etcd/doc.go | 20 ++ controlplane/etcd/etcd.go | 288 +++++++++++++++++++++++++++ controlplane/etcd/etcd_test.go | 106 ++++++++++ controlplane/etcd/fake/client.go | 68 +++++++ controlplane/etcd/util/util.go | 53 +++++ {pkg => controlplane}/proxy/addr.go | 0 {pkg => controlplane}/proxy/conn.go | 0 {pkg => controlplane}/proxy/dial.go | 0 {pkg => controlplane}/proxy/proxy.go | 0 go.mod | 7 + go.sum | 30 +++ 12 files changed, 573 insertions(+) create mode 100644 controlplane/etcd/doc.go create mode 100644 controlplane/etcd/etcd.go create mode 100644 controlplane/etcd/etcd_test.go create mode 100644 controlplane/etcd/fake/client.go create mode 100644 controlplane/etcd/util/util.go rename {pkg => controlplane}/proxy/addr.go (100%) rename {pkg => controlplane}/proxy/conn.go (100%) rename {pkg => controlplane}/proxy/dial.go (100%) rename {pkg => controlplane}/proxy/proxy.go (100%) diff --git a/.golangci.yml b/.golangci.yml index 9b75d52d..e16bcda0 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -88,6 +88,7 @@ linters-settings: - github.com/cluster-api-provider-k3s/cluster-api-k3s - github.com/google/uuid + - github.com/pkg/errors gci: sections: - standard diff --git a/controlplane/etcd/doc.go 
b/controlplane/etcd/doc.go new file mode 100644 index 00000000..4f91c689 --- /dev/null +++ b/controlplane/etcd/doc.go @@ -0,0 +1,20 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +Package etcd provides a connection to an etcd member. +*/ +package etcd diff --git a/controlplane/etcd/etcd.go b/controlplane/etcd/etcd.go new file mode 100644 index 00000000..279d9fd7 --- /dev/null +++ b/controlplane/etcd/etcd.go @@ -0,0 +1,288 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd + +import ( + "context" + "crypto/tls" + "net" + "time" + + "github.com/pkg/errors" + "go.etcd.io/etcd/api/v3/etcdserverpb" + clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/grpc" + kerrors "k8s.io/apimachinery/pkg/util/errors" + + "github.com/cluster-api-provider-k3s/cluster-api-k3s/controlplane/proxy" +) + +// GRPCDial is a function that creates a connection to a given endpoint. 
+type GRPCDial func(ctx context.Context, addr string) (net.Conn, error) + +// etcd wraps the etcd client from etcd's clientv3 package. +// This interface is implemented by both the clientv3 package and the backoff adapter that adds retries to the client. +type etcd interface { + AlarmList(ctx context.Context) (*clientv3.AlarmResponse, error) + Close() error + Endpoints() []string + MemberList(ctx context.Context) (*clientv3.MemberListResponse, error) + MemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error) + MemberUpdate(ctx context.Context, id uint64, peerURLs []string) (*clientv3.MemberUpdateResponse, error) + MoveLeader(ctx context.Context, id uint64) (*clientv3.MoveLeaderResponse, error) + Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error) +} + +// Client wraps an etcd client formatting its output to something more consumable. +type Client struct { + EtcdClient etcd + Endpoint string + LeaderID uint64 + Errors []string + CallTimeout time.Duration +} + +// MemberAlarm represents an alarm type association with a cluster member. +type MemberAlarm struct { + // MemberID is the ID of the member associated with the raised alarm. + MemberID uint64 + + // Type is the type of alarm which has been raised. + Type AlarmType +} + +// AlarmType defines the type of alarm for etcd. +type AlarmType int32 + +const ( + // AlarmOK denotes that the cluster member is OK. + AlarmOK AlarmType = iota + + // AlarmNoSpace denotes that the cluster member has run out of disk space. + AlarmNoSpace + + // AlarmCorrupt denotes that the cluster member has corrupted data. + AlarmCorrupt +) + +// DefaultCallTimeout represents the duration that the etcd client waits at most +// for read and write operations to etcd. +const DefaultCallTimeout = 15 * time.Second + +// AlarmTypeName provides a text translation for AlarmType codes. 
+var AlarmTypeName = map[AlarmType]string{ + AlarmOK: "NONE", + AlarmNoSpace: "NOSPACE", + AlarmCorrupt: "CORRUPT", +} + +// Adapted from kubeadm. + +// Member struct defines an etcd member; it is used to avoid spreading +// go.etcd.io/etcd dependencies. +type Member struct { + // ClusterID is the ID of the cluster to which this member belongs + ClusterID uint64 + + // ID is the ID of this cluster member + ID uint64 + + // Name is the human-readable name of the member. If the member is not started, the name will be an empty string. + Name string + + // PeerURLs is the list of URLs the member exposes to the cluster for communication. + PeerURLs []string + + // ClientURLs is the list of URLs the member exposes to clients for communication. If the member is not started, clientURLs will be empty. + ClientURLs []string + + // IsLearner indicates if the member is a raft learner. + IsLearner bool + + // Alarms is the list of alarms for a member. + Alarms []AlarmType +} + +// pbMemberToMember converts the protobuf representation of a cluster member to a Member struct. +func pbMemberToMember(m *etcdserverpb.Member) *Member { + return &Member{ + ID: m.GetID(), + Name: m.GetName(), + PeerURLs: m.GetPeerURLs(), + ClientURLs: m.GetClientURLs(), + IsLearner: m.GetIsLearner(), + Alarms: []AlarmType{}, + } +} + +// ClientConfiguration describes the configuration for an etcd client. +type ClientConfiguration struct { + Endpoint string + Proxy proxy.Proxy + TLSConfig *tls.Config + DialTimeout time.Duration + CallTimeout time.Duration +} + +// NewClient creates a new etcd client with the given configuration. 
+func NewClient(ctx context.Context, config ClientConfiguration) (*Client, error) { + dialer, err := proxy.NewDialer(config.Proxy) + if err != nil { + return nil, errors.Wrap(err, "unable to create a dialer for etcd client") + } + + etcdClient, err := clientv3.New(clientv3.Config{ + Endpoints: []string{config.Endpoint}, // NOTE: endpoint is used only as a host for certificate validation, the network connection is defined by DialOptions. + DialTimeout: config.DialTimeout, + DialOptions: []grpc.DialOption{ + grpc.WithBlock(), // block until the underlying connection is up + grpc.WithContextDialer(dialer.DialContextWithAddr), + }, + TLS: config.TLSConfig, + }) + if err != nil { + return nil, errors.Wrap(err, "unable to create etcd client") + } + + callTimeout := config.CallTimeout + if callTimeout == 0 { + callTimeout = DefaultCallTimeout + } + + client, err := newEtcdClient(ctx, etcdClient, callTimeout) + if err != nil { + closeErr := etcdClient.Close() + return nil, errors.Wrap(kerrors.NewAggregate([]error{err, closeErr}), "unable to create etcd client") + } + return client, nil +} + +func newEtcdClient(ctx context.Context, etcdClient etcd, callTimeout time.Duration) (*Client, error) { + endpoints := etcdClient.Endpoints() + if len(endpoints) == 0 { + return nil, errors.New("etcd client was not configured with any endpoints") + } + + ctx, cancel := context.WithTimeout(ctx, callTimeout) + defer cancel() + + status, err := etcdClient.Status(ctx, endpoints[0]) + if err != nil { + return nil, errors.Wrap(err, "failed to get etcd status") + } + + return &Client{ + Endpoint: endpoints[0], + EtcdClient: etcdClient, + LeaderID: status.Leader, + Errors: status.Errors, + CallTimeout: callTimeout, + }, nil +} + +// Close closes the etcd client. +func (c *Client) Close() error { + return c.EtcdClient.Close() +} + +// Members retrieves a list of etcd members. 
+func (c *Client) Members(ctx context.Context) ([]*Member, error) { + ctx, cancel := context.WithTimeout(ctx, c.CallTimeout) + defer cancel() + + response, err := c.EtcdClient.MemberList(ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to get list of members for etcd cluster") + } + + alarms, err := c.Alarms(ctx) + if err != nil { + return nil, err + } + + clusterID := response.Header.GetClusterId() + members := make([]*Member, 0) + for _, m := range response.Members { + newMember := pbMemberToMember(m) + newMember.ClusterID = clusterID + for _, c := range alarms { + if c.MemberID == newMember.ID { + newMember.Alarms = append(newMember.Alarms, c.Type) + } + } + members = append(members, newMember) + } + + return members, nil +} + +// MoveLeader moves the leader to the provided member ID. +func (c *Client) MoveLeader(ctx context.Context, newLeaderID uint64) error { + ctx, cancel := context.WithTimeout(ctx, c.CallTimeout) + defer cancel() + + _, err := c.EtcdClient.MoveLeader(ctx, newLeaderID) + return errors.Wrapf(err, "failed to move etcd leader: %v", newLeaderID) +} + +// RemoveMember removes a given member. +func (c *Client) RemoveMember(ctx context.Context, id uint64) error { + ctx, cancel := context.WithTimeout(ctx, c.CallTimeout) + defer cancel() + + _, err := c.EtcdClient.MemberRemove(ctx, id) + return errors.Wrapf(err, "failed to remove member: %v", id) +} + +// UpdateMemberPeerURLs updates the list of peer URLs. 
+func (c *Client) UpdateMemberPeerURLs(ctx context.Context, id uint64, peerURLs []string) ([]*Member, error) { + ctx, cancel := context.WithTimeout(ctx, c.CallTimeout) + defer cancel() + + response, err := c.EtcdClient.MemberUpdate(ctx, id, peerURLs) + if err != nil { + return nil, errors.Wrapf(err, "failed to update etcd member %v's peer list to %+v", id, peerURLs) + } + + members := make([]*Member, 0, len(response.Members)) + for _, m := range response.Members { + members = append(members, pbMemberToMember(m)) + } + + return members, nil +} + +// Alarms retrieves all alarms on a cluster. +func (c *Client) Alarms(ctx context.Context) ([]MemberAlarm, error) { + ctx, cancel := context.WithTimeout(ctx, c.CallTimeout) + defer cancel() + + alarmResponse, err := c.EtcdClient.AlarmList(ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to get alarms for etcd cluster") + } + + memberAlarms := make([]MemberAlarm, 0, len(alarmResponse.Alarms)) + for _, a := range alarmResponse.Alarms { + memberAlarms = append(memberAlarms, MemberAlarm{ + MemberID: a.GetMemberID(), + Type: AlarmType(a.GetAlarm()), + }) + } + + return memberAlarms, nil +} diff --git a/controlplane/etcd/etcd_test.go b/controlplane/etcd/etcd_test.go new file mode 100644 index 00000000..6f96ac21 --- /dev/null +++ b/controlplane/etcd/etcd_test.go @@ -0,0 +1,106 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd + +import ( + "testing" + + . 
"github.com/onsi/gomega" + "github.com/pkg/errors" + "go.etcd.io/etcd/api/v3/etcdserverpb" + clientv3 "go.etcd.io/etcd/client/v3" + ctrl "sigs.k8s.io/controller-runtime" + + etcdfake "github.com/cluster-api-provider-k3s/cluster-api-k3s/controlplane/etcd/fake" +) + +var ( + ctx = ctrl.SetupSignalHandler() +) + +func TestEtcdMembers_WithErrors(t *testing.T) { + g := NewWithT(t) + + fakeEtcdClient := &etcdfake.FakeEtcdClient{ + EtcdEndpoints: []string{"https://etcd-instance:2379"}, + MemberListResponse: &clientv3.MemberListResponse{ + Header: &etcdserverpb.ResponseHeader{}, + Members: []*etcdserverpb.Member{ + {ID: 1234, Name: "foo", PeerURLs: []string{"https://1.2.3.4:2000"}}, + }, + }, + MoveLeaderResponse: &clientv3.MoveLeaderResponse{}, + MemberRemoveResponse: &clientv3.MemberRemoveResponse{}, + StatusResponse: &clientv3.StatusResponse{}, + ErrorResponse: errors.New("something went wrong"), + } + + client, err := newEtcdClient(ctx, fakeEtcdClient, DefaultCallTimeout) + g.Expect(err).ToNot(HaveOccurred()) + + members, err := client.Members(ctx) + g.Expect(err).To(HaveOccurred()) + g.Expect(members).To(BeEmpty()) + + err = client.MoveLeader(ctx, 1) + g.Expect(err).To(HaveOccurred()) + + err = client.RemoveMember(ctx, 1234) + g.Expect(err).To(HaveOccurred()) +} + +func TestEtcdMembers_WithSuccess(t *testing.T) { + g := NewWithT(t) + + fakeEtcdClient := &etcdfake.FakeEtcdClient{ + EtcdEndpoints: []string{"https://etcd-instance:2379"}, + MemberListResponse: &clientv3.MemberListResponse{ + Header: &etcdserverpb.ResponseHeader{}, + Members: []*etcdserverpb.Member{ + {ID: 1234, Name: "foo", PeerURLs: []string{"https://1.2.3.4:2000"}}, + }, + }, + MoveLeaderResponse: &clientv3.MoveLeaderResponse{}, + MemberUpdateResponse: &clientv3.MemberUpdateResponse{ + Header: &etcdserverpb.ResponseHeader{}, + Members: []*etcdserverpb.Member{ + {ID: 1234, Name: "foo", PeerURLs: []string{"https://1.2.3.4:2000", "https://4.5.6.7:2000"}}, + }, + }, + MemberRemoveResponse: 
&clientv3.MemberRemoveResponse{}, + AlarmResponse: &clientv3.AlarmResponse{}, + StatusResponse: &clientv3.StatusResponse{}, + } + + client, err := newEtcdClient(ctx, fakeEtcdClient, DefaultCallTimeout) + g.Expect(err).ToNot(HaveOccurred()) + + members, err := client.Members(ctx) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(members).To(HaveLen(1)) + + err = client.MoveLeader(ctx, 1) + g.Expect(err).ToNot(HaveOccurred()) + + err = client.RemoveMember(ctx, 1234) + g.Expect(err).ToNot(HaveOccurred()) + + updatedMembers, err := client.UpdateMemberPeerURLs(ctx, 1234, []string{"https://4.5.6.7:2000"}) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(updatedMembers[0].PeerURLs).To(HaveLen(2)) + g.Expect(updatedMembers[0].PeerURLs).To(Equal([]string{"https://1.2.3.4:2000", "https://4.5.6.7:2000"})) +} diff --git a/controlplane/etcd/fake/client.go b/controlplane/etcd/fake/client.go new file mode 100644 index 00000000..5bc83ed4 --- /dev/null +++ b/controlplane/etcd/fake/client.go @@ -0,0 +1,68 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package fake implements testing fakes. 
+package fake + +import ( + "context" + + clientv3 "go.etcd.io/etcd/client/v3" +) + +type FakeEtcdClient struct { + AlarmResponse *clientv3.AlarmResponse + EtcdEndpoints []string + MemberListResponse *clientv3.MemberListResponse + MemberRemoveResponse *clientv3.MemberRemoveResponse + MemberUpdateResponse *clientv3.MemberUpdateResponse + MoveLeaderResponse *clientv3.MoveLeaderResponse + StatusResponse *clientv3.StatusResponse + ErrorResponse error + MovedLeader uint64 + RemovedMember uint64 +} + +func (c *FakeEtcdClient) Endpoints() []string { + return c.EtcdEndpoints +} + +func (c *FakeEtcdClient) MoveLeader(_ context.Context, i uint64) (*clientv3.MoveLeaderResponse, error) { + c.MovedLeader = i + return c.MoveLeaderResponse, c.ErrorResponse +} + +func (c *FakeEtcdClient) Close() error { + return nil +} + +func (c *FakeEtcdClient) AlarmList(_ context.Context) (*clientv3.AlarmResponse, error) { + return c.AlarmResponse, c.ErrorResponse +} + +func (c *FakeEtcdClient) MemberList(_ context.Context) (*clientv3.MemberListResponse, error) { + return c.MemberListResponse, c.ErrorResponse +} +func (c *FakeEtcdClient) MemberRemove(_ context.Context, i uint64) (*clientv3.MemberRemoveResponse, error) { + c.RemovedMember = i + return c.MemberRemoveResponse, c.ErrorResponse +} +func (c *FakeEtcdClient) MemberUpdate(_ context.Context, _ uint64, _ []string) (*clientv3.MemberUpdateResponse, error) { + return c.MemberUpdateResponse, c.ErrorResponse +} +func (c *FakeEtcdClient) Status(_ context.Context, _ string) (*clientv3.StatusResponse, error) { + return c.StatusResponse, nil +} diff --git a/controlplane/etcd/util/util.go b/controlplane/etcd/util/util.go new file mode 100644 index 00000000..326fe18a --- /dev/null +++ b/controlplane/etcd/util/util.go @@ -0,0 +1,53 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package util implements etcd utility functions.
+package util
+
+import (
+	"k8s.io/apimachinery/pkg/util/sets"
+
+	"github.com/cluster-api-provider-k3s/cluster-api-k3s/controlplane/etcd"
+)
+
+// MemberForName returns the etcd member with the matching name.
+func MemberForName(members []*etcd.Member, name string) *etcd.Member {
+	for _, m := range members {
+		if m.Name == name { // NOTE(review): a nil element in members would panic here — confirm callers never pass one
+			return m // first match wins
+		}
+	}
+	return nil // no member carries the requested name
+}
+
+// MemberNames returns a list of all the etcd member names.
+func MemberNames(members []*etcd.Member) []string {
+	names := make([]string, 0, len(members)) // non-nil even when members is empty
+	for _, m := range members {
+		names = append(names, m.Name)
+	}
+	return names
+}
+
+// MemberEqual returns true if the lists of members match.
+//
+// This function only checks that set of names of each member
+// within the lists is the same.
+func MemberEqual(members1, members2 []*etcd.Member) bool {
+	names1 := sets.Set[string]{}.Insert(MemberNames(members1)...)
+	names2 := sets.Set[string]{}.Insert(MemberNames(members2)...)
+	return names1.Equal(names2) // set comparison: order and duplicate names are ignored
+}
diff --git a/pkg/proxy/addr.go b/controlplane/proxy/addr.go
similarity index 100%
rename from pkg/proxy/addr.go
rename to controlplane/proxy/addr.go
diff --git a/pkg/proxy/conn.go b/controlplane/proxy/conn.go
similarity index 100%
rename from pkg/proxy/conn.go
rename to controlplane/proxy/conn.go
diff --git a/pkg/proxy/dial.go b/controlplane/proxy/dial.go
similarity index 100%
rename from pkg/proxy/dial.go
rename to controlplane/proxy/dial.go
diff --git a/pkg/proxy/proxy.go b/controlplane/proxy/proxy.go
similarity index 100%
rename from pkg/proxy/proxy.go
rename to controlplane/proxy/proxy.go
diff --git a/go.mod b/go.mod
index 4f087e3d..cc7ff54c 100644
--- a/go.mod
+++ b/go.mod
@@ -24,6 +24,8 @@ require (
 	github.com/blang/semver/v4 v4.0.0 // indirect
 	github.com/cespare/xxhash/v2 v2.1.2 // indirect
 	github.com/coredns/caddy v1.1.0 // indirect
+	github.com/coreos/go-semver v0.3.0 // indirect
+	github.com/coreos/go-systemd/v22 v22.3.2 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/emicklei/go-restful/v3 v3.9.0 // indirect
 	github.com/evanphx/json-patch v5.6.0+incompatible // indirect
@@ -57,6 +59,9 @@ require (
 	github.com/prometheus/common v0.37.0 // indirect
 	github.com/prometheus/procfs v0.8.0 // indirect
 	github.com/spf13/pflag v1.0.5 // indirect
+	go.etcd.io/etcd/api/v3 v3.5.6 // indirect
+	go.etcd.io/etcd/client/pkg/v3 v3.5.6 // indirect
+	go.etcd.io/etcd/client/v3 v3.5.6 // indirect
 	go.uber.org/atomic v1.9.0 // indirect
 	go.uber.org/multierr v1.8.0 // indirect
 	go.uber.org/zap v1.24.0 // indirect
@@ -68,6 +73,8 @@ require (
 	golang.org/x/time v0.3.0 // indirect
 	gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
 	google.golang.org/appengine v1.6.7 // indirect
+	google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect
+	google.golang.org/grpc v1.52.0 // indirect
 	google.golang.org/protobuf v1.28.1 // indirect
 	gopkg.in/inf.v0 v0.9.1 // indirect
 	gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
diff --git a/go.sum b/go.sum index a194e48a..eea44f2f 100644 --- a/go.sum +++ b/go.sum @@ -75,14 +75,19 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/coredns/caddy v1.1.0 h1:ezvsPrT/tA/7pYDBZxu0cT0VmWk75AfIaf6GSYCNMf0= github.com/coredns/caddy v1.1.0/go.mod h1:A6ntJQlAWuQfFlsd9hvigKbo2WS0VUs2l1e2F+BawD4= github.com/coredns/corefile-migration v1.0.20 h1:MdOkT6F3ehju/n9tgxlGct8XAajOX2vN+wG7To4BWSI= github.com/coredns/corefile-migration v1.0.20/go.mod h1:XnhgULOEouimnzgn0t4WPuFDN2/PJQcTxdWKC5eXNGE= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf h1:iW4rZ826su+pqaw19uhpSCzhj44qo35pNgKFGqzDKkU= +github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= +github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= 
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -92,13 +97,16 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m322EBzniBPB6ZIzuh8= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE= github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= +github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ= github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U= @@ -143,6 +151,7 @@ 
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg78 github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/gobuffalo/flect v1.0.2 h1:eqjPGSo2WmjgY2XlpGwo2NXgL3RucAKo4k4qQMNA5sA= github.com/gobuffalo/flect v1.0.2/go.mod h1:A5msMlrHtLqh9umBSnvabjsMrCcCpAyzglnDvkbYKHs= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= @@ -338,6 +347,7 @@ github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDf github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= +github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY= github.com/prometheus/client_golang v1.14.0 h1:nJdhIvne2eSX/XRAFV9PcvFFRbrjbcTUj0VP62TMhnw= github.com/prometheus/client_golang v1.14.0/go.mod h1:8vpkKitgIVNcqrRBWh1C4TIUQgYNtG/XQE4E/Zae36Y= @@ -413,7 +423,14 @@ github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.etcd.io/bbolt v1.3.2/go.mod 
h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= +go.etcd.io/etcd/api/v3 v3.5.6 h1:Cy2qx3npLcYqTKqGJzMypnMv2tiRyifZJ17BlWIWA7A= +go.etcd.io/etcd/api/v3 v3.5.6/go.mod h1:KFtNaxGDw4Yx/BA4iPPwevUTAuqcsPxzyX8PHydchN8= +go.etcd.io/etcd/client/pkg/v3 v3.5.6 h1:TXQWYceBKqLp4sa87rcPs11SXxUA/mHwH975v+BDvLU= +go.etcd.io/etcd/client/pkg/v3 v3.5.6/go.mod h1:ggrwbk069qxpKPq8/FKkQ3Xq9y39kbFR4LnKszpRXeQ= +go.etcd.io/etcd/client/v3 v3.5.6 h1:coLs69PWCXE9G4FKquzNaSHrRyMCAXwF+IX1tAPVO8E= +go.etcd.io/etcd/client/v3 v3.5.6/go.mod h1:f6GRinRMCsFVv9Ht42EyY7nfsVGwrNO0WEoS2pRKzQk= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -431,6 +448,7 @@ go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9i go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= @@ -464,6 +482,7 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHl golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs= golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= +golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= 
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= @@ -472,6 +491,7 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -532,6 +552,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -575,6 +596,7 @@ golang.org/x/sys 
v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -650,6 +672,7 @@ golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -711,8 +734,10 @@ google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7Fc google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto 
v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= google.golang.org/genproto v0.0.0-20220107163113-42d7afdf6368/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef h1:uQ2vjV/sHTsWSqdKeLqmwitzgvjMl7o4IdtHwUDXSJY= +google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -727,7 +752,11 @@ google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= +google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= +google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k= +google.golang.org/grpc v1.52.0 h1:kd48UiU7EHsV4rnLyOJRuP/Il/UHE7gdDAQ+SZI7nZk= +google.golang.org/grpc v1.52.0/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -810,5 +839,6 @@ sigs.k8s.io/json 
v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE= sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E= +sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= From 54654fe3d705021e4101c6595e44450321202d89 Mon Sep 17 00:00:00 2001 From: Mog Liang Date: Tue, 5 Dec 2023 09:08:31 +0000 Subject: [PATCH 04/15] add etcd_client_generator.go --- {controlplane => pkg}/etcd/doc.go | 0 {controlplane => pkg}/etcd/etcd.go | 2 +- {controlplane => pkg}/etcd/etcd_test.go | 2 +- {controlplane => pkg}/etcd/fake/client.go | 0 {controlplane => pkg}/etcd/util/util.go | 2 +- pkg/k3s/etcd_client_generator.go | 214 ++++++++++++++++++++++ {controlplane => pkg}/proxy/addr.go | 0 {controlplane => pkg}/proxy/conn.go | 0 {controlplane => pkg}/proxy/dial.go | 0 {controlplane => pkg}/proxy/proxy.go | 0 10 files changed, 217 insertions(+), 3 deletions(-) rename {controlplane => pkg}/etcd/doc.go (100%) rename {controlplane => pkg}/etcd/etcd.go (99%) rename {controlplane => pkg}/etcd/etcd_test.go (97%) rename {controlplane => pkg}/etcd/fake/client.go (100%) rename {controlplane => pkg}/etcd/util/util.go (95%) create mode 100644 pkg/k3s/etcd_client_generator.go rename {controlplane => pkg}/proxy/addr.go (100%) rename {controlplane => pkg}/proxy/conn.go (100%) rename {controlplane => pkg}/proxy/dial.go (100%) rename {controlplane => pkg}/proxy/proxy.go (100%) diff --git a/controlplane/etcd/doc.go b/pkg/etcd/doc.go similarity index 100% rename from controlplane/etcd/doc.go rename to pkg/etcd/doc.go diff --git a/controlplane/etcd/etcd.go b/pkg/etcd/etcd.go similarity index 99% 
rename from controlplane/etcd/etcd.go
rename to pkg/etcd/etcd.go
index 279d9fd7..2ab2cb90 100644
--- a/controlplane/etcd/etcd.go
+++ b/pkg/etcd/etcd.go
@@ -28,7 +28,7 @@ import (
 	"google.golang.org/grpc"
 	kerrors "k8s.io/apimachinery/pkg/util/errors"
 
-	"github.com/cluster-api-provider-k3s/cluster-api-k3s/controlplane/proxy"
+	"github.com/cluster-api-provider-k3s/cluster-api-k3s/pkg/proxy"
 )
 
 // GRPCDial is a function that creates a connection to a given endpoint.
diff --git a/controlplane/etcd/etcd_test.go b/pkg/etcd/etcd_test.go
similarity index 97%
rename from controlplane/etcd/etcd_test.go
rename to pkg/etcd/etcd_test.go
index 6f96ac21..deb07d9a 100644
--- a/controlplane/etcd/etcd_test.go
+++ b/pkg/etcd/etcd_test.go
@@ -25,7 +25,7 @@ import (
 	clientv3 "go.etcd.io/etcd/client/v3"
 	ctrl "sigs.k8s.io/controller-runtime"
 
-	etcdfake "github.com/cluster-api-provider-k3s/cluster-api-k3s/controlplane/etcd/fake"
+	etcdfake "github.com/cluster-api-provider-k3s/cluster-api-k3s/pkg/etcd/fake"
 )
 
 var (
diff --git a/controlplane/etcd/fake/client.go b/pkg/etcd/fake/client.go
similarity index 100%
rename from controlplane/etcd/fake/client.go
rename to pkg/etcd/fake/client.go
diff --git a/controlplane/etcd/util/util.go b/pkg/etcd/util/util.go
similarity index 95%
rename from controlplane/etcd/util/util.go
rename to pkg/etcd/util/util.go
index 326fe18a..3ac2f80e 100644
--- a/controlplane/etcd/util/util.go
+++ b/pkg/etcd/util/util.go
@@ -20,7 +20,7 @@ package util
 import (
 	"k8s.io/apimachinery/pkg/util/sets"
 
-	"github.com/cluster-api-provider-k3s/cluster-api-k3s/controlplane/etcd"
+	"github.com/cluster-api-provider-k3s/cluster-api-k3s/pkg/etcd"
 )
 
 // MemberForName returns the etcd member with the matching name.
diff --git a/pkg/k3s/etcd_client_generator.go b/pkg/k3s/etcd_client_generator.go
new file mode 100644
index 00000000..17e12f65
--- /dev/null
+++ b/pkg/k3s/etcd_client_generator.go
@@ -0,0 +1,223 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package k3s
+
+import (
+	"context"
+	"crypto/tls"
+	"time"
+
+	"github.com/pkg/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	kerrors "k8s.io/apimachinery/pkg/util/errors"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+
+	"github.com/cluster-api-provider-k3s/cluster-api-k3s/pkg/etcd"
+	"github.com/cluster-api-provider-k3s/cluster-api-k3s/pkg/proxy"
+)
+
+// EtcdClientGenerator generates etcd clients that connect to specific etcd members on particular control plane nodes.
+//
+// Clients are created against the etcd-proxy pod that runs on the requested
+// node. The node name -> pod name mapping is cached in etcdPodMap by the first
+// successful lookup (see findEtcdProxyPod) and is not refreshed afterwards.
+type EtcdClientGenerator struct {
+	restConfig   *rest.Config
+	tlsConfig    *tls.Config
+	createClient clientCreator
+	etcdPodMap   *map[string]string
+}
+
+// clientCreator builds an etcd client for the given endpoint; the generator
+// passes the name of an etcd-proxy pod as endpoint.
+type clientCreator func(ctx context.Context, endpoint string) (*etcd.Client, error)
+
+// errEtcdNodeConnection marks a failure to reach one particular etcd node.
+// forLeader treats errors matching it as non-fatal and moves on to the next node.
+var errEtcdNodeConnection = errors.New("failed to connect to etcd node")
+
+// NewEtcdClientGenerator returns a new etcdClientGenerator instance.
+//
+// restConfig is used to list the etcd-proxy pods and is handed to the proxy as
+// its KubeConfig; tlsConfig, etcdDialTimeout and etcdCallTimeout are passed to
+// every etcd client the generator creates.
+func NewEtcdClientGenerator(restConfig *rest.Config, tlsConfig *tls.Config, etcdDialTimeout, etcdCallTimeout time.Duration) *EtcdClientGenerator {
+	ecg := &EtcdClientGenerator{restConfig: restConfig, tlsConfig: tlsConfig}
+	ecg.createClient = func(ctx context.Context, endpoint string) (*etcd.Client, error) {
+		p := proxy.Proxy{
+			Kind:       "pods",
+			Namespace:  metav1.NamespaceSystem,
+			KubeConfig: ecg.restConfig,
+			Port:       2379,
+		}
+		return etcd.NewClient(ctx, etcd.ClientConfiguration{
+			Endpoint:    endpoint,
+			Proxy:       p,
+			TLSConfig:   tlsConfig,
+			DialTimeout: etcdDialTimeout,
+			CallTimeout: etcdCallTimeout,
+		})
+	}
+
+	return ecg
+}
+
+// findEtcdProxyPod returns the name of the etcd-proxy pod scheduled on nodeName.
+//
+// The first call lists the pods labelled app=etcd-proxy in kube-system and
+// caches the node name -> pod name mapping; later calls are answered from the
+// cache. A failed or empty listing is not cached, so the next call retries it.
+// NOTE(review): the cache is never invalidated, so pods recreated and nodes
+// added later stay invisible to this generator — confirm its lifetime is short.
+func (c *EtcdClientGenerator) findEtcdProxyPod(ctx context.Context, nodeName string) (string, error) {
+	if c.etcdPodMap == nil {
+		clientset, err := kubernetes.NewForConfig(c.restConfig)
+		if err != nil {
+			return "", errors.Wrap(err, "unable to create client to target cluster")
+		}
+
+		pods, err := clientset.CoreV1().Pods(metav1.NamespaceSystem).List(ctx, metav1.ListOptions{LabelSelector: "app=etcd-proxy"})
+		if err != nil {
+			return "", errors.Wrap(err, "unable to list etcd-proxy pods in target cluster")
+		}
+
+		if len(pods.Items) == 0 {
+			return "", errors.New("there isn't any etcd-proxy pods in target cluster")
+		}
+
+		// Index instead of ranging by value to avoid copying every Pod.
+		etcdmap := make(map[string]string, len(pods.Items))
+		for i := range pods.Items {
+			etcdmap[pods.Items[i].Spec.NodeName] = pods.Items[i].Name
+		}
+
+		c.etcdPodMap = &etcdmap
+	}
+
+	podName, exists := (*c.etcdPodMap)[nodeName]
+	if !exists {
+		return "", errors.Errorf("unable to find etcd proxy pod for node %s", nodeName)
+	}
+
+	return podName, nil
+}
+
+// forFirstAvailableNode takes a list of nodes and returns a client for the first one that connects.
+//
+// Nodes are tried in order. When none of them connects, the per-node errors
+// are aggregated into the returned error.
+func (c *EtcdClientGenerator) forFirstAvailableNode(ctx context.Context, nodeNames []string) (*etcd.Client, error) {
+	// This is an additional safeguard for avoiding this func to return nil, nil.
+	if len(nodeNames) == 0 {
+		return nil, errors.New("invalid argument: forFirstAvailableNode can't be called with an empty list of nodes")
+	}
+
+	// Loop through the existing control plane nodes. Resolving a node to its
+	// etcd-proxy pod (and listing those pods when needed) is left to findEtcdProxyPod.
+	var errs []error
+	for _, name := range nodeNames {
+		podName, err := c.findEtcdProxyPod(ctx, name)
+		if err != nil {
+			errs = append(errs, err)
+			continue
+		}
+
+		client, err := c.createClient(ctx, podName)
+		if err != nil {
+			errs = append(errs, err)
+			continue
+		}
+		return client, nil
+	}
+	return nil, errors.Wrap(kerrors.NewAggregate(errs), "could not establish a connection to any etcd node")
+}
+
+// forLeader takes a list of nodes and returns a client to the leader node.
+//
+// The nodes are asked in order. A connection problem with one node
+// (errEtcdNodeConnection) is recorded and the next node is tried; any other
+// error ends the search immediately.
+func (c *EtcdClientGenerator) forLeader(ctx context.Context, nodeNames []string) (*etcd.Client, error) {
+	// This is an additional safeguard for avoiding this func to return nil, nil.
+	if len(nodeNames) == 0 {
+		return nil, errors.New("invalid argument: forLeader can't be called with an empty list of nodes")
+	}
+
+	nodes := sets.New(nodeNames...)
+
+	// Loop through the existing control plane nodes.
+	var errs []error
+	for _, nodeName := range nodeNames {
+		cl, err := c.getLeaderClient(ctx, nodeName, nodes)
+		if err != nil {
+			if errors.Is(err, errEtcdNodeConnection) {
+				errs = append(errs, err)
+				continue
+			}
+			return nil, err
+		}
+
+		return cl, nil
+	}
+	return nil, errors.Wrap(kerrors.NewAggregate(errs), "could not establish a connection to the etcd leader")
+}
+
+// getLeaderClient provides an etcd client connected to the leader. It returns an
+// errEtcdNodeConnection if there was a connection problem with the given etcd
+// node, which should be considered non-fatal by the caller.
+func (c *EtcdClientGenerator) getLeaderClient(ctx context.Context, nodeName string, allNodes sets.Set[string]) (*etcd.Client, error) {
+	// Get a temporary client to the etcd instance hosted on the node.
+	client, err := c.forFirstAvailableNode(ctx, []string{nodeName})
+	if err != nil {
+		return nil, kerrors.NewAggregate([]error{err, errEtcdNodeConnection})
+	}
+	// The temporary client only serves to discover the leader; it is closed on every return path.
+	defer client.Close()
+
+	// Get the list of members.
+	members, err := client.Members(ctx)
+	if err != nil {
+		return nil, kerrors.NewAggregate([]error{err, errEtcdNodeConnection})
+	}
+
+	// Get the leader member.
+	var leaderMember *etcd.Member
+	for _, member := range members {
+		if member.ID == client.LeaderID {
+			leaderMember = member
+			break
+		}
+	}
+
+	// If it is not possible to get a connection to the leader via existing nodes,
+	// it means that the control plane is an invalid state, with an etcd member - the current leader -
+	// without a corresponding node.
+	// TODO: In future we can eventually try to automatically remediate this condition by moving the leader
+	// to another member with a corresponding node.
+	if leaderMember == nil {
+		return nil, errors.Errorf("etcd leader is reported as %x, but we couldn't find any matching member", client.LeaderID)
+	}
+	if !allNodes.Has(leaderMember.Name) {
+		return nil, errors.Errorf("etcd leader is reported as %x with name %q, but we couldn't find a corresponding Node in the cluster", leaderMember.ID, leaderMember.Name)
+	}
+
+	// We found the leader and it is one of the nodes: get a connection to the
+	// etcd leader via the node hosting it.
+	return c.forFirstAvailableNode(ctx, []string{leaderMember.Name})
+}
diff --git a/controlplane/proxy/addr.go b/pkg/proxy/addr.go
similarity index 100%
rename from controlplane/proxy/addr.go
rename to pkg/proxy/addr.go
diff --git a/controlplane/proxy/conn.go b/pkg/proxy/conn.go
similarity index 100%
rename from controlplane/proxy/conn.go
rename to pkg/proxy/conn.go
diff --git a/controlplane/proxy/dial.go b/pkg/proxy/dial.go
similarity index 100%
rename from controlplane/proxy/dial.go
rename to pkg/proxy/dial.go
diff --git a/controlplane/proxy/proxy.go b/pkg/proxy/proxy.go
similarity index 100%
rename from controlplane/proxy/proxy.go
rename to pkg/proxy/proxy.go

From a7239ffa59a73c05a03f617f12fefd46eb0debc6 Mon Sep 17 00:00:00 2001
From: Mog Liang
Date: Wed, 6 Dec 2023 01:44:45 +0000
Subject: [PATCH 05/15] install etcd-proxy daemonset

---
 .../controllers/kthreesconfig_controller.go   | 11 ++++
 pkg/etcd/etcd-proxy.go                        | 10 ++++
 pkg/etcd/etcd-proxy.yaml                      | 50 +++++++++++++++++++
 3 files changed, 71 insertions(+)
 create mode 100644 pkg/etcd/etcd-proxy.go
 create mode 100644 pkg/etcd/etcd-proxy.yaml

diff --git a/bootstrap/controllers/kthreesconfig_controller.go b/bootstrap/controllers/kthreesconfig_controller.go
index
808f402b..dcc2608f 100644
--- a/bootstrap/controllers/kthreesconfig_controller.go
+++ b/bootstrap/controllers/kthreesconfig_controller.go
@@ -43,6 +43,7 @@ import (
 
 	bootstrapv1 "github.com/cluster-api-provider-k3s/cluster-api-k3s/bootstrap/api/v1beta1"
 	"github.com/cluster-api-provider-k3s/cluster-api-k3s/pkg/cloudinit"
+	"github.com/cluster-api-provider-k3s/cluster-api-k3s/pkg/etcd"
 	"github.com/cluster-api-provider-k3s/cluster-api-k3s/pkg/k3s"
 	"github.com/cluster-api-provider-k3s/cluster-api-k3s/pkg/kubeconfig"
 	"github.com/cluster-api-provider-k3s/cluster-api-k3s/pkg/locking"
@@ -455,6 +456,16 @@ func (r *KThreesConfigReconciler) handleClusterNotInitialized(ctx context.Contex
 		return ctrl.Result{}, err
 	}
 
+	if scope.Config.Spec.IsEtcdManaged() {
+		etcdProxyFile := bootstrapv1.File{
+			Path:        etcd.EtcdProxyDaemonsetYamlLocation,
+			Content:     etcd.EtcdProxyDaemonsetYaml,
+			Owner:       "root:root",
+			Permissions: "0640",
+		}
+		files = append(files, etcdProxyFile)
+	}
+
 	cpinput := &cloudinit.ControlPlaneInput{
 		BaseUserData: cloudinit.BaseUserData{
 			PreK3sCommands: scope.Config.Spec.PreK3sCommands,
diff --git a/pkg/etcd/etcd-proxy.go b/pkg/etcd/etcd-proxy.go
new file mode 100644
index 00000000..67c096b6
--- /dev/null
+++ b/pkg/etcd/etcd-proxy.go
@@ -0,0 +1,14 @@
+package etcd
+
+import (
+	_ "embed" // enables the go:embed directive below
+)
+
+// EtcdProxyDaemonsetYamlLocation is the path, under the k3s server manifests
+// directory, at which the etcd-proxy DaemonSet manifest is written.
+const EtcdProxyDaemonsetYamlLocation = "/var/lib/rancher/k3s/server/manifests/etcd-proxy.yaml"
+
+// EtcdProxyDaemonsetYaml holds the content of etcd-proxy.yaml, embedded at build time.
+//
+//go:embed etcd-proxy.yaml
+var EtcdProxyDaemonsetYaml string
diff --git a/pkg/etcd/etcd-proxy.yaml b/pkg/etcd/etcd-proxy.yaml
new file mode 100644
index 00000000..00fa4ae9
--- /dev/null
+++ b/pkg/etcd/etcd-proxy.yaml
@@ -0,0 +1,58 @@
+# etcd-proxy runs one socat container on every node labelled
+# node-role.kubernetes.io/etcd=true. Each pod listens on TCP port 2379 and
+# relays connections to port 2379 on its own node's IP (status.hostIP).
+apiVersion: apps/v1
+kind: DaemonSet
+metadata:
+  name: etcd-proxy
+  namespace: kube-system
+  labels:
+    app: etcd-proxy
+spec:
+  selector:
+    matchLabels:
+      app: etcd-proxy
+  template:
+    metadata:
+      labels:
+        app: etcd-proxy
+    spec:
+      nodeSelector:
+        node-role.kubernetes.io/etcd: "true"
+      tolerations:
+        # Tolerate the control-plane and master NoSchedule taints.
+        - key: node-role.kubernetes.io/control-plane
+          operator: Exists
+          effect: NoSchedule
+        - key: node-role.kubernetes.io/master
+          operator: Exists
+          effect: NoSchedule
+      containers:
+        - name: etcd-proxy
+          # NOTE(review): the image carries no tag or digest — confirm whether it should be pinned.
+          image: alpine/socat
+          env:
+            - name: HOSTIP
+              valueFrom:
+                fieldRef:
+                  fieldPath: status.hostIP
+          args:
+            # $(HOSTIP) is expanded by Kubernetes from the env var declared above.
+            - TCP4-LISTEN:2379,fork,reuseaddr
+            - TCP4:$(HOSTIP):2379
+          resources:
+            limits:
+              memory: 200Mi
+            requests:
+              cpu: 100m
+              memory: 200Mi
+          # NOTE(review): nothing in the socat args refers to /var/log — confirm
+          # this hostPath mount (and the matching volume below) is actually needed.
+          volumeMounts:
+            - name: varlog
+              mountPath: /var/log
+      terminationGracePeriodSeconds: 30
+      volumes:
+        - name: varlog
+          hostPath:
+            path: /var/log

From 599c2f1607021cecfa6ac40c4cd241212ffccfea Mon Sep 17 00:00:00 2001
From: Mog Liang
Date: Tue, 5 Dec 2023 02:06:16 +0000
Subject: [PATCH 06/15] enable etcd operation when scale down

---
 controlplane/controllers/scale.go |  17 +--
 pkg/k3s/control_plane.go          |   2 +-
 pkg/k3s/workload_cluster.go       |  18 ++-
 pkg/k3s/workload_cluster_etcd.go  | 224 ++++++++++++++++++++++++++++++
 4 files changed, 242 insertions(+), 19 deletions(-)
 create mode 100644 pkg/k3s/workload_cluster_etcd.go

diff --git a/controlplane/controllers/scale.go b/controlplane/controllers/scale.go
index a1666bbd..10798ad0 100644
--- a/controlplane/controllers/scale.go
+++ b/controlplane/controllers/scale.go
@@ -114,12 +114,6 @@ func (r *KThreesControlPlaneReconciler) scaleDownControlPlane(
 		return result, err
 	}
 
-	// workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster))
-	// if err != nil {
-	// 	logger.Error(err, "Failed to create client to workload cluster")
-	// 	return ctrl.Result{}, fmt.Errorf(err, "failed to create client to workload cluster")
-	// }
-
 	if machineToDelete == nil {
 		logger.Info("Failed to pick control plane Machine to delete")
 		return ctrl.Result{}, fmt.Errorf("failed to pick control plane Machine to delete: %w", err)
@@ -127,19 +121,26 @@ func (r *KThreesControlPlaneReconciler) scaleDownControlPlane(
 
 	// TODO figure out etcd complexities
 	// If KCP should manage etcd, If etcd leadership is on machine that is about to be
deleted, move it to the newest member available. - /** if controlPlane.IsEtcdManaged() { + logger.Info("will call etcd") + workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster)) + if err != nil { + logger.Error(err, "Failed to create client to workload cluster") + return ctrl.Result{}, fmt.Errorf("failed to create client to workload cluster: %w", err) + } + etcdLeaderCandidate := controlPlane.Machines.Newest() if err := workloadCluster.ForwardEtcdLeadership(ctx, machineToDelete, etcdLeaderCandidate); err != nil { logger.Error(err, "Failed to move leadership to candidate machine", "candidate", etcdLeaderCandidate.Name) return ctrl.Result{}, err } + logger.Info("etcd move etcd leader succeed") if err := workloadCluster.RemoveEtcdMemberForMachine(ctx, machineToDelete); err != nil { logger.Error(err, "Failed to remove etcd member for machine") return ctrl.Result{}, err } + logger.Info("etcd remove etcd member succeed") } - **/ logger = logger.WithValues("machine", machineToDelete) if err := r.Client.Delete(ctx, machineToDelete); err != nil && !apierrors.IsNotFound(err) { diff --git a/pkg/k3s/control_plane.go b/pkg/k3s/control_plane.go index 162a0b6e..b3bf7bfe 100644 --- a/pkg/k3s/control_plane.go +++ b/pkg/k3s/control_plane.go @@ -314,7 +314,7 @@ func getKThreesConfigs(ctx context.Context, cl client.Client, machines Filterabl // IsEtcdManaged returns true if the control plane relies on a managed etcd. func (c *ControlPlane) IsEtcdManaged() bool { - return false + return true } // UnhealthyMachines returns the list of control plane machines marked as unhealthy by MHC. 
diff --git a/pkg/k3s/workload_cluster.go b/pkg/k3s/workload_cluster.go index 1ffeb3c8..f6c0aa70 100644 --- a/pkg/k3s/workload_cluster.go +++ b/pkg/k3s/workload_cluster.go @@ -35,24 +35,22 @@ type WorkloadCluster interface { ClusterStatus(ctx context.Context) (ClusterStatus, error) UpdateAgentConditions(ctx context.Context, controlPlane *ControlPlane) UpdateEtcdConditions(ctx context.Context, controlPlane *ControlPlane) - // Upgrade related tasks. - // RemoveEtcdMemberForMachine(ctx context.Context, machine *clusterv1.Machine) error + // Etcd tasks + RemoveEtcdMemberForMachine(ctx context.Context, machine *clusterv1.Machine) error + ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine, leaderCandidate *clusterv1.Machine) error + ReconcileEtcdMembers(ctx context.Context, nodeNames []string) ([]string, error) - // ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine, leaderCandidate *clusterv1.Machine) error - // AllowBootstrapTokensToGetNodes(ctx context.Context) error - - // State recovery tasks. - // ReconcileEtcdMembers(ctx context.Context, nodeNames []string) ([]string, error) + // AllowBootstrapTokensToGetNodes(ctx context.Context) error } // Workload defines operations on workload clusters. type Workload struct { WorkloadCluster - Client ctrlclient.Client - CoreDNSMigrator coreDNSMigrator - // etcdClientGenerator etcdClientFor + Client ctrlclient.Client + CoreDNSMigrator coreDNSMigrator + etcdClientGenerator etcdClientFor } // ClusterStatus holds stats information about the cluster. diff --git a/pkg/k3s/workload_cluster_etcd.go b/pkg/k3s/workload_cluster_etcd.go new file mode 100644 index 00000000..8e92e95c --- /dev/null +++ b/pkg/k3s/workload_cluster_etcd.go @@ -0,0 +1,224 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package k3s + +import ( + "context" + + "github.com/pkg/errors" + kerrors "k8s.io/apimachinery/pkg/util/errors" + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + + "github.com/cluster-api-provider-k3s/cluster-api-k3s/pkg/etcd" + etcdutil "github.com/cluster-api-provider-k3s/cluster-api-k3s/pkg/etcd/util" +) + +type etcdClientFor interface { + forFirstAvailableNode(ctx context.Context, nodeNames []string) (*etcd.Client, error) + forLeader(ctx context.Context, nodeNames []string) (*etcd.Client, error) +} + +// ReconcileEtcdMembers iterates over all etcd members and finds members that do not have corresponding nodes. +// If there are any such members, it deletes them from etcd and removes their nodes from the kubeadm configmap so that kubeadm does not run etcd health checks on them. +func (w *Workload) ReconcileEtcdMembers(ctx context.Context, nodeNames []string) ([]string, error) { + allRemovedMembers := []string{} + allErrs := []error{} + for _, nodeName := range nodeNames { + removedMembers, errs := w.reconcileEtcdMember(ctx, nodeNames, nodeName) + allRemovedMembers = append(allRemovedMembers, removedMembers...) + allErrs = append(allErrs, errs...) 
+ } + + return allRemovedMembers, kerrors.NewAggregate(allErrs) +} + +func (w *Workload) reconcileEtcdMember(ctx context.Context, nodeNames []string, nodeName string) ([]string, []error) { + // Create the etcd Client for the etcd Pod scheduled on the Node + etcdClient, err := w.etcdClientGenerator.forFirstAvailableNode(ctx, []string{nodeName}) + if err != nil { + return nil, nil + } + defer etcdClient.Close() + + members, err := etcdClient.Members(ctx) + if err != nil { + return nil, nil + } + + // Check if any member's node is missing from workload cluster + // If any, delete it with best effort + removedMembers := []string{} + errs := []error{} +loopmembers: + for _, member := range members { + // If this member is just added, it has a empty name until the etcd pod starts. Ignore it. + if member.Name == "" { + continue + } + + for _, nodeName := range nodeNames { + if member.Name == nodeName { + // We found the matching node, continue with the outer loop. + continue loopmembers + } + } + + // If we're here, the node cannot be found. + removedMembers = append(removedMembers, member.Name) + if err := w.removeMemberForNode(ctx, member.Name); err != nil { + errs = append(errs, err) + } + } + return removedMembers, errs +} + +// RemoveEtcdMemberForMachine removes the etcd member from the target cluster's etcd cluster. +// Removing the last remaining member of the cluster is not supported. 
+func (w *Workload) RemoveEtcdMemberForMachine(ctx context.Context, machine *clusterv1.Machine) error { + if machine == nil || machine.Status.NodeRef == nil { + // Nothing to do, no node for Machine + return nil + } + return w.removeMemberForNode(ctx, machine.Status.NodeRef.Name) +} + +func (w *Workload) removeMemberForNode(ctx context.Context, name string) error { + controlPlaneNodes, err := w.getControlPlaneNodes(ctx) + if err != nil { + return err + } + if len(controlPlaneNodes.Items) < 2 { + return ErrControlPlaneMinNodes + } + + // Exclude node being removed from etcd client node list + var remainingNodes []string + for _, n := range controlPlaneNodes.Items { + if n.Name != name { + remainingNodes = append(remainingNodes, n.Name) + } + } + etcdClient, err := w.etcdClientGenerator.forFirstAvailableNode(ctx, remainingNodes) + if err != nil { + return errors.Wrap(err, "failed to create etcd client") + } + defer etcdClient.Close() + + // List etcd members. This checks that the member is healthy, because the request goes through consensus. + members, err := etcdClient.Members(ctx) + if err != nil { + return errors.Wrap(err, "failed to list etcd members using etcd client") + } + member := etcdutil.MemberForName(members, name) + + // The member has already been removed, return immediately + if member == nil { + return nil + } + + if err := etcdClient.RemoveMember(ctx, member.ID); err != nil { + return errors.Wrap(err, "failed to remove member from etcd") + } + + return nil +} + +// ForwardEtcdLeadership forwards etcd leadership to the first follower. 
+func (w *Workload) ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine, leaderCandidate *clusterv1.Machine) error { + if machine == nil || machine.Status.NodeRef == nil { + return nil + } + if leaderCandidate == nil { + return errors.New("leader candidate cannot be nil") + } + if leaderCandidate.Status.NodeRef == nil { + return errors.New("leader has no node reference") + } + + nodes, err := w.getControlPlaneNodes(ctx) + if err != nil { + return errors.Wrap(err, "failed to list control plane nodes") + } + nodeNames := make([]string, 0, len(nodes.Items)) + for _, node := range nodes.Items { + nodeNames = append(nodeNames, node.Name) + } + etcdClient, err := w.etcdClientGenerator.forLeader(ctx, nodeNames) + if err != nil { + return errors.Wrap(err, "failed to create etcd client") + } + defer etcdClient.Close() + + members, err := etcdClient.Members(ctx) + if err != nil { + return errors.Wrap(err, "failed to list etcd members using etcd client") + } + + currentMember := etcdutil.MemberForName(members, machine.Status.NodeRef.Name) + if currentMember == nil || currentMember.ID != etcdClient.LeaderID { + // nothing to do, this is not the etcd leader + return nil + } + + // Move the leader to the provided candidate. + nextLeader := etcdutil.MemberForName(members, leaderCandidate.Status.NodeRef.Name) + if nextLeader == nil { + return errors.Errorf("failed to get etcd member from node %q", leaderCandidate.Status.NodeRef.Name) + } + if err := etcdClient.MoveLeader(ctx, nextLeader.ID); err != nil { + return errors.Wrapf(err, "failed to move leader") + } + return nil +} + +// EtcdMemberStatus contains status information for a single etcd member. +type EtcdMemberStatus struct { + Name string + Responsive bool +} + +// EtcdMembers returns the current set of members in an etcd cluster. +// +// NOTE: This methods uses control plane machines/nodes only to get in contact with etcd, +// but then it relies on etcd as ultimate source of truth for the list of members. 
+// This is intended to allow informed decisions on actions impacting etcd quorum. +func (w *Workload) EtcdMembers(ctx context.Context) ([]string, error) { + nodes, err := w.getControlPlaneNodes(ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to list control plane nodes") + } + nodeNames := make([]string, 0, len(nodes.Items)) + for _, node := range nodes.Items { + nodeNames = append(nodeNames, node.Name) + } + etcdClient, err := w.etcdClientGenerator.forLeader(ctx, nodeNames) + if err != nil { + return nil, errors.Wrap(err, "failed to create etcd client") + } + defer etcdClient.Close() + + members, err := etcdClient.Members(ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to list etcd members using etcd client") + } + + names := []string{} + for _, member := range members { + names = append(names, member.Name) + } + return names, nil +} From 1b3bc3baf1b1677790b55c0b6a65e025497d2e89 Mon Sep 17 00:00:00 2001 From: Mog Liang Date: Thu, 7 Dec 2023 01:54:43 +0000 Subject: [PATCH 07/15] create NewEtcdClientGenerator in workload --- pkg/k3s/management_cluster.go | 64 +++++++++++++++++++++++++++++++++-- pkg/k3s/workload_cluster.go | 61 ++++++++++++++++++++++++++++++++- 2 files changed, 121 insertions(+), 4 deletions(-) diff --git a/pkg/k3s/management_cluster.go b/pkg/k3s/management_cluster.go index ea37a963..c936fa90 100644 --- a/pkg/k3s/management_cluster.go +++ b/pkg/k3s/management_cluster.go @@ -2,15 +2,21 @@ package k3s import ( "context" + "crypto/tls" + "crypto/x509" "fmt" "time" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/kubernetes/scheme" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" "sigs.k8s.io/cluster-api/controllers/remote" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/cluster-api-provider-k3s/cluster-api-k3s/pkg/machinefilters" + "github.com/cluster-api-provider-k3s/cluster-api-k3s/pkg/secret" ) // ManagementCluster defines all behaviors 
necessary for something to function as a management cluster. @@ -25,7 +31,9 @@ type ManagementCluster interface { type Management struct { ManagementCluster - Client client.Reader + Client client.Reader + EtcdDialTimeout time.Duration + EtcdCallTimeout time.Duration } // RemoteClusterConnectionError represents a failure to connect to a remote cluster. @@ -81,8 +89,58 @@ func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.O return nil, &RemoteClusterConnectionError{Name: clusterKey.String(), Err: err} } - return &Workload{ + workload := &Workload{ Client: c, CoreDNSMigrator: &CoreDNSMigrator{}, - }, nil + } + + // Retrieves the etcd CA key Pair + crtData, keyData, err := m.getEtcdCAKeyPair(ctx, clusterKey) + if err != nil { + return nil, err + } + + // If etcd CA is not nil, then it's managed etcd + if crtData != nil { + clientCert, err := generateClientCert(crtData, keyData) + if err != nil { + return nil, err + } + + caPool := x509.NewCertPool() + caPool.AppendCertsFromPEM(crtData) + tlsConfig := &tls.Config{ + RootCAs: caPool, + Certificates: []tls.Certificate{clientCert}, + MinVersion: tls.VersionTLS12, + } + tlsConfig.InsecureSkipVerify = true + workload.etcdClientGenerator = NewEtcdClientGenerator(restConfig, tlsConfig, m.EtcdDialTimeout, m.EtcdCallTimeout) + } + + return workload, nil +} + +func (m *Management) getEtcdCAKeyPair(ctx context.Context, clusterKey client.ObjectKey) ([]byte, []byte, error) { + etcdCASecret := &corev1.Secret{} + etcdCAObjectKey := client.ObjectKey{ + Namespace: clusterKey.Namespace, + Name: fmt.Sprintf("%s-etcd", clusterKey.Name), + } + + // Try to get the certificate via the uncached client. 
+ if err := m.Client.Get(ctx, etcdCAObjectKey, etcdCASecret); err != nil { + if apierrors.IsNotFound(err) { + return nil, nil, nil + } else { + return nil, nil, errors.Wrapf(err, "failed to get secret; etcd CA bundle %s/%s", etcdCAObjectKey.Namespace, etcdCAObjectKey.Name) + } + } + + crtData, ok := etcdCASecret.Data[secret.TLSCrtDataName] + if !ok { + return nil, nil, errors.Errorf("etcd tls crt does not exist for cluster %s/%s", clusterKey.Namespace, clusterKey.Name) + } + keyData := etcdCASecret.Data[secret.TLSKeyDataName] + return crtData, keyData, nil } diff --git a/pkg/k3s/workload_cluster.go b/pkg/k3s/workload_cluster.go index f6c0aa70..6f7fdd3d 100644 --- a/pkg/k3s/workload_cluster.go +++ b/pkg/k3s/workload_cluster.go @@ -2,16 +2,25 @@ package k3s import ( "context" - "errors" + "crypto" + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" "fmt" + "math/big" "strings" + "time" + "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" "sigs.k8s.io/cluster-api/util" + "sigs.k8s.io/cluster-api/util/certs" "sigs.k8s.io/cluster-api/util/conditions" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -364,3 +373,53 @@ func (w *Workload) updateManagedEtcdConditions(ctx context.Context, controlPlane conditions.MarkTrue(machine, controlplanev1.MachineEtcdMemberHealthyCondition) } } + +func generateClientCert(caCertEncoded, caKeyEncoded []byte) (tls.Certificate, error) { + // TODO: need to cache clientkey to clusterCacheTracker to avoid recreating key frequently + clientKey, err := certs.NewPrivateKey() + if err != nil { + return tls.Certificate{}, errors.Wrapf(err, "error creating client key") + } + + caCert, err := certs.DecodeCertPEM(caCertEncoded) + if err != nil { + return tls.Certificate{}, err + } + caKey, err := certs.DecodePrivateKeyPEM(caKeyEncoded) + 
if err != nil { + return tls.Certificate{}, err + } + x509Cert, err := newClientCert(caCert, clientKey, caKey) + if err != nil { + return tls.Certificate{}, err + } + return tls.X509KeyPair(certs.EncodeCertPEM(x509Cert), certs.EncodePrivateKeyPEM(clientKey)) +} + +func newClientCert(caCert *x509.Certificate, key *rsa.PrivateKey, caKey crypto.Signer) (*x509.Certificate, error) { + cfg := certs.Config{ + CommonName: "cluster-api.x-k8s.io", + } + + now := time.Now().UTC() + + tmpl := x509.Certificate{ + SerialNumber: new(big.Int).SetInt64(0), + Subject: pkix.Name{ + CommonName: cfg.CommonName, + Organization: cfg.Organization, + }, + NotBefore: now.Add(time.Minute * -5), + NotAfter: now.Add(time.Hour * 24 * 365 * 10), // 10 years + KeyUsage: x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}, + } + + b, err := x509.CreateCertificate(rand.Reader, &tmpl, caCert, key.Public(), caKey) + if err != nil { + return nil, errors.Wrapf(err, "failed to create signed client certificate: %+v", tmpl) + } + + c, err := x509.ParseCertificate(b) + return c, errors.WithStack(err) +} From 2e6e12b8e091bc95ea90648cc889b96307ee42bc Mon Sep 17 00:00:00 2001 From: Mog Liang Date: Thu, 7 Dec 2023 02:25:50 +0000 Subject: [PATCH 08/15] fix etcd member naming issue --- pkg/etcd/util/util.go | 18 ++++++++++-- pkg/k3s/etcd_client_generator.go | 8 ++++-- pkg/k3s/workload_cluster_etcd.go | 47 ++++---------------------------- 3 files changed, 26 insertions(+), 47 deletions(-) diff --git a/pkg/etcd/util/util.go b/pkg/etcd/util/util.go index 3ac2f80e..e25cdd04 100644 --- a/pkg/etcd/util/util.go +++ b/pkg/etcd/util/util.go @@ -18,15 +18,29 @@ limitations under the License. package util import ( + "strings" + "k8s.io/apimachinery/pkg/util/sets" "github.com/cluster-api-provider-k3s/cluster-api-k3s/pkg/etcd" ) +// TODO: find document confirmation of mapping between etcd member and node in k3s. 
+func NodeNameFromMember(member *etcd.Member) string { + memberName := member.Name + lastIndex := strings.LastIndex(memberName, "-") + + if lastIndex != -1 { + memberName = memberName[:lastIndex] + } + + return memberName +} + // MemberForName returns the etcd member with the matching name. func MemberForName(members []*etcd.Member, name string) *etcd.Member { for _, m := range members { - if m.Name == name { + if NodeNameFromMember(m) == name { return m } } @@ -37,7 +51,7 @@ func MemberForName(members []*etcd.Member, name string) *etcd.Member { func MemberNames(members []*etcd.Member) []string { names := make([]string, 0, len(members)) for _, m := range members { - names = append(names, m.Name) + names = append(names, NodeNameFromMember(m)) } return names } diff --git a/pkg/k3s/etcd_client_generator.go b/pkg/k3s/etcd_client_generator.go index 17e12f65..0f10fff9 100644 --- a/pkg/k3s/etcd_client_generator.go +++ b/pkg/k3s/etcd_client_generator.go @@ -30,6 +30,7 @@ import ( "k8s.io/client-go/rest" "github.com/cluster-api-provider-k3s/cluster-api-k3s/pkg/etcd" + "github.com/cluster-api-provider-k3s/cluster-api-k3s/pkg/etcd/util" "github.com/cluster-api-provider-k3s/cluster-api-k3s/pkg/proxy" ) @@ -198,10 +199,11 @@ func (c *EtcdClientGenerator) getLeaderClient(ctx context.Context, nodeName stri // If we found the leader, and it is one of the nodes, // get a connection to the etcd leader via the node hosting it. 
if leaderMember != nil { - if !allNodes.Has(leaderMember.Name) { - return nil, errors.Errorf("etcd leader is reported as %x with name %q, but we couldn't find a corresponding Node in the cluster", leaderMember.ID, leaderMember.Name) + nodeName := util.NodeNameFromMember(leaderMember) + if !allNodes.Has(nodeName) { + return nil, errors.Errorf("etcd leader is reported as %x with node name %q, but we couldn't find a corresponding Node in the cluster", leaderMember.ID, nodeName) } - client, err = c.forFirstAvailableNode(ctx, []string{leaderMember.Name}) + client, err = c.forFirstAvailableNode(ctx, []string{nodeName}) return client, err } diff --git a/pkg/k3s/workload_cluster_etcd.go b/pkg/k3s/workload_cluster_etcd.go index 8e92e95c..1f9990eb 100644 --- a/pkg/k3s/workload_cluster_etcd.go +++ b/pkg/k3s/workload_cluster_etcd.go @@ -65,21 +65,22 @@ func (w *Workload) reconcileEtcdMember(ctx context.Context, nodeNames []string, errs := []error{} loopmembers: for _, member := range members { + curNodeName := etcdutil.NodeNameFromMember(member) // If this member is just added, it has a empty name until the etcd pod starts. Ignore it. - if member.Name == "" { + if curNodeName == "" { continue } for _, nodeName := range nodeNames { - if member.Name == nodeName { + if curNodeName == nodeName { // We found the matching node, continue with the outer loop. continue loopmembers } } // If we're here, the node cannot be found. - removedMembers = append(removedMembers, member.Name) - if err := w.removeMemberForNode(ctx, member.Name); err != nil { + removedMembers = append(removedMembers, curNodeName) + if err := w.removeMemberForNode(ctx, curNodeName); err != nil { errs = append(errs, err) } } @@ -184,41 +185,3 @@ func (w *Workload) ForwardEtcdLeadership(ctx context.Context, machine *clusterv1 } return nil } - -// EtcdMemberStatus contains status information for a single etcd member. 
-type EtcdMemberStatus struct { - Name string - Responsive bool -} - -// EtcdMembers returns the current set of members in an etcd cluster. -// -// NOTE: This methods uses control plane machines/nodes only to get in contact with etcd, -// but then it relies on etcd as ultimate source of truth for the list of members. -// This is intended to allow informed decisions on actions impacting etcd quorum. -func (w *Workload) EtcdMembers(ctx context.Context) ([]string, error) { - nodes, err := w.getControlPlaneNodes(ctx) - if err != nil { - return nil, errors.Wrap(err, "failed to list control plane nodes") - } - nodeNames := make([]string, 0, len(nodes.Items)) - for _, node := range nodes.Items { - nodeNames = append(nodeNames, node.Name) - } - etcdClient, err := w.etcdClientGenerator.forLeader(ctx, nodeNames) - if err != nil { - return nil, errors.Wrap(err, "failed to create etcd client") - } - defer etcdClient.Close() - - members, err := etcdClient.Members(ctx) - if err != nil { - return nil, errors.Wrap(err, "failed to list etcd members using etcd client") - } - - names := []string{} - for _, member := range members { - names = append(names, member.Name) - } - return names, nil -} From 5af3411fd04a3bcca6cd63f26f147a465d6b1f7e Mon Sep 17 00:00:00 2001 From: Mog Liang Date: Thu, 7 Dec 2023 02:38:08 +0000 Subject: [PATCH 09/15] better logging --- controlplane/controllers/scale.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/controlplane/controllers/scale.go b/controlplane/controllers/scale.go index 10798ad0..9828bf05 100644 --- a/controlplane/controllers/scale.go +++ b/controlplane/controllers/scale.go @@ -122,7 +122,6 @@ func (r *KThreesControlPlaneReconciler) scaleDownControlPlane( // TODO figure out etcd complexities // If KCP should manage etcd, If etcd leadership is on machine that is about to be deleted, move it to the newest member available. 
if controlPlane.IsEtcdManaged() { - logger.Info("will call etcd") workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster)) if err != nil { logger.Error(err, "Failed to create client to workload cluster") @@ -134,12 +133,12 @@ func (r *KThreesControlPlaneReconciler) scaleDownControlPlane( logger.Error(err, "Failed to move leadership to candidate machine", "candidate", etcdLeaderCandidate.Name) return ctrl.Result{}, err } - logger.Info("etcd move etcd leader succeed") + logger.Info("etcd move etcd leader succeed, node to delete %s", machineToDelete.Status.NodeRef.Name) if err := workloadCluster.RemoveEtcdMemberForMachine(ctx, machineToDelete); err != nil { logger.Error(err, "Failed to remove etcd member for machine") return ctrl.Result{}, err } - logger.Info("etcd remove etcd member succeed") + logger.Info("etcd remove etcd member succeed, node to delete %s", machineToDelete.Status.NodeRef.Name) } logger = logger.WithValues("machine", machineToDelete) From 61ad1b3b4d658267b91fcd16f86bf76bb013a03c Mon Sep 17 00:00:00 2001 From: Mog Liang Date: Thu, 7 Dec 2023 05:57:39 +0000 Subject: [PATCH 10/15] add cp EtcdDialTimeout EtcdCallTimeout options --- .../kthreescontrolplane_controller.go | 15 +++++++++++++-- controlplane/main.go | 18 +++++++++++++++--- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/controlplane/controllers/kthreescontrolplane_controller.go b/controlplane/controllers/kthreescontrolplane_controller.go index ac13fe93..8760a045 100644 --- a/controlplane/controllers/kthreescontrolplane_controller.go +++ b/controlplane/controllers/kthreescontrolplane_controller.go @@ -61,6 +61,9 @@ type KThreesControlPlaneReconciler struct { controller controller.Controller recorder record.EventRecorder + EtcdDialTimeout time.Duration + EtcdCallTimeout time.Duration + managementCluster k3s.ManagementCluster managementClusterUncached k3s.ManagementCluster } @@ -290,11 +293,19 @@ func (r *KThreesControlPlaneReconciler) 
SetupWithManager(mgr ctrl.Manager) error r.recorder = mgr.GetEventRecorderFor("k3s-control-plane-controller") if r.managementCluster == nil { - r.managementCluster = &k3s.Management{Client: r.Client} + r.managementCluster = &k3s.Management{ + Client: r.Client, + EtcdDialTimeout: r.EtcdDialTimeout, + EtcdCallTimeout: r.EtcdCallTimeout, + } } if r.managementClusterUncached == nil { - r.managementClusterUncached = &k3s.Management{Client: mgr.GetAPIReader()} + r.managementClusterUncached = &k3s.Management{ + Client: mgr.GetAPIReader(), + EtcdDialTimeout: r.EtcdDialTimeout, + EtcdCallTimeout: r.EtcdCallTimeout, + } } return nil diff --git a/controlplane/main.go b/controlplane/main.go index a793d7a0..5efdc621 100644 --- a/controlplane/main.go +++ b/controlplane/main.go @@ -32,6 +32,7 @@ import ( bootstrapv1beta1 "github.com/cluster-api-provider-k3s/cluster-api-k3s/bootstrap/api/v1beta1" controlplanev1beta1 "github.com/cluster-api-provider-k3s/cluster-api-k3s/controlplane/api/v1beta1" "github.com/cluster-api-provider-k3s/cluster-api-k3s/controlplane/controllers" + "github.com/cluster-api-provider-k3s/cluster-api-k3s/pkg/etcd" ) var ( @@ -53,6 +54,8 @@ func main() { var metricsAddr string var enableLeaderElection bool var syncPeriod time.Duration + var etcdDialTimeout time.Duration + var etcdCallTimeout time.Duration flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.") flag.BoolVar(&enableLeaderElection, "enable-leader-election", false, @@ -61,6 +64,13 @@ func main() { flag.DurationVar(&syncPeriod, "sync-period", 10*time.Minute, "The minimum interval at which watched resources are reconciled (e.g. 
15m)") + + flag.DurationVar(&etcdDialTimeout, "etcd-dial-timeout-duration", 10*time.Second, + "Duration that the etcd client waits at most to establish a connection with etcd") + + flag.DurationVar(&etcdCallTimeout, "etcd-call-timeout-duration", etcd.DefaultCallTimeout, + "Duration that the etcd client waits at most for read and write operations to etcd.") + flag.Parse() ctrl.SetLogger(zap.New(zap.UseDevMode(true))) @@ -79,9 +89,11 @@ func main() { } if err = (&controllers.KThreesControlPlaneReconciler{ - Client: mgr.GetClient(), - Log: ctrl.Log.WithName("controllers").WithName("KThreesControlPlane"), - Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Log: ctrl.Log.WithName("controllers").WithName("KThreesControlPlane"), + Scheme: mgr.GetScheme(), + EtcdDialTimeout: etcdDialTimeout, + EtcdCallTimeout: etcdCallTimeout, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "KThreesControlPlane") os.Exit(1) From 1064b1981b0c5c9d93b9d98f723c3e820cfe58bb Mon Sep 17 00:00:00 2001 From: Mog Liang Date: Thu, 7 Dec 2023 06:12:06 +0000 Subject: [PATCH 11/15] fix --- pkg/k3s/control_plane.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/k3s/control_plane.go b/pkg/k3s/control_plane.go index b3bf7bfe..a1b03a00 100644 --- a/pkg/k3s/control_plane.go +++ b/pkg/k3s/control_plane.go @@ -314,7 +314,7 @@ func getKThreesConfigs(ctx context.Context, cl client.Client, machines Filterabl // IsEtcdManaged returns true if the control plane relies on a managed etcd. func (c *ControlPlane) IsEtcdManaged() bool { - return true + return c.KCP.Spec.KThreesConfigSpec.IsEtcdManaged() } // UnhealthyMachines returns the list of control plane machines marked as unhealthy by MHC. 
From 2891694dc1739d377cbb924165d92c3fc393bc98 Mon Sep 17 00:00:00 2001 From: Mog Liang Date: Mon, 11 Dec 2023 06:50:14 +0000 Subject: [PATCH 12/15] check if etcdca exists in cp reconcile loop --- bootstrap/api/v1beta1/kthreesconfig_types.go | 2 +- .../controllers/kthreesconfig_controller.go | 12 +++++++++- pkg/k3s/control_plane.go | 22 ++++++++++++++++++- pkg/secret/certificates.go | 2 +- 4 files changed, 34 insertions(+), 4 deletions(-) diff --git a/bootstrap/api/v1beta1/kthreesconfig_types.go b/bootstrap/api/v1beta1/kthreesconfig_types.go index 022d9016..77b4458f 100644 --- a/bootstrap/api/v1beta1/kthreesconfig_types.go +++ b/bootstrap/api/v1beta1/kthreesconfig_types.go @@ -53,7 +53,7 @@ type KThreesConfigSpec struct { // TODO // Will need extend this func when implementing other k3s database options. -func (c *KThreesConfigSpec) IsEtcdManaged() bool { +func (c *KThreesConfigSpec) IsEtcdEmbedded() bool { return true } diff --git a/bootstrap/controllers/kthreesconfig_controller.go b/bootstrap/controllers/kthreesconfig_controller.go index dcc2608f..de2756c3 100644 --- a/bootstrap/controllers/kthreesconfig_controller.go +++ b/bootstrap/controllers/kthreesconfig_controller.go @@ -252,6 +252,16 @@ func (r *KThreesConfigReconciler) joinControlplane(ctx context.Context, scope *S return err } + if scope.Config.Spec.IsEtcdEmbedded() { + etcdProxyFile := bootstrapv1.File{ + Path: etcd.EtcdProxyDaemonsetYamlLocation, + Content: etcd.EtcdProxyDaemonsetYaml, + Owner: "root:root", + Permissions: "0640", + } + files = append(files, etcdProxyFile) + } + cpInput := &cloudinit.ControlPlaneInput{ BaseUserData: cloudinit.BaseUserData{ PreK3sCommands: scope.Config.Spec.PreK3sCommands, @@ -456,7 +466,7 @@ func (r *KThreesConfigReconciler) handleClusterNotInitialized(ctx context.Contex return ctrl.Result{}, err } - if scope.Config.Spec.IsEtcdManaged() { + if scope.Config.Spec.IsEtcdEmbedded() { etcdProxyFile := bootstrapv1.File{ Path: etcd.EtcdProxyDaemonsetYamlLocation, Content: 
etcd.EtcdProxyDaemonsetYaml, diff --git a/pkg/k3s/control_plane.go b/pkg/k3s/control_plane.go index a1b03a00..8606cc9a 100644 --- a/pkg/k3s/control_plane.go +++ b/pkg/k3s/control_plane.go @@ -26,6 +26,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apiserver/pkg/storage/names" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" @@ -52,6 +53,11 @@ type ControlPlane struct { Machines FilterableMachineCollection machinesPatchHelpers map[string]*patch.Helper + // check if mgmt cluster has target cluster's etcd ca. + // for old cluster created before connect-etcd feature, mgmt cluster don't + // store etcd ca, controlplane reconcile loop need bypass any etcd operations. + hasEtcdCA bool + // reconciliationTime is the time of the current reconciliation, and should be used for all "now" calculations reconciliationTime metav1.Time @@ -80,11 +86,25 @@ func NewControlPlane(ctx context.Context, client client.Client, cluster *cluster patchHelpers[machine.Name] = patchHelper } + hasEtcdCA := false + etcdCASecret := &corev1.Secret{} + etcdCAObjectKey := types.NamespacedName{ + Namespace: cluster.Namespace, + Name: fmt.Sprintf("%s-etcd", cluster.Name), + } + + if err := client.Get(ctx, etcdCAObjectKey, etcdCASecret); err == nil { + hasEtcdCA = true + } else if !apierrors.IsNotFound(err) { + return nil, err + } + return &ControlPlane{ KCP: kcp, Cluster: cluster, Machines: ownedMachines, machinesPatchHelpers: patchHelpers, + hasEtcdCA: hasEtcdCA, kthreesConfigs: kthreesConfigs, infraResources: infraObjects, reconciliationTime: metav1.Now(), @@ -314,7 +334,7 @@ func getKThreesConfigs(ctx context.Context, cl client.Client, machines Filterabl // IsEtcdManaged returns true if the control plane relies on a managed etcd. 
func (c *ControlPlane) IsEtcdManaged() bool { - return c.KCP.Spec.KThreesConfigSpec.IsEtcdManaged() + return c.KCP.Spec.KThreesConfigSpec.IsEtcdEmbedded() && c.hasEtcdCA } // UnhealthyMachines returns the list of control plane machines marked as unhealthy by MHC. diff --git a/pkg/secret/certificates.go b/pkg/secret/certificates.go index 7deebf39..2bc073b0 100644 --- a/pkg/secret/certificates.go +++ b/pkg/secret/certificates.go @@ -81,7 +81,7 @@ func NewCertificatesForInitialControlPlane(config *bootstrapv1.KThreesConfigSpec }, } - if config.IsEtcdManaged() { + if config.IsEtcdEmbedded() { etcdCert := &Certificate{ Purpose: EtcdCA, CertFile: filepath.Join(certificatesDir, "etcd", "server-ca.crt"), From c0e06b354e0106642901b5b5da540af7d10561ca Mon Sep 17 00:00:00 2001 From: Mog Liang Date: Mon, 11 Dec 2023 07:57:15 +0000 Subject: [PATCH 13/15] rename func IsEtcdEmbedded --- bootstrap/api/v1beta1/kthreesconfig_types.go | 2 +- pkg/secret/certificates.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bootstrap/api/v1beta1/kthreesconfig_types.go b/bootstrap/api/v1beta1/kthreesconfig_types.go index 022d9016..77b4458f 100644 --- a/bootstrap/api/v1beta1/kthreesconfig_types.go +++ b/bootstrap/api/v1beta1/kthreesconfig_types.go @@ -53,7 +53,7 @@ type KThreesConfigSpec struct { // TODO // Will need extend this func when implementing other k3s database options. 
-func (c *KThreesConfigSpec) IsEtcdManaged() bool { +func (c *KThreesConfigSpec) IsEtcdEmbedded() bool { return true } diff --git a/pkg/secret/certificates.go b/pkg/secret/certificates.go index 7deebf39..2bc073b0 100644 --- a/pkg/secret/certificates.go +++ b/pkg/secret/certificates.go @@ -81,7 +81,7 @@ func NewCertificatesForInitialControlPlane(config *bootstrapv1.KThreesConfigSpec }, } - if config.IsEtcdManaged() { + if config.IsEtcdEmbedded() { etcdCert := &Certificate{ Purpose: EtcdCA, CertFile: filepath.Join(certificatesDir, "etcd", "server-ca.crt"), From c7ba379dcf47b89ac4c6702f758342fd9163edca Mon Sep 17 00:00:00 2001 From: Mog Liang Date: Wed, 13 Dec 2023 01:57:57 +0000 Subject: [PATCH 14/15] fix comment --- controlplane/controllers/scale.go | 1 - 1 file changed, 1 deletion(-) diff --git a/controlplane/controllers/scale.go b/controlplane/controllers/scale.go index 9828bf05..f71413b2 100644 --- a/controlplane/controllers/scale.go +++ b/controlplane/controllers/scale.go @@ -119,7 +119,6 @@ func (r *KThreesControlPlaneReconciler) scaleDownControlPlane( return ctrl.Result{}, fmt.Errorf("failed to pick control plane Machine to delete: %w", err) } - // TODO figure out etcd complexities // If KCP should manage etcd, If etcd leadership is on machine that is about to be deleted, move it to the newest member available. 
if controlPlane.IsEtcdManaged() { workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster)) From 687fe965d0aedeff3d3656c25c39e2f15d257bbd Mon Sep 17 00:00:00 2001 From: mogliang Date: Mon, 29 Jan 2024 09:56:14 +0800 Subject: [PATCH 15/15] fix --- controlplane/controllers/scale.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/controlplane/controllers/scale.go b/controlplane/controllers/scale.go index f71413b2..a2bc2fef 100644 --- a/controlplane/controllers/scale.go +++ b/controlplane/controllers/scale.go @@ -132,12 +132,12 @@ func (r *KThreesControlPlaneReconciler) scaleDownControlPlane( logger.Error(err, "Failed to move leadership to candidate machine", "candidate", etcdLeaderCandidate.Name) return ctrl.Result{}, err } - logger.Info("etcd move etcd leader succeed, node to delete %s", machineToDelete.Status.NodeRef.Name) + logger.Info("etcd move etcd leader succeeded, node to delete %s", machineToDelete.Status.NodeRef.Name) if err := workloadCluster.RemoveEtcdMemberForMachine(ctx, machineToDelete); err != nil { logger.Error(err, "Failed to remove etcd member for machine") return ctrl.Result{}, err } - logger.Info("etcd remove etcd member succeed, node to delete %s", machineToDelete.Status.NodeRef.Name) + logger.Info("etcd remove etcd member succeeded, node to delete %s", machineToDelete.Status.NodeRef.Name) } logger = logger.WithValues("machine", machineToDelete)