
Commit b648b80
feat: re-add ability to filter tables from trace data (#1455)
Reintroduces #1374 in a non-breaking way. This will be helpful for my
compact blocks test; I'm not sure why the original change was reverted.

---------

Co-authored-by: Sanaz Taheri <35961250+staheri14@users.noreply.github.com>
cmwaters and staheri14 authored Aug 16, 2024
1 parent c23345b commit b648b80
Showing 1 changed file with 48 additions and 30 deletions.
pkg/trace/fileserver.go (48 additions, 30 deletions)
@@ -23,6 +23,8 @@ import (
 	"github.com/aws/aws-sdk-go/service/s3"
 )
 
+const jsonL = ".jsonl"
+
 func (lt *LocalTracer) getTableHandler() http.HandlerFunc {
 	return func(w http.ResponseWriter, r *http.Request) {
 		// Parse the request to get the data
@@ -76,7 +78,7 @@ func pump(table string, br *bufio.Reader) (*io.PipeReader, *multipart.Writer) {
 		defer w.Close()
 		defer m.Close()
 
-		part, err := m.CreateFormFile("filename", table+".jsonl")
+		part, err := m.CreateFormFile("filename", table+jsonL)
 		if err != nil {
 			return
 		}
@@ -133,7 +135,7 @@ func GetTable(serverURL, table, dirPath string) error {
 		return err
 	}
 
-	outputFile, err := os.Create(path.Join(dirPath, table+".jsonl"))
+	outputFile, err := os.Create(path.Join(dirPath, table+jsonL))
 	if err != nil {
 		return err
 	}
@@ -264,7 +266,9 @@ func (lt *LocalTracer) PushAll() error {
 
 // S3Download downloads files that match some prefix from an S3 bucket to a
 // local directory dst.
-func S3Download(dst, prefix string, cfg S3Config) error {
+// fileNames is a list of traced jsonl file names to download. If it is empty, all traces are downloaded.
+// fileNames should not have the .jsonl suffix.
+func S3Download(dst, prefix string, cfg S3Config, fileNames ...string) error {
 	// Ensure local directory structure exists
 	err := os.MkdirAll(dst, os.ModePerm)
 	if err != nil {
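For readers of this diff, here is a minimal usage sketch of the new variadic parameter. The import path, the prefix, the table names, and any S3Config fields beyond BucketName are assumptions for illustration, not part of this commit.

package main

import (
	"log"

	// Assumed import path for this repository's pkg/trace package; adjust to the actual module path.
	"github.com/tendermint/tendermint/pkg/trace"
)

func main() {
	// Placeholder config; credential and region fields are omitted for brevity.
	cfg := trace.S3Config{BucketName: "my-trace-bucket"}

	// Existing behavior (unchanged): no fileNames, so every trace under the
	// prefix is downloaded.
	if err := trace.S3Download("./traces", "run-42/", cfg); err != nil {
		log.Fatal(err)
	}

	// New behavior: pass table names (without the .jsonl suffix) to download
	// only those traces.
	if err := trace.S3Download("./traces", "run-42/", cfg, "consensus_round_state", "mempool_tx"); err != nil {
		log.Fatal(err)
	}
}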
@@ -293,37 +297,51 @@ func S3Download(dst, prefix string, cfg S3Config) error {
 
 	err = s3Svc.ListObjectsV2Pages(input, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
 		for _, content := range page.Contents {
-			localFilePath := filepath.Join(dst, prefix, strings.TrimPrefix(*content.Key, prefix))
-			fmt.Printf("Downloading %s to %s\n", *content.Key, localFilePath)
-
-			// Create the directories in the path
-			if err := os.MkdirAll(filepath.Dir(localFilePath), os.ModePerm); err != nil {
-				return false
-			}
-
-			// Create a file to write the S3 Object contents to.
-			f, err := os.Create(localFilePath)
-			if err != nil {
-				return false
-			}
-
-			resp, err := s3Svc.GetObject(&s3.GetObjectInput{
-				Bucket: aws.String(cfg.BucketName),
-				Key:    aws.String(*content.Key),
-			})
-			if err != nil {
-				f.Close()
-				continue
-			}
-			defer resp.Body.Close()
-
-			// Copy the contents of the S3 object to the local file
-			if _, err := io.Copy(f, resp.Body); err != nil {
-				return false
-			}
-
-			fmt.Printf("Successfully downloaded %s to %s\n", *content.Key, localFilePath)
-			f.Close()
+			key := *content.Key
+
+			// If no fileNames are specified, download all files
+			if len(fileNames) == 0 {
+				fileNames = append(fileNames, strings.TrimPrefix(key, prefix))
+			}
+
+			for _, filename := range fileNames {
+				// Add .jsonl suffix to the fileNames
+				fullFilename := filename + jsonL
+				if strings.HasSuffix(key, fullFilename) {
+					localFilePath := filepath.Join(dst, prefix, strings.TrimPrefix(key, prefix))
+					fmt.Printf("Downloading %s to %s\n", key, localFilePath)
+
+					// Create the directories in the path
+					if err := os.MkdirAll(filepath.Dir(localFilePath), os.ModePerm); err != nil {
+						return false
+					}
+
+					// Create a file to write the S3 Object contents to.
+					f, err := os.Create(localFilePath)
+					if err != nil {
+						return false
+					}
+
+					resp, err := s3Svc.GetObject(&s3.GetObjectInput{
+						Bucket: aws.String(cfg.BucketName),
+						Key:    aws.String(key),
+					})
+					if err != nil {
+						f.Close()
+						continue
+					}
+					defer resp.Body.Close()
+
+					// Copy the contents of the S3 object to the local file
+					if _, err := io.Copy(f, resp.Body); err != nil {
+						f.Close()
+						return false
+					}
+
+					fmt.Printf("Successfully downloaded %s to %s\n", key, localFilePath)
+					f.Close()
+				}
+			}
 		}
 		return !lastPage // continue paging
 	})
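To illustrate the filtering rule the new loop applies, a key is downloaded only when it ends with one of the requested names plus the .jsonl suffix. The following is a standalone sketch, not code from this commit; the object keys and table names are made up.

package main

import (
	"fmt"
	"strings"
)

const jsonL = ".jsonl"

// matches mirrors the check added in S3Download: keep a key only when it
// ends with one of the requested file names plus the .jsonl suffix.
func matches(key string, fileNames []string) bool {
	for _, name := range fileNames {
		if strings.HasSuffix(key, name+jsonL) {
			return true
		}
	}
	return false
}

func main() {
	keys := []string{
		"run-42/consensus_round_state.jsonl",
		"run-42/mempool_tx.jsonl",
		"run-42/consensus_block_parts.jsonl",
	}
	wanted := []string{"mempool_tx"} // table names, without the .jsonl suffix

	for _, key := range keys {
		fmt.Printf("%s -> download=%v\n", key, matches(key, wanted))
	}
}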
