diff --git a/controllers/policyendpoints_controller.go b/controllers/policyendpoints_controller.go index 014ea59..585fe25 100644 --- a/controllers/policyendpoints_controller.go +++ b/controllers/policyendpoints_controller.go @@ -197,6 +197,33 @@ func (r *PolicyEndpointsReconciler) cleanUpPolicyEndpoint(ctx context.Context, r return nil } +func (r *PolicyEndpointsReconciler) isProgFdShared(ctx context.Context, targetPodName string, + targetPodNamespace string) bool { + targetpodNamespacedName := utils.GetPodNamespacedName(targetPodName, targetPodNamespace) + var foundSharedIngress bool + var foundSharedEgress bool + if targetProgFD, ok := r.ebpfClient.GetIngressPodToProgMap().Load(targetpodNamespacedName); ok { + if currentList, ok := r.ebpfClient.GetIngressProgToPodsMap().Load(targetProgFD); ok { + podsList, ok := currentList.(map[string]struct{}) + if ok && len(podsList) > 1 { + foundSharedIngress = true + r.log.Info("isProgFdShared", "Found shared ingress progFD for target: ", targetPodName, "progFD: ", targetProgFD) + } + } + } + + if targetProgFD, ok := r.ebpfClient.GetEgressPodToProgMap().Load(targetpodNamespacedName); ok { + if currentList, ok := r.ebpfClient.GetEgressProgToPodsMap().Load(targetProgFD); ok { + podsList, ok := currentList.(map[string]struct{}) + if ok && len(podsList) > 1 { + foundSharedEgress = true + r.log.Info("isProgFdShared", "Found shared egress progFD for target: ", targetPodName, "progFD: ", targetProgFD) + } + } + } + return foundSharedIngress || foundSharedEgress +} + func (r *PolicyEndpointsReconciler) updatePolicyEnforcementStatusForPods(ctx context.Context, policyEndpointName string, targetPods []types.NamespacedName, podIdentifiers map[string]bool, isDeleteFlow bool) error { var err error @@ -211,11 +238,10 @@ func (r *PolicyEndpointsReconciler) updatePolicyEnforcementStatusForPods(ctx con deletePinPath := true podIdentifier := utils.GetPodIdentifier(targetPod.Name, targetPod.Namespace, r.log) r.log.Info("Derived ", "Pod identifier to check if update is needed : ", podIdentifier) - //Derive the podIdentifier and check if there is another pod in the same replicaset using the pinpath - if found, ok := podIdentifiers[podIdentifier]; ok { - //podIdentifiers will always have true in the value if found.. - r.log.Info("PodIdentifier pinpath ", "shared: ", found) - deletePinPath = !found + // check if ebpf progs are being shared by any other pods + if progFdShared := r.isProgFdShared(ctx, targetPod.Name, targetPod.Namespace); progFdShared { + r.log.Info("ProgFD pinpath ", "shared: ", progFdShared) + deletePinPath = !progFdShared } cleanupErr := r.cleanupeBPFProbes(ctx, targetPod, policyEndpointName, deletePinPath, isDeleteFlow) @@ -584,9 +610,17 @@ func (r *PolicyEndpointsReconciler) getPodListToBeCleanedUp(oldPodSet []types.Na break } } + // We want to clean up the pod and detach ebpf probes in two cases + // 1. When pod is still running but pod is not an active pod against policy endpoint which implies policy endpoint is no longer applied to the podIdentifier + // 2. When pod is deleted and is the last pod in the PodIdentifier if !activePod && !podIdentifiers[oldPodIdentifier] { r.log.Info("Pod to cleanup: ", "name: ", oldPod.Name, "namespace: ", oldPod.Namespace) podsToBeCleanedUp = append(podsToBeCleanedUp, oldPod) + // When pod is deleted and this is not the last pod in podIdentifier, we are just updating the podName and progFD local caches + } else if !activePod { + r.log.Info("Pod not active. Deleting from progPod caches", "podName: ", oldPod.Name, "podNamespace: ", oldPod.Namespace) + r.ebpfClient.DeletePodFromIngressProgPodCaches(oldPod.Name, oldPod.Namespace) + r.ebpfClient.DeletePodFromEgressProgPodCaches(oldPod.Name, oldPod.Namespace) } } @@ -651,7 +685,6 @@ func (r *PolicyEndpointsReconciler) deletePolicyEndpointFromPodIdentifierMap(ctx policyEndpoint string) { r.podIdentifierToPolicyEndpointMapMutex.Lock() defer r.podIdentifierToPolicyEndpointMapMutex.Unlock() - var currentPEList []string if policyEndpointList, ok := r.podIdentifierToPolicyEndpointMap.Load(podIdentifier); ok { for _, policyEndpointName := range policyEndpointList.([]string) { diff --git a/pkg/ebpf/bpf_client.go b/pkg/ebpf/bpf_client.go index d281847..d50a1c7 100644 --- a/pkg/ebpf/bpf_client.go +++ b/pkg/ebpf/bpf_client.go @@ -89,6 +89,12 @@ type BpfClient interface { UpdateEbpfMaps(podIdentifier string, ingressFirewallRules []EbpfFirewallRules, egressFirewallRules []EbpfFirewallRules) error IsEBPFProbeAttached(podName string, podNamespace string) (bool, bool) IsMapUpdateRequired(podIdentifier string) bool + GetIngressPodToProgMap() *sync.Map + GetEgressPodToProgMap() *sync.Map + GetIngressProgToPodsMap() *sync.Map + GetEgressProgToPodsMap() *sync.Map + DeletePodFromIngressProgPodCaches(podName string, podNamespace string) + DeletePodFromEgressProgPodCaches(podName string, podNamespace string) } type EvProgram struct { @@ -113,11 +119,13 @@ func NewBpfClient(policyEndpointeBPFContext *sync.Map, nodeIP string, enablePoli ebpfClient := &bpfClient{ policyEndpointeBPFContext: policyEndpointeBPFContext, - IngressProgPodMap: new(sync.Map), - EgressProgPodMap: new(sync.Map), + IngressPodToProgMap: new(sync.Map), + EgressPodToProgMap: new(sync.Map), nodeIP: nodeIP, enableIPv6: enableIPv6, GlobalMaps: new(sync.Map), + IngressProgToPodsMap: new(sync.Map), + EgressProgToPodsMap: new(sync.Map), } ebpfClient.logger = ctrl.Log.WithName("ebpf-client") ingressBinary, egressBinary, eventsBinary, @@ -256,9 +264,9 @@ type bpfClient struct { // Stores eBPF Ingress and Egress context per policyEndpoint resource policyEndpointeBPFContext *sync.Map // Stores the Ingress eBPF Prog FD per pod - IngressProgPodMap *sync.Map + IngressPodToProgMap *sync.Map // Stores the Egress eBPF Prog FD per pod - EgressProgPodMap *sync.Map + EgressPodToProgMap *sync.Map // Stores info on the global maps the agent creates GlobalMaps *sync.Map // Primary IP of the node @@ -279,6 +287,10 @@ type bpfClient struct { bpfTCClient tc.BpfTc // Logger instance logger logr.Logger + // Stores the Ingress eBPF Prog FD to pods mapping + IngressProgToPodsMap *sync.Map + // Stores the Egress eBPF Prog FD to pods mapping + EgressProgToPodsMap *sync.Map } type Event_t struct { @@ -410,6 +422,22 @@ func recoverBPFState(eBPFSDKClient goelf.BpfSDKClient, policyEndpointeBPFContext return isConntrackMapPresent, isPolicyEventsMapPresent, eventsMapFD, nil } +func (l *bpfClient) GetIngressPodToProgMap() *sync.Map { + return l.IngressPodToProgMap +} + +func (l *bpfClient) GetEgressPodToProgMap() *sync.Map { + return l.EgressPodToProgMap +} + +func (l *bpfClient) GetIngressProgToPodsMap() *sync.Map { + return l.IngressProgToPodsMap +} + +func (l *bpfClient) GetEgressProgToPodsMap() *sync.Map { + return l.EgressProgToPodsMap +} + func (l *bpfClient) AttacheBPFProbes(pod types.NamespacedName, podIdentifier string, ingress bool, egress bool) error { start := time.Now() // We attach the TC probes to the hostVeth interface of the pod. Derive the hostVeth @@ -428,7 +456,10 @@ func (l *bpfClient) AttacheBPFProbes(pod types.NamespacedName, podIdentifier str return err } l.logger.Info("Successfully attached Ingress TC probe for", "pod: ", pod.Name, " in namespace", pod.Namespace) - l.IngressProgPodMap.Store(utils.GetPodNamespacedName(pod.Name, pod.Namespace), progFD) + podNamespacedName := utils.GetPodNamespacedName(pod.Name, pod.Namespace) + l.IngressPodToProgMap.Store(podNamespacedName, progFD) + currentPodSet, _ := l.IngressProgToPodsMap.LoadOrStore(progFD, make(map[string]struct{})) + currentPodSet.(map[string]struct{})[podNamespacedName] = struct{}{} } if egress { @@ -441,7 +472,10 @@ func (l *bpfClient) AttacheBPFProbes(pod types.NamespacedName, podIdentifier str return err } l.logger.Info("Successfully attached Egress TC probe for", "pod: ", pod.Name, " in namespace", pod.Namespace) - l.EgressProgPodMap.Store(utils.GetPodNamespacedName(pod.Name, pod.Namespace), progFD) + podNamespacedName := utils.GetPodNamespacedName(pod.Name, pod.Namespace) + l.EgressPodToProgMap.Store(podNamespacedName, progFD) + currentPodSet, _ := l.EgressProgToPodsMap.LoadOrStore(progFD, make(map[string]struct{})) + currentPodSet.(map[string]struct{})[podNamespacedName] = struct{}{} } return nil @@ -470,7 +504,7 @@ func (l *bpfClient) DetacheBPFProbes(pod types.NamespacedName, ingress bool, egr l.logger.Info("Error while deleting Ingress BPF Probe for ", "podIdentifier: ", podIdentifier) } } - l.IngressProgPodMap.Delete(utils.GetPodNamespacedName(pod.Name, pod.Namespace)) + l.DeletePodFromIngressProgPodCaches(pod.Name, pod.Namespace) } if egress { @@ -493,7 +527,7 @@ func (l *bpfClient) DetacheBPFProbes(pod types.NamespacedName, ingress bool, egr } l.policyEndpointeBPFContext.Delete(podIdentifier) } - l.EgressProgPodMap.Delete(utils.GetPodNamespacedName(pod.Name, pod.Namespace)) + l.DeletePodFromEgressProgPodCaches(pod.Name, pod.Namespace) } return nil } @@ -701,11 +735,11 @@ func (l *bpfClient) UpdateEbpfMaps(podIdentifier string, ingressFirewallRules [] func (l *bpfClient) IsEBPFProbeAttached(podName string, podNamespace string) (bool, bool) { ingress, egress := false, false - if _, ok := l.IngressProgPodMap.Load(utils.GetPodNamespacedName(podName, podNamespace)); ok { + if _, ok := l.IngressPodToProgMap.Load(utils.GetPodNamespacedName(podName, podNamespace)); ok { l.logger.Info("Pod already has Ingress Probe attached - ", "Name: ", podName, "Namespace: ", podNamespace) ingress = true } - if _, ok := l.EgressProgPodMap.Load(utils.GetPodNamespacedName(podName, podNamespace)); ok { + if _, ok := l.EgressPodToProgMap.Load(utils.GetPodNamespacedName(podName, podNamespace)); ok { l.logger.Info("Pod already has Egress Probe attached - ", "Name: ", podName, "Namespace: ", podNamespace) egress = true } @@ -955,3 +989,31 @@ func (l *bpfClient) addCatchAllL4Entry(firewallRule *EbpfFirewallRules) { } firewallRule.L4Info = append(firewallRule.L4Info, catchAllL4Entry) } + +func (l *bpfClient) DeletePodFromIngressProgPodCaches(podName string, podNamespace string) { + podNamespacedName := utils.GetPodNamespacedName(podName, podNamespace) + if progFD, ok := l.IngressPodToProgMap.Load(podNamespacedName); ok { + l.IngressPodToProgMap.Delete(podNamespacedName) + if currentSet, ok := l.IngressProgToPodsMap.Load(progFD); ok { + set := currentSet.(map[string]struct{}) + delete(set, podNamespacedName) + if len(set) == 0 { + l.IngressProgToPodsMap.Delete(progFD) + } + } + } +} + +func (l *bpfClient) DeletePodFromEgressProgPodCaches(podName string, podNamespace string) { + podNamespacedName := utils.GetPodNamespacedName(podName, podNamespace) + if progFD, ok := l.EgressPodToProgMap.Load(podNamespacedName); ok { + l.EgressPodToProgMap.Delete(podNamespacedName) + if currentSet, ok := l.EgressProgToPodsMap.Load(progFD); ok { + set := currentSet.(map[string]struct{}) + delete(set, podNamespacedName) + if len(set) == 0 { + l.EgressProgToPodsMap.Delete(progFD) + } + } + } +} diff --git a/pkg/ebpf/bpf_client_test.go b/pkg/ebpf/bpf_client_test.go index a27b0f7..5d0953b 100644 --- a/pkg/ebpf/bpf_client_test.go +++ b/pkg/ebpf/bpf_client_test.go @@ -152,21 +152,21 @@ func TestBpfClient_IsEBPFProbeAttached(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { testBpfClient := &bpfClient{ - nodeIP: "10.1.1.1", - logger: logr.New(&log.NullLogSink{}), - enableIPv6: false, - hostMask: "/32", - IngressProgPodMap: new(sync.Map), - EgressProgPodMap: new(sync.Map), + nodeIP: "10.1.1.1", + logger: logr.New(&log.NullLogSink{}), + enableIPv6: false, + hostMask: "/32", + IngressPodToProgMap: new(sync.Map), + EgressPodToProgMap: new(sync.Map), } if tt.ingressAttached { podIdentifier := utils.GetPodNamespacedName(tt.podName, tt.podNamespace) - testBpfClient.IngressProgPodMap.Store(podIdentifier, ingressProgFD) + testBpfClient.IngressPodToProgMap.Store(podIdentifier, ingressProgFD) } if tt.egressAttached { podIdentifier := utils.GetPodNamespacedName(tt.podName, tt.podNamespace) - testBpfClient.EgressProgPodMap.Store(podIdentifier, egressProgFD) + testBpfClient.EgressPodToProgMap.Store(podIdentifier, egressProgFD) } gotIngress, gotEgress := testBpfClient.IsEBPFProbeAttached(tt.podName, tt.podNamespace) assert.Equal(t, tt.want.ingress, gotIngress) @@ -257,12 +257,12 @@ func TestBpfClient_CheckAndDeriveCatchAllIPPorts(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { testBpfClient := &bpfClient{ - nodeIP: "10.1.1.1", - logger: logr.New(&log.NullLogSink{}), - enableIPv6: false, - hostMask: "/32", - IngressProgPodMap: new(sync.Map), - EgressProgPodMap: new(sync.Map), + nodeIP: "10.1.1.1", + logger: logr.New(&log.NullLogSink{}), + enableIPv6: false, + hostMask: "/32", + IngressPodToProgMap: new(sync.Map), + EgressPodToProgMap: new(sync.Map), } gotCatchAllL4Info, gotIsCatchAllIPEntryPresent, gotAllowAllPortAndProtocols := testBpfClient.checkAndDeriveCatchAllIPPorts(tt.firewallRules) assert.Equal(t, tt.want.catchAllL4Info, gotCatchAllL4Info) @@ -320,12 +320,12 @@ func TestBpfClient_CheckAndDeriveL4InfoFromAnyMatchingCIDRs(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { testBpfClient := &bpfClient{ - nodeIP: "10.1.1.1", - logger: logr.New(&log.NullLogSink{}), - enableIPv6: false, - hostMask: "/32", - IngressProgPodMap: new(sync.Map), - EgressProgPodMap: new(sync.Map), + nodeIP: "10.1.1.1", + logger: logr.New(&log.NullLogSink{}), + enableIPv6: false, + hostMask: "/32", + IngressPodToProgMap: new(sync.Map), + EgressPodToProgMap: new(sync.Map), } gotMatchingCIDRL4Info := testBpfClient.checkAndDeriveL4InfoFromAnyMatchingCIDRs(tt.firewallRule, tt.nonHostCIDRs) assert.Equal(t, tt.want.matchingCIDRL4Info, gotMatchingCIDRL4Info) @@ -373,12 +373,12 @@ func TestBpfClient_AddCatchAllL4Entry(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { testBpfClient := &bpfClient{ - nodeIP: "10.1.1.1", - logger: logr.New(&log.NullLogSink{}), - enableIPv6: false, - hostMask: "/32", - IngressProgPodMap: new(sync.Map), - EgressProgPodMap: new(sync.Map), + nodeIP: "10.1.1.1", + logger: logr.New(&log.NullLogSink{}), + enableIPv6: false, + hostMask: "/32", + IngressPodToProgMap: new(sync.Map), + EgressPodToProgMap: new(sync.Map), } testBpfClient.addCatchAllL4Entry(&tt.firewallRules) assert.Equal(t, tt.firewallRules, l4InfoWithCatchAllL4Info) @@ -607,8 +607,10 @@ func TestBpfClient_AttacheBPFProbes(t *testing.T) { policyEndpointeBPFContext: new(sync.Map), bpfSDKClient: mockBpfClient, bpfTCClient: mockTCClient, - IngressProgPodMap: new(sync.Map), - EgressProgPodMap: new(sync.Map), + IngressPodToProgMap: new(sync.Map), + EgressPodToProgMap: new(sync.Map), + IngressProgToPodsMap: new(sync.Map), + EgressProgToPodsMap: new(sync.Map), } sampleBPFContext := BPFContext{ @@ -661,8 +663,8 @@ func TestBpfClient_DetacheBPFProbes(t *testing.T) { hostMask: "/32", policyEndpointeBPFContext: new(sync.Map), bpfTCClient: mockTCClient, - IngressProgPodMap: new(sync.Map), - EgressProgPodMap: new(sync.Map), + IngressPodToProgMap: new(sync.Map), + EgressPodToProgMap: new(sync.Map), } t.Run(tt.name, func(t *testing.T) {