This repository has been archived by the owner on Apr 18, 2022. It is now read-only.
forked from nrmitchi/k8s-controller-sidecars
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmain.go
138 lines (122 loc) · 4.38 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package main
import (
"os"
"os/signal"
"syscall"
log "github.com/Sirupsen/logrus"
api_v1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
)
// retrieve the Kubernetes cluster client from outside of the cluster
func getKubernetesClient() kubernetes.Interface {
// construct the path to resolve to `~/.kube/config`
kubeConfigPath := "" // os.Getenv("HOME") + "/.kube/config"
// create the config from the path
config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
if err != nil {
log.Fatalf("getClusterConfig: %v", err)
}
// generate the client based off of the config
client, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("getClusterConfig: %v", err)
}
log.Debugf("Successfully constructed k8s client")
return client
}
// main code path
func main() {
// get the Kubernetes client for connectivity
client := getKubernetesClient()
namespace := meta_v1.NamespaceAll
// create the informer so that we can not only list resources
// but also watch them for all pods in the default namespace
informer := cache.NewSharedIndexInformer(
// the ListWatch contains two different functions that our
// informer requires: ListFunc to take care of listing and watching
// the resources we want to handle
&cache.ListWatch{
ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) {
// list all of the pods (core resource) in the deafult namespace
return client.CoreV1().Pods(namespace).List(options)
},
WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
// watch all of the pods (core resource) in the default namespace
return client.CoreV1().Pods(namespace).Watch(options)
},
},
&api_v1.Pod{}, // the target type (Pod)
0, // no resync (period of 0)
cache.Indexers{},
)
// create a new queue so that when the informer gets a resource that is either
// a result of listing or watching, we can add an idenfitying key to the queue
// so that it can be handled in the handler
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
// add event handlers to handle the three types of events for resources:
// - adding new resources
// - updating existing resources
// - deleting resources
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// convert the resource object into a key (in this case
// we are just doing it in the format of 'namespace/name')
key, err := cache.MetaNamespaceKeyFunc(obj)
log.Debugf("Add pod: %s", key)
if err == nil {
// add the key to the queue for the handler to get
queue.Add(key)
log.Debugf(" Queue len: %d", queue.Len())
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(newObj)
log.Debugf("Update pod: %s", key)
if err == nil {
queue.Add(key)
log.Debugf(" Queue len: %d", queue.Len())
}
},
DeleteFunc: func(obj interface{}) {
// DeletionHandlingMetaNamsespaceKeyFunc is a helper function that allows
// us to check the DeletedFinalStateUnknown existence in the event that
// a resource was deleted but it is still contained in the index
//
// this then in turn calls MetaNamespaceKeyFunc
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
log.Infof("Delete pod: %s", key)
if err == nil {
queue.Add(key)
log.Debugf(" Queue len: %d", queue.Len())
}
},
})
// construct the Controller object which has all of the necessary components to
// handle logging, connections, informing (listing and watching), the queue,
// and the handler
controller := Controller{
logger: log.NewEntry(log.New()),
clientset: client,
informer: informer,
queue: queue,
handler: &SidecarShutdownHandler{},
}
// use a channel to synchronize the finalization for a graceful shutdown
stopCh := make(chan struct{})
defer close(stopCh)
// run the controller loop to process items
go controller.Run(stopCh)
// use a channel to handle OS signals to terminate and gracefully shut
// down processing
sigTerm := make(chan os.Signal, 1)
signal.Notify(sigTerm, syscall.SIGTERM)
signal.Notify(sigTerm, syscall.SIGINT)
<-sigTerm
log.Info("Shutting down....")
}