Fix race condition in strict mode #306

Merged
3 commits merged on Sep 25, 2024
45 changes: 39 additions & 6 deletions controllers/policyendpoints_controller.go
@@ -197,6 +197,33 @@ func (r *PolicyEndpointsReconciler) cleanUpPolicyEndpoint(ctx context.Context, r
return nil
}

func (r *PolicyEndpointsReconciler) isProgFdShared(ctx context.Context, targetPodName string,
Contributor:
I think we can optimize this, as a pod will always have both ingress and egress probes attached, i.e. there will never be a scenario where only one of the probes is attached for any particular pod. We can probably collapse the maps into one unless there are plans to use them for something else.

Contributor (author):
@achevuru Yeah, using just one map to store the reverse mapping from progFDs to the pods list makes sense. I will keep just the ingress one and we can remove the new egress map.

Contributor (author):
We will pick up this optimization in a separate PR, as this change is blocking cx. A sketch of the single-map approach is included after isProgFdShared below.

targetPodNamespace string) bool {
targetpodNamespacedName := utils.GetPodNamespacedName(targetPodName, targetPodNamespace)
var foundSharedIngress bool
var foundSharedEgress bool
if targetProgFD, ok := r.ebpfClient.GetIngressPodToProgMap().Load(targetpodNamespacedName); ok {
if currentList, ok := r.ebpfClient.GetIngressProgToPodsMap().Load(targetProgFD); ok {
podsList, ok := currentList.(map[string]struct{})
if ok && len(podsList) > 1 {
foundSharedIngress = true
r.log.Info("isProgFdShared", "Found shared ingress progFD for target: ", targetPodName, "progFD: ", targetProgFD)
}
}
}

if targetProgFD, ok := r.ebpfClient.GetEgressPodToProgMap().Load(targetpodNamespacedName); ok {
if currentList, ok := r.ebpfClient.GetEgressProgToPodsMap().Load(targetProgFD); ok {
podsList, ok := currentList.(map[string]struct{})
if ok && len(podsList) > 1 {
foundSharedEgress = true
r.log.Info("isProgFdShared", "Found shared egress progFD for target: ", targetPodName, "progFD: ", targetProgFD)
}
}
}
return foundSharedIngress || foundSharedEgress
}
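
For illustration, here is a minimal, self-contained sketch of the single reverse-map variant discussed in the review thread above; the names podToProgMap, progToPodsMap, attachPod and the package layout are hypothetical and not part of this PR.

package main

import (
	"fmt"
	"sync"
)

// Because ingress and egress probes are always attached together, a single reverse
// map from prog FD to the set of pods using it could stand in for the separate
// ingress and egress maps added in this PR.
var (
	podToProgMap  sync.Map // pod namespaced name -> prog FD
	progToPodsMap sync.Map // prog FD -> map[string]struct{} of pod namespaced names
)

func attachPod(pod string, progFD int) {
	podToProgMap.Store(pod, progFD)
	podSet, _ := progToPodsMap.LoadOrStore(progFD, make(map[string]struct{}))
	podSet.(map[string]struct{})[pod] = struct{}{}
}

func isProgFdShared(pod string) bool {
	progFD, ok := podToProgMap.Load(pod)
	if !ok {
		return false
	}
	current, ok := progToPodsMap.Load(progFD)
	if !ok {
		return false
	}
	pods, ok := current.(map[string]struct{})
	return ok && len(pods) > 1
}

func main() {
	attachPod("default/pod-a", 42)
	fmt.Println(isProgFdShared("default/pod-a")) // false: only one pod uses prog FD 42
	attachPod("default/pod-b", 42)               // a replica sharing the same prog FD
	fmt.Println(isProgFdShared("default/pod-a")) // true
}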

func (r *PolicyEndpointsReconciler) updatePolicyEnforcementStatusForPods(ctx context.Context, policyEndpointName string,
targetPods []types.NamespacedName, podIdentifiers map[string]bool, isDeleteFlow bool) error {
var err error
@@ -211,11 +238,10 @@ func (r *PolicyEndpointsReconciler) updatePolicyEnforcementStatusForPods(ctx con
deletePinPath := true
podIdentifier := utils.GetPodIdentifier(targetPod.Name, targetPod.Namespace, r.log)
Contributor:
Do we still need podIdentifier? It seems like it is only used in the log below.

Contributor (author):
Yeah, we can remove that one. I will add that optimization and clean up this line in the next PR.

r.log.Info("Derived ", "Pod identifier to check if update is needed : ", podIdentifier)
//Derive the podIdentifier and check if there is another pod in the same replicaset using the pinpath
if found, ok := podIdentifiers[podIdentifier]; ok {
//podIdentifiers will always have true in the value if found..
r.log.Info("PodIdentifier pinpath ", "shared: ", found)
deletePinPath = !found
// check if ebpf progs are being shared by any other pods
if progFdShared := r.isProgFdShared(ctx, targetPod.Name, targetPod.Namespace); progFdShared {
r.log.Info("ProgFD pinpath ", "shared: ", progFdShared)
deletePinPath = !progFdShared
}

cleanupErr := r.cleanupeBPFProbes(ctx, targetPod, policyEndpointName, deletePinPath, isDeleteFlow)
@@ -584,9 +610,17 @@ func (r *PolicyEndpointsReconciler) getPodListToBeCleanedUp(oldPodSet []types.Na
break
}
}
// We want to clean up the pod and detach ebpf probes in two cases
// 1. When pod is still running but pod is not an active pod against policy endpoint which implies policy endpoint is no longer applied to the podIdentifier
// 2. When pod is deleted and is the last pod in the PodIdentifier
if !activePod && !podIdentifiers[oldPodIdentifier] {
r.log.Info("Pod to cleanup: ", "name: ", oldPod.Name, "namespace: ", oldPod.Namespace)
podsToBeCleanedUp = append(podsToBeCleanedUp, oldPod)
// When pod is deleted and this is not the last pod in podIdentifier, we are just updating the podName and progFD local caches
} else if !activePod {
r.log.Info("Pod not active. Deleting from progPod caches", "podName: ", oldPod.Name, "podNamespace: ", oldPod.Namespace)
r.ebpfClient.DeletePodFromIngressProgPodCaches(oldPod.Name, oldPod.Namespace)
r.ebpfClient.DeletePodFromEgressProgPodCaches(oldPod.Name, oldPod.Namespace)
}
}
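
As a standalone illustration of the two cases called out in the comments above, the decision reduces to the following; the helper name decideCleanup is hypothetical and only mirrors the branch logic.

package main

import "fmt"

// decideCleanup mirrors the branch above: fully clean up (detach probes) only when the
// pod is no longer active against the policy endpoint and no other pod shares its
// podIdentifier; a pod that is merely inactive only has its local progFD caches updated.
func decideCleanup(activePod, identifierStillInUse bool) (fullCleanup, cacheUpdateOnly bool) {
	switch {
	case !activePod && !identifierStillInUse:
		return true, false
	case !activePod:
		return false, true
	default:
		return false, false
	}
}

func main() {
	fmt.Println(decideCleanup(false, false)) // true false: detach probes and clean up the pod
	fmt.Println(decideCleanup(false, true))  // false true: only update the progFD caches
	fmt.Println(decideCleanup(true, true))   // false false: pod still active, nothing to do
}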

@@ -651,7 +685,6 @@ func (r *PolicyEndpointsReconciler) deletePolicyEndpointFromPodIdentifierMap(ctx
policyEndpoint string) {
r.podIdentifierToPolicyEndpointMapMutex.Lock()
defer r.podIdentifierToPolicyEndpointMapMutex.Unlock()

var currentPEList []string
if policyEndpointList, ok := r.podIdentifierToPolicyEndpointMap.Load(podIdentifier); ok {
for _, policyEndpointName := range policyEndpointList.([]string) {
82 changes: 72 additions & 10 deletions pkg/ebpf/bpf_client.go
@@ -89,6 +89,12 @@ type BpfClient interface {
UpdateEbpfMaps(podIdentifier string, ingressFirewallRules []EbpfFirewallRules, egressFirewallRules []EbpfFirewallRules) error
IsEBPFProbeAttached(podName string, podNamespace string) (bool, bool)
IsMapUpdateRequired(podIdentifier string) bool
GetIngressPodToProgMap() *sync.Map
GetEgressPodToProgMap() *sync.Map
GetIngressProgToPodsMap() *sync.Map
GetEgressProgToPodsMap() *sync.Map
DeletePodFromIngressProgPodCaches(podName string, podNamespace string)
DeletePodFromEgressProgPodCaches(podName string, podNamespace string)
}

type EvProgram struct {
@@ -113,11 +119,13 @@ func NewBpfClient(policyEndpointeBPFContext *sync.Map, nodeIP string, enablePoli

ebpfClient := &bpfClient{
policyEndpointeBPFContext: policyEndpointeBPFContext,
IngressProgPodMap: new(sync.Map),
EgressProgPodMap: new(sync.Map),
IngressPodToProgMap: new(sync.Map),
EgressPodToProgMap: new(sync.Map),
nodeIP: nodeIP,
enableIPv6: enableIPv6,
GlobalMaps: new(sync.Map),
IngressProgToPodsMap: new(sync.Map),
EgressProgToPodsMap: new(sync.Map),
}
ebpfClient.logger = ctrl.Log.WithName("ebpf-client")
ingressBinary, egressBinary, eventsBinary,
@@ -256,9 +264,9 @@ type bpfClient struct {
// Stores eBPF Ingress and Egress context per policyEndpoint resource
policyEndpointeBPFContext *sync.Map
// Stores the Ingress eBPF Prog FD per pod
IngressProgPodMap *sync.Map
IngressPodToProgMap *sync.Map
// Stores the Egress eBPF Prog FD per pod
EgressProgPodMap *sync.Map
EgressPodToProgMap *sync.Map
// Stores info on the global maps the agent creates
GlobalMaps *sync.Map
// Primary IP of the node
@@ -279,6 +287,10 @@ type bpfClient struct {
bpfTCClient tc.BpfTc
// Logger instance
logger logr.Logger
// Stores the Ingress eBPF Prog FD to pods mapping
IngressProgToPodsMap *sync.Map
// Stores the Egress eBPF Prog FD to pods mapping
EgressProgToPodsMap *sync.Map
}

type Event_t struct {
@@ -410,6 +422,22 @@ func recoverBPFState(eBPFSDKClient goelf.BpfSDKClient, policyEndpointeBPFContext
return isConntrackMapPresent, isPolicyEventsMapPresent, eventsMapFD, nil
}

func (l *bpfClient) GetIngressPodToProgMap() *sync.Map {
return l.IngressPodToProgMap
}

func (l *bpfClient) GetEgressPodToProgMap() *sync.Map {
return l.EgressPodToProgMap
}

func (l *bpfClient) GetIngressProgToPodsMap() *sync.Map {
return l.IngressProgToPodsMap
}

func (l *bpfClient) GetEgressProgToPodsMap() *sync.Map {
return l.EgressProgToPodsMap
}

func (l *bpfClient) AttacheBPFProbes(pod types.NamespacedName, podIdentifier string, ingress bool, egress bool) error {
start := time.Now()
// We attach the TC probes to the hostVeth interface of the pod. Derive the hostVeth
@@ -428,7 +456,10 @@ func (l *bpfClient) AttacheBPFProbes(pod types.NamespacedName, podIdentifier str
return err
}
l.logger.Info("Successfully attached Ingress TC probe for", "pod: ", pod.Name, " in namespace", pod.Namespace)
l.IngressProgPodMap.Store(utils.GetPodNamespacedName(pod.Name, pod.Namespace), progFD)
podNamespacedName := utils.GetPodNamespacedName(pod.Name, pod.Namespace)
l.IngressPodToProgMap.Store(podNamespacedName, progFD)
currentPodSet, _ := l.IngressProgToPodsMap.LoadOrStore(progFD, make(map[string]struct{}))
currentPodSet.(map[string]struct{})[podNamespacedName] = struct{}{}
}

if egress {
Expand All @@ -441,7 +472,10 @@ func (l *bpfClient) AttacheBPFProbes(pod types.NamespacedName, podIdentifier str
return err
}
l.logger.Info("Successfully attached Egress TC probe for", "pod: ", pod.Name, " in namespace", pod.Namespace)
l.EgressProgPodMap.Store(utils.GetPodNamespacedName(pod.Name, pod.Namespace), progFD)
podNamespacedName := utils.GetPodNamespacedName(pod.Name, pod.Namespace)
l.EgressPodToProgMap.Store(podNamespacedName, progFD)
currentPodSet, _ := l.EgressProgToPodsMap.LoadOrStore(progFD, make(map[string]struct{}))
currentPodSet.(map[string]struct{})[podNamespacedName] = struct{}{}
}

return nil
@@ -470,7 +504,7 @@ func (l *bpfClient) DetacheBPFProbes(pod types.NamespacedName, ingress bool, egr
l.logger.Info("Error while deleting Ingress BPF Probe for ", "podIdentifier: ", podIdentifier)
}
}
l.IngressProgPodMap.Delete(utils.GetPodNamespacedName(pod.Name, pod.Namespace))
l.DeletePodFromIngressProgPodCaches(pod.Name, pod.Namespace)
}

if egress {
Expand All @@ -493,7 +527,7 @@ func (l *bpfClient) DetacheBPFProbes(pod types.NamespacedName, ingress bool, egr
}
l.policyEndpointeBPFContext.Delete(podIdentifier)
}
l.EgressProgPodMap.Delete(utils.GetPodNamespacedName(pod.Name, pod.Namespace))
l.DeletePodFromEgressProgPodCaches(pod.Name, pod.Namespace)
}
return nil
}
@@ -701,11 +735,11 @@ func (l *bpfClient) UpdateEbpfMaps(podIdentifier string, ingressFirewallRules []

func (l *bpfClient) IsEBPFProbeAttached(podName string, podNamespace string) (bool, bool) {
ingress, egress := false, false
if _, ok := l.IngressProgPodMap.Load(utils.GetPodNamespacedName(podName, podNamespace)); ok {
if _, ok := l.IngressPodToProgMap.Load(utils.GetPodNamespacedName(podName, podNamespace)); ok {
l.logger.Info("Pod already has Ingress Probe attached - ", "Name: ", podName, "Namespace: ", podNamespace)
ingress = true
}
if _, ok := l.EgressProgPodMap.Load(utils.GetPodNamespacedName(podName, podNamespace)); ok {
if _, ok := l.EgressPodToProgMap.Load(utils.GetPodNamespacedName(podName, podNamespace)); ok {
l.logger.Info("Pod already has Egress Probe attached - ", "Name: ", podName, "Namespace: ", podNamespace)
egress = true
}
@@ -955,3 +989,31 @@ func (l *bpfClient) addCatchAllL4Entry(firewallRule *EbpfFirewallRules) {
}
firewallRule.L4Info = append(firewallRule.L4Info, catchAllL4Entry)
}

func (l *bpfClient) DeletePodFromIngressProgPodCaches(podName string, podNamespace string) {
podNamespacedName := utils.GetPodNamespacedName(podName, podNamespace)
if progFD, ok := l.IngressPodToProgMap.Load(podNamespacedName); ok {
l.IngressPodToProgMap.Delete(podNamespacedName)
if currentSet, ok := l.IngressProgToPodsMap.Load(progFD); ok {
set := currentSet.(map[string]struct{})
delete(set, podNamespacedName)
if len(set) == 0 {
l.IngressProgToPodsMap.Delete(progFD)
}
}
}
}

func (l *bpfClient) DeletePodFromEgressProgPodCaches(podName string, podNamespace string) {
podNamespacedName := utils.GetPodNamespacedName(podName, podNamespace)
if progFD, ok := l.EgressPodToProgMap.Load(podNamespacedName); ok {
l.EgressPodToProgMap.Delete(podNamespacedName)
if currentSet, ok := l.EgressProgToPodsMap.Load(progFD); ok {
set := currentSet.(map[string]struct{})
delete(set, podNamespacedName)
if len(set) == 0 {
l.EgressProgToPodsMap.Delete(progFD)
}
}
}
}
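
The following is a sketch of a unit test that could exercise the new cache-eviction helpers; it is not part of this PR, and the module import paths are assumed from the repository layout.

package ebpf

import (
	"sync"
	"testing"

	"github.com/aws/aws-network-policy-agent/pkg/utils"
	"github.com/stretchr/testify/assert"
)

func TestBpfClient_DeletePodFromIngressProgPodCaches(t *testing.T) {
	testBpfClient := &bpfClient{
		IngressPodToProgMap:  new(sync.Map),
		IngressProgToPodsMap: new(sync.Map),
	}
	podKey := utils.GetPodNamespacedName("pod-a", "default")
	// Seed the caches as AttacheBPFProbes would after attaching an ingress probe.
	testBpfClient.IngressPodToProgMap.Store(podKey, 12)
	testBpfClient.IngressProgToPodsMap.Store(12, map[string]struct{}{podKey: {}})

	testBpfClient.DeletePodFromIngressProgPodCaches("pod-a", "default")

	// The pod entry is gone and, since it was the last pod on prog FD 12,
	// the reverse-map entry is evicted as well.
	_, ok := testBpfClient.IngressPodToProgMap.Load(podKey)
	assert.False(t, ok)
	_, ok = testBpfClient.IngressProgToPodsMap.Load(12)
	assert.False(t, ok)
}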
62 changes: 32 additions & 30 deletions pkg/ebpf/bpf_client_test.go
@@ -152,21 +152,21 @@ func TestBpfClient_IsEBPFProbeAttached(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testBpfClient := &bpfClient{
nodeIP: "10.1.1.1",
logger: logr.New(&log.NullLogSink{}),
enableIPv6: false,
hostMask: "/32",
IngressProgPodMap: new(sync.Map),
EgressProgPodMap: new(sync.Map),
nodeIP: "10.1.1.1",
logger: logr.New(&log.NullLogSink{}),
enableIPv6: false,
hostMask: "/32",
IngressPodToProgMap: new(sync.Map),
EgressPodToProgMap: new(sync.Map),
}

if tt.ingressAttached {
podIdentifier := utils.GetPodNamespacedName(tt.podName, tt.podNamespace)
testBpfClient.IngressProgPodMap.Store(podIdentifier, ingressProgFD)
testBpfClient.IngressPodToProgMap.Store(podIdentifier, ingressProgFD)
}
if tt.egressAttached {
podIdentifier := utils.GetPodNamespacedName(tt.podName, tt.podNamespace)
testBpfClient.EgressProgPodMap.Store(podIdentifier, egressProgFD)
testBpfClient.EgressPodToProgMap.Store(podIdentifier, egressProgFD)
}
gotIngress, gotEgress := testBpfClient.IsEBPFProbeAttached(tt.podName, tt.podNamespace)
assert.Equal(t, tt.want.ingress, gotIngress)
@@ -257,12 +257,12 @@ func TestBpfClient_CheckAndDeriveCatchAllIPPorts(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testBpfClient := &bpfClient{
nodeIP: "10.1.1.1",
logger: logr.New(&log.NullLogSink{}),
enableIPv6: false,
hostMask: "/32",
IngressProgPodMap: new(sync.Map),
EgressProgPodMap: new(sync.Map),
nodeIP: "10.1.1.1",
logger: logr.New(&log.NullLogSink{}),
enableIPv6: false,
hostMask: "/32",
IngressPodToProgMap: new(sync.Map),
EgressPodToProgMap: new(sync.Map),
}
gotCatchAllL4Info, gotIsCatchAllIPEntryPresent, gotAllowAllPortAndProtocols := testBpfClient.checkAndDeriveCatchAllIPPorts(tt.firewallRules)
assert.Equal(t, tt.want.catchAllL4Info, gotCatchAllL4Info)
@@ -320,12 +320,12 @@ func TestBpfClient_CheckAndDeriveL4InfoFromAnyMatchingCIDRs(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testBpfClient := &bpfClient{
nodeIP: "10.1.1.1",
logger: logr.New(&log.NullLogSink{}),
enableIPv6: false,
hostMask: "/32",
IngressProgPodMap: new(sync.Map),
EgressProgPodMap: new(sync.Map),
nodeIP: "10.1.1.1",
logger: logr.New(&log.NullLogSink{}),
enableIPv6: false,
hostMask: "/32",
IngressPodToProgMap: new(sync.Map),
EgressPodToProgMap: new(sync.Map),
}
gotMatchingCIDRL4Info := testBpfClient.checkAndDeriveL4InfoFromAnyMatchingCIDRs(tt.firewallRule, tt.nonHostCIDRs)
assert.Equal(t, tt.want.matchingCIDRL4Info, gotMatchingCIDRL4Info)
@@ -373,12 +373,12 @@ func TestBpfClient_AddCatchAllL4Entry(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testBpfClient := &bpfClient{
nodeIP: "10.1.1.1",
logger: logr.New(&log.NullLogSink{}),
enableIPv6: false,
hostMask: "/32",
IngressProgPodMap: new(sync.Map),
EgressProgPodMap: new(sync.Map),
nodeIP: "10.1.1.1",
logger: logr.New(&log.NullLogSink{}),
enableIPv6: false,
hostMask: "/32",
IngressPodToProgMap: new(sync.Map),
EgressPodToProgMap: new(sync.Map),
}
testBpfClient.addCatchAllL4Entry(&tt.firewallRules)
assert.Equal(t, tt.firewallRules, l4InfoWithCatchAllL4Info)
@@ -607,8 +610,8 @@ func TestBpfClient_AttacheBPFProbes(t *testing.T) {
policyEndpointeBPFContext: new(sync.Map),
bpfSDKClient: mockBpfClient,
bpfTCClient: mockTCClient,
IngressProgPodMap: new(sync.Map),
EgressProgPodMap: new(sync.Map),
IngressPodToProgMap: new(sync.Map),
EgressPodToProgMap: new(sync.Map),
IngressProgToPodsMap: new(sync.Map),
EgressProgToPodsMap: new(sync.Map),
}

sampleBPFContext := BPFContext{
@@ -661,8 +663,8 @@ func TestBpfClient_DetacheBPFProbes(t *testing.T) {
hostMask: "/32",
policyEndpointeBPFContext: new(sync.Map),
bpfTCClient: mockTCClient,
IngressProgPodMap: new(sync.Map),
EgressProgPodMap: new(sync.Map),
IngressPodToProgMap: new(sync.Map),
EgressPodToProgMap: new(sync.Map),
}

t.Run(tt.name, func(t *testing.T) {