Skip to content

Commit

Permalink
ftp,ut
Browse files Browse the repository at this point in the history
Signed-off-by: Hang Yan <yhang@vmware.com>
  • Loading branch information
hangyan committed Nov 7, 2024
1 parent 3942406 commit bb6df8d
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 37 deletions.
59 changes: 41 additions & 18 deletions pkg/agent/packetcapture/packetcapture_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package packetcapture

import (
"context"
"errors"
"fmt"
"maps"
"net"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/gopacket/gopacket/layers"
"github.com/gopacket/gopacket/pcapgo"
"github.com/spf13/afero"
"golang.org/x/crypto/ssh"
"golang.org/x/time/rate"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -50,8 +52,9 @@ import (
clientsetversioned "antrea.io/antrea/pkg/client/clientset/versioned"
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha1"
crdlisters "antrea.io/antrea/pkg/client/listers/crd/v1alpha1"
"antrea.io/antrea/pkg/util/auth"
"antrea.io/antrea/pkg/util/env"
"antrea.io/antrea/pkg/util/ftp"
"antrea.io/antrea/pkg/util/sftp"
)

type storageProtocolType string
Expand Down Expand Up @@ -121,7 +124,7 @@ type packetCaptureState struct {
}

func (pcs *packetCaptureState) isCaptureSucceed() bool {
return pcs.capturedPacketsNum == pcs.targetCapturedPacketsNum
return pcs.capturedPacketsNum == pcs.targetCapturedPacketsNum && pcs.targetCapturedPacketsNum > 0
}

type Controller struct {
Expand All @@ -132,7 +135,7 @@ type Controller struct {
packetCaptureSynced cache.InformerSynced
interfaceStore interfacestore.InterfaceStore
queue workqueue.TypedRateLimitingInterface[string]
sftpUploader ftp.Uploader
sftpUploader sftp.Uploader
captureInterface PacketCapturer
lock sync.Mutex
// A name-phase mapping for all PacketCapture CRs.
Expand All @@ -157,7 +160,7 @@ func NewPacketCaptureController(
workqueue.NewTypedItemExponentialFailureRateLimiter[string](minRetryDelay, maxRetryDelay),
workqueue.TypedRateLimitingQueueConfig[string]{Name: "packetcapture"},
),
sftpUploader: &ftp.SftpUploader{},
sftpUploader: sftp.NewUploader(),
captures: make(map[string]*packetCaptureState),
}

Expand Down Expand Up @@ -253,12 +256,14 @@ func (c *Controller) processPacketCaptureItem() bool {
func (c *Controller) syncPacketCapture(pcName string) error {
cleanupStatus := func() {
c.lock.Lock()
defer c.lock.Unlock()
state := c.captures[pcName]
if state.cancel != nil {
state.cancel()
if state != nil {
if state.cancel != nil {
state.cancel()
}
delete(c.captures, pcName)
}
delete(c.captures, pcName)
c.lock.Unlock()
}

pc, err := c.packetCaptureLister.Get(pcName)
Expand All @@ -276,7 +281,7 @@ func (c *Controller) syncPacketCapture(pcName string) error {

if err := c.validatePacketCapture(&pc.Spec); err != nil {
klog.ErrorS(err, "Invalid PacketCapture", "name", pc.Name)
if err := c.updateStatus(pc.Name, &packetCaptureState{err: err}); err != nil {
if updateErr := c.updateStatus(pc.Name, &packetCaptureState{err: err}); updateErr != nil {
klog.ErrorS(err, "Failed to update PacketCapture status", "name", pc.Name)
}
cleanupStatus()
Expand Down Expand Up @@ -305,11 +310,11 @@ func (c *Controller) syncPacketCapture(pcName string) error {
timeout = time.Duration(*pc.Spec.Timeout) * time.Second
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
state.cancel = cancel
if err = c.startPacketCapture(ctx, pc, device); err != nil {
phase = packetCapturePhaseCompleted
} else {
phase = packetCapturePhaseRunning
state.cancel = cancel
}
}
state.phase = phase
Expand Down Expand Up @@ -437,10 +442,12 @@ func (c *Controller) performCapture(
for {
select {
case packet := <-packets:
c.lock.Lock()
if captureState.isCaptureSucceed() {
return nil
}
captureState.capturedPacketsNum++
c.lock.Unlock()
ci := gopacket.CaptureInfo{
Timestamp: time.Now(),
CaptureLength: len(packet.Data()),
Expand All @@ -453,14 +460,17 @@ func (c *Controller) performCapture(
klog.V(5).InfoS("Capture packets", "name", captureState.name, "count",
captureState.capturedPacketsNum, "len", ci.Length)

c.lock.Lock()
reachTarget := captureState.isCaptureSucceed()
c.lock.Unlock()
// use rate limiter to reduce the times we need to update status.
if captureState.isCaptureSucceed() || captureState.updateRateLimiter.Allow() {
if reachTarget || captureState.updateRateLimiter.Allow() {
pc, err := c.packetCaptureLister.Get(captureState.name)
if err != nil {
return fmt.Errorf("get PacketCapture failed: %w", err)
}
// if reach the target. flush the file and upload it.
if captureState.isCaptureSucceed() {
if reachTarget {
path := env.GetPodName() + ":" + nameToPath(pc.Name)
statusPath := path
if err = captureState.pcapngWriter.Flush(); err != nil {
Expand Down Expand Up @@ -488,7 +498,7 @@ func (c *Controller) performCapture(
return fmt.Errorf("get PacketCapture failed: %w", err)
}
klog.InfoS("PacketCapture timeout", "name", pc.Name)
return err
return errors.New(captureTimeoutReason)
}
}
}
Expand Down Expand Up @@ -536,7 +546,7 @@ func (c *Controller) parseIPs(pc *crdv1alpha1.PacketCapture) (srcIP, dstIP net.I
return
}

func (c *Controller) getUploaderByProtocol(protocol storageProtocolType) (ftp.Uploader, error) {
func (c *Controller) getUploaderByProtocol(protocol storageProtocolType) (sftp.Uploader, error) {
if protocol == sftpProtocol {
return c.sftpUploader, nil
}
Expand All @@ -553,16 +563,28 @@ func (c *Controller) uploadPackets(pc *crdv1alpha1.PacketCapture, outputFile afe
if err != nil {
return fmt.Errorf("failed to upload packets while getting uploader: %w", err)
}
if _, err := outputFile.Seek(0, 0); err != nil {
return fmt.Errorf("failed to upload to the file server while setting offset: %v", err)
}
authSecret := v1.SecretReference{
Name: fileServerAuthSecretName,
Namespace: env.GetAntreaNamespace(),
}
serverAuth, err := ftp.ParseFileServerAuth(ftp.BasicAuthentication, &authSecret, c.kubeClient)
serverAuth, err := auth.GetAuthConfigurationFromSecret(context.TODO(), auth.BasicAuthenticationType, &authSecret, c.kubeClient)
if err != nil {
klog.ErrorS(err, "Failed to get authentication for the file server", "name", pc.Name, "authSecret", authSecret)
return err
}
cfg := ftp.GenSSHClientConfig(serverAuth.BasicAuthentication.Username, serverAuth.BasicAuthentication.Password)
if serverAuth.BasicAuthentication == nil {
return fmt.Errorf("failed to get basic authentication info for the file server")
}
cfg := &ssh.ClientConfig{
User: serverAuth.BasicAuthentication.Username,
Auth: []ssh.AuthMethod{ssh.Password(serverAuth.BasicAuthentication.Password)},
// #nosec G106: skip host key check here and users can specify their own checks if needed
HostKeyCallback: ssh.InsecureIgnoreHostKey(),

Check failure

Code scanning / CodeQL

Use of insecure HostKeyCallback implementation High

Configuring SSH ClientConfig with insecure HostKeyCallback implementation from
this source
.
Timeout: time.Second,
}
return uploader.Upload(pc.Spec.FileServer.URL, c.generatePacketsPathForServer(pc.Name), cfg, outputFile)
}

Expand All @@ -578,6 +600,7 @@ func (c *Controller) updateStatus(name string, state *packetCaptureState) error
FilePath: state.filePath,
}
t := metav1.Now()
c.lock.Lock()
if state.err != nil {
updatedStatus.FilePath = ""
conditions = append(conditions, crdv1alpha1.PacketCaptureCondition{
Expand Down Expand Up @@ -616,7 +639,6 @@ func (c *Controller) updateStatus(name string, state *packetCaptureState) error
Message: state.err.Error(),
})
}

} else {
if state.isCaptureSucceed() {
conditions = []crdv1alpha1.PacketCaptureCondition{
Expand Down Expand Up @@ -650,6 +672,8 @@ func (c *Controller) updateStatus(name string, state *packetCaptureState) error
}

}
c.lock.Unlock()
updatedStatus.Conditions = conditions

if retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
if toUpdate.Status.FilePath != "" {
Expand Down Expand Up @@ -678,7 +702,6 @@ func (c *Controller) updateStatus(name string, state *packetCaptureState) error
return retryErr
}
klog.V(2).InfoS("Updated PacketCapture", "name", name)

return nil
}

Expand Down
40 changes: 21 additions & 19 deletions pkg/agent/packetcapture/packetcapture_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"io"
"net"
"testing"
"time"
Expand Down Expand Up @@ -149,7 +150,7 @@ type testUploader struct {
fileName string
}

func (uploader *testUploader) Upload(url string, fileName string, config *ssh.ClientConfig, outputFile afero.File) error {
func (uploader *testUploader) Upload(url string, fileName string, config *ssh.ClientConfig, outputFile io.Reader) error {
if url != uploader.url {
return fmt.Errorf("expected url: %s for uploader, got: %s", uploader.url, url)
}
Expand Down Expand Up @@ -319,9 +320,10 @@ func TestStartPacketCapture(t *testing.T) {
fileName := item.pc.Name + ".pcapng"
pcc.sftpUploader = &testUploader{fileName: fileName, url: "sftp://127.0.0.1:22/aaa"}
})
fakeDevice := "lo"
pcc.startPacketCapture(context.TODO, item.pc, &fakeDevice)
time.Sleep(300 * time.Millisecond)
// fakeDevice := "lo"
// pcc.startPacketCapture(context.Background(), item.pc, &fakeDevice)
go pcc.Run(stopCh)
time.Sleep(500 * time.Millisecond)
result, nil := pcc.crdClient.CrdV1alpha1().PacketCaptures().Get(context.Background(), item.pc.Name, metav1.GetOptions{})
assert.Nil(t, nil)
for _, cond := range result.Status.Conditions {
Expand Down Expand Up @@ -466,18 +468,17 @@ func TestMergeConditions(t *testing.T) {
func TestUpdatePacketCaptureStatus(t *testing.T) {
tt := []struct {
name string
num int32
captureNum int32
path string
err error
state *packetCaptureState
expectedStatus *crdv1alpha1.PacketCaptureStatus
}{
{
name: "upload-error",
err: errors.New("failed to upload"),
num: 15,
captureNum: 15,
path: "/tmp/a.pcapng",
name: "upload-error",
state: &packetCaptureState{
capturedPacketsNum: 15,
targetCapturedPacketsNum: 15,
filePath: "/tmp/a.pcapng",
err: errors.New("failed to upload"),
},
expectedStatus: &crdv1alpha1.PacketCaptureStatus{
NumberCaptured: 15,
Conditions: []crdv1alpha1.PacketCaptureCondition{
Expand All @@ -496,10 +497,11 @@ func TestUpdatePacketCaptureStatus(t *testing.T) {
},
},
{
name: "running",
err: nil,
num: 15,
captureNum: 1,
name: "running",
state: &packetCaptureState{
capturedPacketsNum: 1,
targetCapturedPacketsNum: 15,
},
expectedStatus: &crdv1alpha1.PacketCaptureStatus{
NumberCaptured: 1,
Conditions: []crdv1alpha1.PacketCaptureCondition{
Expand All @@ -514,7 +516,7 @@ func TestUpdatePacketCaptureStatus(t *testing.T) {

objs := []runtime.Object{}
for _, item := range tt {
objs = append(objs, genTestCR(item.name, item.num))
objs = append(objs, genTestCR(item.name, item.state.targetCapturedPacketsNum))
}

pcc := newFakePacketCaptureController(t, nil, objs)
Expand All @@ -527,7 +529,7 @@ func TestUpdatePacketCaptureStatus(t *testing.T) {

for _, item := range tt {
t.Run(item.name, func(t *testing.T) {
err := pcc.updatePacketCaptureStatus(item.name, item.captureNum, item.path, item.err)
err := pcc.updateStatus(item.name, item.state)
require.NoError(t, err)
result, err := pcc.crdClient.CrdV1alpha1().PacketCaptures().Get(context.TODO(), item.name, metav1.GetOptions{})
require.NoError(t, err)
Expand Down

0 comments on commit bb6df8d

Please sign in to comment.