Skip to content

Commit

Permalink
Merge pull request #306 from Pavani-Panakanti/strict_mode_issue
Browse files Browse the repository at this point in the history
Fix race condition in strict mode
  • Loading branch information
Pavani-Panakanti authored Sep 25, 2024
2 parents 14858d4 + 410aa1e commit fb4dd87
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 46 deletions.
45 changes: 39 additions & 6 deletions controllers/policyendpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,33 @@ func (r *PolicyEndpointsReconciler) cleanUpPolicyEndpoint(ctx context.Context, r
return nil
}

// isProgFdShared reports whether the eBPF program attached to the target pod
// is also attached to at least one other pod, on either the ingress or the
// egress side. A prog FD counts as shared when the pod->progFD cache resolves
// to a program whose reverse progFD->pods set still tracks more than one pod.
func (r *PolicyEndpointsReconciler) isProgFdShared(ctx context.Context, targetPodName string,
	targetPodNamespace string) bool {
	namespacedName := utils.GetPodNamespacedName(targetPodName, targetPodNamespace)

	sharedIngress := false
	if progFD, ok := r.ebpfClient.GetIngressPodToProgMap().Load(namespacedName); ok {
		if entry, found := r.ebpfClient.GetIngressProgToPodsMap().Load(progFD); found {
			if pods, isPodSet := entry.(map[string]struct{}); isPodSet && len(pods) > 1 {
				sharedIngress = true
				r.log.Info("isProgFdShared", "Found shared ingress progFD for target: ", targetPodName, "progFD: ", progFD)
			}
		}
	}

	sharedEgress := false
	if progFD, ok := r.ebpfClient.GetEgressPodToProgMap().Load(namespacedName); ok {
		if entry, found := r.ebpfClient.GetEgressProgToPodsMap().Load(progFD); found {
			if pods, isPodSet := entry.(map[string]struct{}); isPodSet && len(pods) > 1 {
				sharedEgress = true
				r.log.Info("isProgFdShared", "Found shared egress progFD for target: ", targetPodName, "progFD: ", progFD)
			}
		}
	}

	return sharedIngress || sharedEgress
}

