Skip to content

Commit

Permalink
ut
Browse files Browse the repository at this point in the history
Signed-off-by: Hang Yan <yhang@vmware.com>
  • Loading branch information
hangyan committed Nov 8, 2024
1 parent 420612b commit cba5530
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 26 deletions.
14 changes: 8 additions & 6 deletions pkg/agent/packetcapture/packetcapture_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ const (
minRetryDelay = 5 * time.Second
maxRetryDelay = 60 * time.Second

defaultWorkers = 2
defaultWorkers = 4

// defines how many capture request we can handle concurrently. waiting captures will be
// marked as Pending until they can be processed.
Expand Down Expand Up @@ -313,7 +313,11 @@ func (c *Controller) syncPacketCapture(pcName string) error {
return state
}()

if updateErr := c.updateStatus(context.Background(), pcName, state); updateErr != nil {
c.mutex.Lock()
newState := new(packetCaptureState)
*newState = *state
c.mutex.Unlock()
if updateErr := c.updateStatus(context.Background(), pcName, newState); updateErr != nil {
return fmt.Errorf("error when patching status: %w", updateErr)
}
return err
Expand Down Expand Up @@ -582,7 +586,6 @@ func (c *Controller) updateStatus(ctx context.Context, name string, state *packe
}
conditions := []crdv1alpha1.PacketCaptureCondition{}
t := metav1.Now()
c.mutex.Lock()
updatedStatus := crdv1alpha1.PacketCaptureStatus{
NumberCaptured: state.capturedPacketsNum,
FilePath: state.filePath,
Expand Down Expand Up @@ -659,7 +662,6 @@ func (c *Controller) updateStatus(ctx context.Context, name string, state *packe
}

}
c.mutex.Unlock()
updatedStatus.Conditions = conditions

if retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
Expand All @@ -676,10 +678,10 @@ func (c *Controller) updateStatus(ctx context.Context, name string, state *packe
}
toUpdate.Status = updatedStatus
klog.V(2).InfoS("Updating PacketCapture", "name", name, "status", toUpdate.Status)
_, updateErr := c.crdClient.CrdV1alpha1().PacketCaptures().UpdateStatus(context.TODO(), toUpdate, metav1.UpdateOptions{})
_, updateErr := c.crdClient.CrdV1alpha1().PacketCaptures().UpdateStatus(ctx, toUpdate, metav1.UpdateOptions{})
if updateErr != nil && apierrors.IsConflict(updateErr) {
var getErr error
if toUpdate, getErr = c.crdClient.CrdV1alpha1().PacketCaptures().Get(context.TODO(), name, metav1.GetOptions{}); getErr != nil {
if toUpdate, getErr = c.crdClient.CrdV1alpha1().PacketCaptures().Get(ctx, name, metav1.GetOptions{}); getErr != nil {
return getErr
}
}
Expand Down
94 changes: 77 additions & 17 deletions pkg/agent/packetcapture/packetcapture_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/util/workqueue"

"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/util"
Expand All @@ -51,15 +52,17 @@ var (
pod1IPv4 = "192.168.10.10"
pod2IPv4 = "192.168.11.10"

ipv6 = "2001:db8::68"
pod1MAC, _ = net.ParseMAC("aa:bb:cc:dd:ee:0f")
pod2MAC, _ = net.ParseMAC("aa:bb:cc:dd:ee:00")
ofPortPod1 = uint32(1)
ofPortPod2 = uint32(2)
timeout = uint32(1)
ipv6 = "2001:db8::68"
pod1MAC, _ = net.ParseMAC("aa:bb:cc:dd:ee:0f")
pod2MAC, _ = net.ParseMAC("aa:bb:cc:dd:ee:00")
ofPortPod1 = uint32(1)
ofPortPod2 = uint32(2)
testCaptureTimeout = uint32(1)
testCaptureNum int32 = 15

icmpProto = intstr.FromString("ICMP")
invalidProto = intstr.FromString("INVALID")
testFTPUrl = "sftp://127.0.0.1:22/path"

pod1 = v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -123,8 +126,9 @@ func genTestCR(name string, num int32) *crdv1alpha1.PacketCapture {
Protocol: &icmpProto,
},
FileServer: &crdv1alpha1.PacketCaptureFileServer{
URL: "sftp://127.0.0.1:22/aaa",
URL: testFTPUrl,
},
Timeout: &testCaptureTimeout,
},
}
return result
Expand All @@ -139,9 +143,6 @@ func (uploader *testUploader) Upload(url string, fileName string, config *ssh.Cl
if url != uploader.url {
return fmt.Errorf("expected url: %s for uploader, got: %s", uploader.url, url)
}
if fileName != uploader.fileName {
return fmt.Errorf("expected filename: %s for uploader, got: %s", uploader.fileName, fileName)
}
return nil
}

Expand Down Expand Up @@ -171,7 +172,7 @@ type testCapture struct {
}

func (p *testCapture) Capture(ctx context.Context, device string, srcIP, dstIP net.IP, packet *crdv1alpha1.Packet) (chan gopacket.Packet, error) {
ch := make(chan gopacket.Packet, 15)
ch := make(chan gopacket.Packet, testCaptureNum)
for i := 0; i < 15; i++ {
ch <- craftTestPacket()
}
Expand Down Expand Up @@ -208,7 +209,12 @@ func newFakePacketCaptureController(t *testing.T, runtimeObjects []runtime.Objec
)
pcController.sftpUploader = &testUploader{}
pcController.captureInterface = &testCapture{}
pcController.queue = workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.NewTypedItemExponentialFailureRateLimiter[string](time.Millisecond*100, time.Millisecond*500),
workqueue.TypedRateLimitingQueueConfig[string]{Name: "packetcapture"},
)
t.Setenv("POD_NAME", "antrea-agent")
t.Setenv("POD_NAMESPACE", "kube-system")
return &fakePacketCaptureController{
Controller: pcController,
kubeClient: kubeClient,
Expand All @@ -235,12 +241,66 @@ func addPodInterface(ifaceStore interfacestore.InterfaceStore, podNamespace, pod
})
}

func TestMultiplePacketCaptures(t *testing.T) {
defaultFS = afero.NewMemMapFs()
packetsDir := "/tmp/antrea/packetcapture/packets"
defaultFS.MkdirAll(packetsDir, 0755)
nameFunc := func(id int) string {
return fmt.Sprintf("pc-%d", id)
}
var objs []runtime.Object
for i := 0; i < 20; i++ {
objs = append(objs, genTestCR(nameFunc(i), int32(testCaptureNum)))
}
pcc := newFakePacketCaptureController(t, nil, objs)
pcc.sftpUploader = &testUploader{url: testFTPUrl}
stopCh := make(chan struct{})
defer close(stopCh)
pcc.crdInformerFactory.Start(stopCh)
pcc.crdInformerFactory.WaitForCacheSync(stopCh)
pcc.informerFactory.Start(stopCh)
pcc.informerFactory.WaitForCacheSync(stopCh)
go pcc.Run(stopCh)
assert.Eventually(t, func() bool {
pcc.mutex.Lock()
if pcc.numRunningCaptures != 0 {
return false
}
pcc.mutex.Unlock()
items, err := pcc.crdClient.CrdV1alpha1().PacketCaptures().List(context.Background(), metav1.ListOptions{})
if err != nil {
return false
}
for _, result := range items.Items {
for _, cond := range result.Status.Conditions {
if cond.Type == crdv1alpha1.PacketCaptureCompleted || cond.Type == crdv1alpha1.PacketCaptureFileUploaded {
if cond.Status == metav1.ConditionFalse {
return false
}
}
}
}
return true
}, 5*time.Second, 20*time.Millisecond)

for i := 0; i < 20; i++ {
err := pcc.crdClient.CrdV1alpha1().PacketCaptures().Delete(context.TODO(), nameFunc(i), metav1.DeleteOptions{})
require.NoError(t, err)
}
assert.Eventually(t, func() bool {
pcc.mutex.Lock()
if len(pcc.captures) != 0 {
return false
}
pcc.mutex.Unlock()
return true
}, 5*time.Second, 20*time.Millisecond)

}

// TestPacketCaptureControllerRun was used to validate the whole run process is working. It doesn't wait for
// the testing pc to finish. on sandbox env, no good solution to open raw socket.
func TestPacketCaptureControllerRun(t *testing.T) {
// create test os
defaultFS = afero.NewMemMapFs()
defaultFS.MkdirAll("/tmp/antrea/packetcapture/packets", 0755)
pcs := []struct {
name string
pc *crdv1alpha1.PacketCapture
Expand Down Expand Up @@ -275,7 +335,7 @@ func TestPacketCaptureControllerRun(t *testing.T) {
FileServer: &crdv1alpha1.PacketCaptureFileServer{
URL: "sftp://127.0.0.1:22/aaa",
},
Timeout: &timeout,
Timeout: &testCaptureTimeout,
},
},
},
Expand Down Expand Up @@ -308,7 +368,7 @@ func TestPacketCaptureControllerRun(t *testing.T) {
FileServer: &crdv1alpha1.PacketCaptureFileServer{
URL: "sftp://127.0.0.1:22/aaa",
},
Timeout: &timeout,
Timeout: &testCaptureTimeout,
},
},
},
Expand Down Expand Up @@ -341,7 +401,7 @@ func TestPacketCaptureControllerRun(t *testing.T) {
FileServer: &crdv1alpha1.PacketCaptureFileServer{
URL: "sftp://127.0.0.1:22/aaa",
},
Timeout: &timeout,
Timeout: &testCaptureTimeout,
},
},
},
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/packetcapture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var (
testNonExistPort int32 = 8085

pcTimeoutReason = "PacketCapture timeout"
pcShortTimeout = uint16(5)
pcShortTimeout = uint32(5)
)

type pcTestCase struct {
Expand Down Expand Up @@ -604,13 +604,13 @@ func runPacketCaptureTest(t *testing.T, data *TestData, tc pcTestCase) {

timeout := tc.pc.Spec.Timeout
if timeout == nil {
tv := uint16(15)
tv := uint32(15)
timeout = &tv
}

if strings.Contains(tc.name, "timeout") {
// wait more for status update.
tv := *timeout + uint16(10)
tv := *timeout + uint32(10)
timeout = &tv
}

Expand Down

0 comments on commit cba5530

Please sign in to comment.