func (r *PolicyEndpointsReconciler) updatePolicyEnforcementStatusForPods(ctx context.Context, policyEndpointName string,
targetPods []types.NamespacedName, podIdentifiers map[string]bool, isDeleteFlow bool) error {
var err error
Expand All @@ -211,11 +238,10 @@ func (r *PolicyEndpointsReconciler) updatePolicyEnforcementStatusForPods(ctx con
deletePinPath := true
podIdentifier := utils.GetPodIdentifier(targetPod.Name, targetPod.Namespace, r.log)
r.log.Info("Derived ", "Pod identifier to check if update is needed : ", podIdentifier)
//Derive the podIdentifier and check if there is another pod in the same replicaset using the pinpath
if found, ok := podIdentifiers[podIdentifier]; ok {
//podIdentifiers will always have true in the value if found..
r.log.Info("PodIdentifier pinpath ", "shared: ", found)
deletePinPath = !found
// check if ebpf progs are being shared by any other pods
if progFdShared := r.isProgFdShared(ctx, targetPod.Name, targetPod.Namespace); progFdShared {
r.log.Info("ProgFD pinpath ", "shared: ", progFdShared)
deletePinPath = !progFdShared
}

cleanupErr := r.cleanupeBPFProbes(ctx, targetPod, policyEndpointName, deletePinPath, isDeleteFlow)
Expand Down Expand Up @@ -584,9 +610,17 @@ func (r *PolicyEndpointsReconciler) getPodListToBeCleanedUp(oldPodSet []types.Na
break
}
}
// We want to clean up the pod and detach ebpf probes in two cases
// 1. When pod is still running but pod is not an active pod against policy endpoint which implies policy endpoint is no longer applied to the podIdentifier
// 2. When pod is deleted and is the last pod in the PodIdentifier
if !activePod && !podIdentifiers[oldPodIdentifier] {
r.log.Info("Pod to cleanup: ", "name: ", oldPod.Name, "namespace: ", oldPod.Namespace)
podsToBeCleanedUp = append(podsToBeCleanedUp, oldPod)
// When pod is deleted and this is not the last pod in podIdentifier, we are just updating the podName and progFD local caches
} else if !activePod {
r.log.Info("Pod not active. Deleting from progPod caches", "podName: ", oldPod.Name, "podNamespace: ", oldPod.Namespace)
r.ebpfClient.DeletePodFromIngressProgPodCaches(oldPod.Name, oldPod.Namespace)
r.ebpfClient.DeletePodFromEgressProgPodCaches(oldPod.Name, oldPod.Namespace)
}
}

Expand Down Expand Up @@ -651,7 +685,6 @@ func (r *PolicyEndpointsReconciler) deletePolicyEndpointFromPodIdentifierMap(ctx
policyEndpoint string) {
r.podIdentifierToPolicyEndpointMapMutex.Lock()
defer r.podIdentifierToPolicyEndpointMapMutex.Unlock()

var currentPEList []string
if policyEndpointList, ok := r.podIdentifierToPolicyEndpointMap.Load(podIdentifier); ok {
for _, policyEndpointName := range policyEndpointList.([]string) {
Expand Down
82 changes: 72 additions & 10 deletions pkg/ebpf/bpf_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ type BpfClient interface {
UpdateEbpfMaps(podIdentifier string, ingressFirewallRules []EbpfFirewallRules, egressFirewallRules []EbpfFirewallRules) error
IsEBPFProbeAttached(podName string, podNamespace string) (bool, bool)
IsMapUpdateRequired(podIdentifier string) bool
GetIngressPodToProgMap() *sync.Map
GetEgressPodToProgMap() *sync.Map
GetIngressProgToPodsMap() *sync.Map
GetEgressProgToPodsMap() *sync.Map
DeletePodFromIngressProgPodCaches(podName string, podNamespace string)
DeletePodFromEgressProgPodCaches(podName string, podNamespace string)
}

type EvProgram struct {
Expand All @@ -113,11 +119,13 @@ func NewBpfClient(policyEndpointeBPFContext *sync.Map, nodeIP string, enablePoli

ebpfClient := &bpfClient{
policyEndpointeBPFContext: policyEndpointeBPFContext,
IngressProgPodMap: new(sync.Map),
EgressProgPodMap: new(sync.Map),
IngressPodToProgMap: new(sync.Map),
EgressPodToProgMap: new(sync.Map),
nodeIP: nodeIP,
enableIPv6: enableIPv6,
GlobalMaps: new(sync.Map),
IngressProgToPodsMap: new(sync.Map),
EgressProgToPodsMap: new(sync.Map),
}
ebpfClient.logger = ctrl.Log.WithName("ebpf-client")
ingressBinary, egressBinary, eventsBinary,
Expand Down Expand Up @@ -256,9 +264,9 @@ type bpfClient struct {
// Stores eBPF Ingress and Egress context per policyEndpoint resource
policyEndpointeBPFContext *sync.Map
// Stores the Ingress eBPF Prog FD per pod
IngressProgPodMap *sync.Map
IngressPodToProgMap *sync.Map
// Stores the Egress eBPF Prog FD per pod
EgressProgPodMap *sync.Map
EgressPodToProgMap *sync.Map
// Stores info on the global maps the agent creates
GlobalMaps *sync.Map
// Primary IP of the node
Expand All @@ -279,6 +287,10 @@ type bpfClient struct {
bpfTCClient tc.BpfTc
// Logger instance
logger logr.Logger
// Stores the Ingress eBPF Prog FD to pods mapping
IngressProgToPodsMap *sync.Map
// Stores the Egress eBPF Prog FD to pods mapping
EgressProgToPodsMap *sync.Map
}

type Event_t struct {
Expand Down Expand Up @@ -410,6 +422,22 @@ func recoverBPFState(eBPFSDKClient goelf.BpfSDKClient, policyEndpointeBPFContext
return isConntrackMapPresent, isPolicyEventsMapPresent, eventsMapFD, nil
}

// GetIngressPodToProgMap returns the cache mapping a pod's namespaced name
// to the FD of the ingress eBPF program attached to it.
func (l *bpfClient) GetIngressPodToProgMap() *sync.Map {
	return l.IngressPodToProgMap
}

// GetEgressPodToProgMap returns the cache mapping a pod's namespaced name
// to the FD of the egress eBPF program attached to it.
func (l *bpfClient) GetEgressPodToProgMap() *sync.Map {
	return l.EgressPodToProgMap
}

// GetIngressProgToPodsMap returns the reverse cache mapping an ingress eBPF
// prog FD to the set (map[string]struct{}) of pods currently attached to it.
func (l *bpfClient) GetIngressProgToPodsMap() *sync.Map {
	return l.IngressProgToPodsMap
}

// GetEgressProgToPodsMap returns the reverse cache mapping an egress eBPF
// prog FD to the set (map[string]struct{}) of pods currently attached to it.
func (l *bpfClient) GetEgressProgToPodsMap() *sync.Map {
	return l.EgressProgToPodsMap
}

func (l *bpfClient) AttacheBPFProbes(pod types.NamespacedName, podIdentifier string, ingress bool, egress bool) error {
start := time.Now()
// We attach the TC probes to the hostVeth interface of the pod. Derive the hostVeth
Expand All @@ -428,7 +456,10 @@ func (l *bpfClient) AttacheBPFProbes(pod types.NamespacedName, podIdentifier str
return err
}
l.logger.Info("Successfully attached Ingress TC probe for", "pod: ", pod.Name, " in namespace", pod.Namespace)
l.IngressProgPodMap.Store(utils.GetPodNamespacedName(pod.Name, pod.Namespace), progFD)
podNamespacedName := utils.GetPodNamespacedName(pod.Name, pod.Namespace)
l.IngressPodToProgMap.Store(podNamespacedName, progFD)
currentPodSet, _ := l.IngressProgToPodsMap.LoadOrStore(progFD, make(map[string]struct{}))
currentPodSet.(map[string]struct{})[podNamespacedName] = struct{}{}
}

if egress {
Expand All @@ -441,7 +472,10 @@ func (l *bpfClient) AttacheBPFProbes(pod types.NamespacedName, podIdentifier str
return err
}
l.logger.Info("Successfully attached Egress TC probe for", "pod: ", pod.Name, " in namespace", pod.Namespace)
l.EgressProgPodMap.Store(utils.GetPodNamespacedName(pod.Name, pod.Namespace), progFD)
podNamespacedName := utils.GetPodNamespacedName(pod.Name, pod.Namespace)
l.EgressPodToProgMap.Store(podNamespacedName, progFD)
currentPodSet, _ := l.EgressProgToPodsMap.LoadOrStore(progFD, make(map[string]struct{}))
currentPodSet.(map[string]struct{})[podNamespacedName] = struct{}{}
}

return nil
Expand Down Expand Up @@ -470,7 +504,7 @@ func (l *bpfClient) DetacheBPFProbes(pod types.NamespacedName, ingress bool, egr
l.logger.Info("Error while deleting Ingress BPF Probe for ", "podIdentifier: ", podIdentifier)
}
}
l.IngressProgPodMap.Delete(utils.GetPodNamespacedName(pod.Name, pod.Namespace))
l.DeletePodFromIngressProgPodCaches(pod.Name, pod.Namespace)
}

if egress {
Expand All @@ -493,7 +527,7 @@ func (l *bpfClient) DetacheBPFProbes(pod types.NamespacedName, ingress bool, egr
}
l.policyEndpointeBPFContext.Delete(podIdentifier)
}
l.EgressProgPodMap.Delete(utils.GetPodNamespacedName(pod.Name, pod.Namespace))
l.DeletePodFromEgressProgPodCaches(pod.Name, pod.Namespace)
}
return nil
}
Expand Down Expand Up @@ -701,11 +735,11 @@ func (l *bpfClient) UpdateEbpfMaps(podIdentifier string, ingressFirewallRules []

func (l *bpfClient) IsEBPFProbeAttached(podName string, podNamespace string) (bool, bool) {
ingress, egress := false, false
if _, ok := l.IngressProgPodMap.Load(utils.GetPodNamespacedName(podName, podNamespace)); ok {
if _, ok := l.IngressPodToProgMap.Load(utils.GetPodNamespacedName(podName, podNamespace)); ok {
l.logger.Info("Pod already has Ingress Probe attached - ", "Name: ", podName, "Namespace: ", podNamespace)
ingress = true
}
if _, ok := l.EgressProgPodMap.Load(utils.GetPodNamespacedName(podName, podNamespace)); ok {
if _, ok := l.EgressPodToProgMap.Load(utils.GetPodNamespacedName(podName, podNamespace)); ok {
l.logger.Info("Pod already has Egress Probe attached - ", "Name: ", podName, "Namespace: ", podNamespace)
egress = true
}
Expand Down Expand Up @@ -955,3 +989,31 @@ func (l *bpfClient) addCatchAllL4Entry(firewallRule *EbpfFirewallRules) {
}
firewallRule.L4Info = append(firewallRule.L4Info, catchAllL4Entry)
}

// DeletePodFromIngressProgPodCaches removes the given pod from both ingress
// caches: the pod->progFD map and the reverse progFD->pods set. If the pod
// was the last member of the program's pod set, the set entry itself is
// dropped so entries for detached prog FDs do not accumulate.
func (l *bpfClient) DeletePodFromIngressProgPodCaches(podName string, podNamespace string) {
	podNamespacedName := utils.GetPodNamespacedName(podName, podNamespace)
	// LoadAndDelete removes the forward mapping in a single atomic sync.Map
	// operation, closing the window between a separate Load and Delete in
	// which another goroutine could observe a half-updated cache.
	if progFD, ok := l.IngressPodToProgMap.LoadAndDelete(podNamespacedName); ok {
		if currentSet, ok := l.IngressProgToPodsMap.Load(progFD); ok {
			// NOTE(review): the stored set is a plain map mutated without a
			// lock; this assumes all mutators run serialized (e.g. in the
			// reconciler) — confirm, since sync.Map only guards the outer map.
			podSet := currentSet.(map[string]struct{})
			delete(podSet, podNamespacedName)
			if len(podSet) == 0 {
				l.IngressProgToPodsMap.Delete(progFD)
			}
		}
	}
}

// DeletePodFromEgressProgPodCaches removes the given pod from both egress
// caches: the pod->progFD map and the reverse progFD->pods set. If the pod
// was the last member of the program's pod set, the set entry itself is
// dropped so entries for detached prog FDs do not accumulate.
func (l *bpfClient) DeletePodFromEgressProgPodCaches(podName string, podNamespace string) {
	podNamespacedName := utils.GetPodNamespacedName(podName, podNamespace)
	// LoadAndDelete removes the forward mapping in a single atomic sync.Map
	// operation, closing the window between a separate Load and Delete in
	// which another goroutine could observe a half-updated cache.
	if progFD, ok := l.EgressPodToProgMap.LoadAndDelete(podNamespacedName); ok {
		if currentSet, ok := l.EgressProgToPodsMap.Load(progFD); ok {
			// NOTE(review): the stored set is a plain map mutated without a
			// lock; this assumes all mutators run serialized (e.g. in the
			// reconciler) — confirm, since sync.Map only guards the outer map.
			podSet := currentSet.(map[string]struct{})
			delete(podSet, podNamespacedName)
			if len(podSet) == 0 {
				l.EgressProgToPodsMap.Delete(progFD)
			}
		}
	}
}
62 changes: 32 additions & 30 deletions pkg/ebpf/bpf_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,21 +152,21 @@ func TestBpfClient_IsEBPFProbeAttached(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testBpfClient := &bpfClient{
nodeIP: "10.1.1.1",
logger: logr.New(&log.NullLogSink{}),
enableIPv6: false,
hostMask: "/32",
IngressProgPodMap: new(sync.Map),
EgressProgPodMap: new(sync.Map),
nodeIP: "10.1.1.1",
logger: logr.New(&log.NullLogSink{}),
enableIPv6: false,
hostMask: "/32",
IngressPodToProgMap: new(sync.Map),
EgressPodToProgMap: new(sync.Map),
}

if tt.ingressAttached {
podIdentifier := utils.GetPodNamespacedName(tt.podName, tt.podNamespace)
testBpfClient.IngressProgPodMap.Store(podIdentifier, ingressProgFD)
testBpfClient.IngressPodToProgMap.Store(podIdentifier, ingressProgFD)
}
if tt.egressAttached {
podIdentifier := utils.GetPodNamespacedName(tt.podName, tt.podNamespace)
testBpfClient.EgressProgPodMap.Store(podIdentifier, egressProgFD)
testBpfClient.EgressPodToProgMap.Store(podIdentifier, egressProgFD)
}
gotIngress, gotEgress := testBpfClient.IsEBPFProbeAttached(tt.podName, tt.podNamespace)
assert.Equal(t, tt.want.ingress, gotIngress)
Expand Down Expand Up @@ -257,12 +257,12 @@ func TestBpfClient_CheckAndDeriveCatchAllIPPorts(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testBpfClient := &bpfClient{
nodeIP: "10.1.1.1",
logger: logr.New(&log.NullLogSink{}),
enableIPv6: false,
hostMask: "/32",
IngressProgPodMap: new(sync.Map),
EgressProgPodMap: new(sync.Map),
nodeIP: "10.1.1.1",
logger: logr.New(&log.NullLogSink{}),
enableIPv6: false,
hostMask: "/32",
IngressPodToProgMap: new(sync.Map),
EgressPodToProgMap: new(sync.Map),
}
gotCatchAllL4Info, gotIsCatchAllIPEntryPresent, gotAllowAllPortAndProtocols := testBpfClient.checkAndDeriveCatchAllIPPorts(tt.firewallRules)
assert.Equal(t, tt.want.catchAllL4Info, gotCatchAllL4Info)
Expand Down Expand Up @@ -320,12 +320,12 @@ func TestBpfClient_CheckAndDeriveL4InfoFromAnyMatchingCIDRs(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testBpfClient := &bpfClient{
nodeIP: "10.1.1.1",
logger: logr.New(&log.NullLogSink{}),
enableIPv6: false,
hostMask: "/32",
IngressProgPodMap: new(sync.Map),
EgressProgPodMap: new(sync.Map),
nodeIP: "10.1.1.1",
logger: logr.New(&log.NullLogSink{}),
enableIPv6: false,
hostMask: "/32",
IngressPodToProgMap: new(sync.Map),
EgressPodToProgMap: new(sync.Map),
}
gotMatchingCIDRL4Info := testBpfClient.checkAndDeriveL4InfoFromAnyMatchingCIDRs(tt.firewallRule, tt.nonHostCIDRs)
assert.Equal(t, tt.want.matchingCIDRL4Info, gotMatchingCIDRL4Info)
Expand Down Expand Up @@ -373,12 +373,12 @@ func TestBpfClient_AddCatchAllL4Entry(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testBpfClient := &bpfClient{
nodeIP: "10.1.1.1",
logger: logr.New(&log.NullLogSink{}),
enableIPv6: false,
hostMask: "/32",
IngressProgPodMap: new(sync.Map),
EgressProgPodMap: new(sync.Map),
nodeIP: "10.1.1.1",
logger: logr.New(&log.NullLogSink{}),
enableIPv6: false,
hostMask: "/32",
IngressPodToProgMap: new(sync.Map),
EgressPodToProgMap: new(sync.Map),
}
testBpfClient.addCatchAllL4Entry(&tt.firewallRules)
assert.Equal(t, tt.firewallRules, l4InfoWithCatchAllL4Info)
Expand Down Expand Up @@ -607,8 +607,10 @@ func TestBpfClient_AttacheBPFProbes(t *testing.T) {
policyEndpointeBPFContext: new(sync.Map),
bpfSDKClient: mockBpfClient,
bpfTCClient: mockTCClient,
IngressProgPodMap: new(sync.Map),
EgressProgPodMap: new(sync.Map),
IngressPodToProgMap: new(sync.Map),
EgressPodToProgMap: new(sync.Map),
IngressProgToPodsMap: new(sync.Map),
EgressProgToPodsMap: new(sync.Map),
}

sampleBPFContext := BPFContext{
Expand Down Expand Up @@ -661,8 +663,8 @@ func TestBpfClient_DetacheBPFProbes(t *testing.T) {
hostMask: "/32",
policyEndpointeBPFContext: new(sync.Map),
bpfTCClient: mockTCClient,
IngressProgPodMap: new(sync.Map),
EgressProgPodMap: new(sync.Map),
IngressPodToProgMap: new(sync.Map),
EgressPodToProgMap: new(sync.Map),
}

t.Run(tt.name, func(t *testing.T) {
Expand Down

0 comments on commit fb4dd87

Please sign in to comment.