AJ-1517 download files only once #448

Merged Jan 26, 2024 (25 commits; diff below shows changes from 22 commits)

Commits
6ba24d4
first pass one download
calypsomatic Jan 9, 2024
d3a173c
merge main and fix test
calypsomatic Jan 9, 2024
c8f2e2f
some cleanup
calypsomatic Jan 9, 2024
04b1ba4
first pass attempt to use directories WIP
calypsomatic Jan 10, 2024
12d1310
Merge branch 'main' into aj-1517-direct-http
calypsomatic Jan 10, 2024
feb9021
merge main
calypsomatic Jan 11, 2024
7af31b8
clean up
calypsomatic Jan 17, 2024
6929e4c
Merge branch 'main' into aj-1517-direct-http
calypsomatic Jan 17, 2024
b00b902
spotless
calypsomatic Jan 17, 2024
4e6c17b
back to multimap
calypsomatic Jan 18, 2024
c938683
minor sonar improvements
calypsomatic Jan 18, 2024
7122290
try custom filedownload class
calypsomatic Jan 18, 2024
bbcfcd9
cleanup/pr comments
calypsomatic Jan 19, 2024
daaa3c1
Merge branch 'main' into aj-1517-direct-http
calypsomatic Jan 19, 2024
f5b2fae
add retries
calypsomatic Jan 19, 2024
7db74d6
Merge branch 'main' into aj-1517-direct-http
calypsomatic Jan 19, 2024
421bb39
Add DownloadHelper seam for test override. (#468)
jladieu Jan 22, 2024
e7554a5
downloadhelper retry test
calypsomatic Jan 22, 2024
c29481f
Merge branch 'main' into aj-1517-direct-http
calypsomatic Jan 23, 2024
3b1816d
Merge branch 'main' into aj-1517-direct-http
calypsomatic Jan 24, 2024
51e0682
remove retry
calypsomatic Jan 24, 2024
3e2e841
actually remove retry
calypsomatic Jan 24, 2024
31de46d
use permissions to satisfy sonar
calypsomatic Jan 25, 2024
140d817
minor sonar complaints
calypsomatic Jan 25, 2024
16daf59
fix spotless
calypsomatic Jan 25, 2024
FileDownloadHelper.java (new file)
@@ -0,0 +1,64 @@
package org.databiosphere.workspacedataservice.dataimport;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.util.EnumSet;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.databiosphere.workspacedataservice.service.model.exception.TdrManifestImportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileDownloadHelper {

private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Path tempFileDir;
private final Multimap<String, File> fileMap;
private final Set<PosixFilePermission> permissions = EnumSet.of(PosixFilePermission.OWNER_READ);

public FileDownloadHelper(String dirName) throws IOException {
this.tempFileDir = Files.createTempDirectory(dirName);
this.fileMap = HashMultimap.create();
}

public void downloadFileFromURL(String tableName, URL pathToRemoteFile) {
try {
File tempFile = File.createTempFile(/* prefix= */ "tdr-", /* suffix= */ "download");
logger.info("downloading to temp file {} ...", tempFile.getPath());
FileUtils.copyURLToFile(pathToRemoteFile, tempFile);
// In the TDR manifest, for Azure snapshots only,
// the first file in the list will always be a directory.
// Attempting to import that directory
// will fail; it has no content. To avoid those failures,
// check files for length and ignore any that are empty
if (tempFile.length() == 0) {
logger.info("Empty file in parquet, skipping");
Files.delete(tempFile.toPath());
} else {
// Once the remote file has been copied to the temp file, make it read-only
Files.setPosixFilePermissions(tempFile.toPath(), permissions);
fileMap.put(tableName, tempFile);
}
} catch (IOException e) {
throw new TdrManifestImportException(e.getMessage(), e);
}
}

public void deleteFileDirectory() {
try {
Files.delete(tempFileDir);
} catch (IOException e) {
logger.error("Error deleting temporary files: {}", e.getMessage());
}
}

public Multimap<String, File> getFileMap() {
return fileMap;
}
}
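For orientation, a minimal usage sketch of the new helper (not part of the diff; the table name, URL, and wrapping method are hypothetical):

// Sketch only: download one parquet file, inspect the map, then clean up.
void exampleUsage() throws IOException {
  FileDownloadHelper helper = new FileDownloadHelper("tempParquetDir");
  helper.downloadFileFromURL("sample_table", new URL("https://example.com/sample.parquet"));
  for (File parquetFile : helper.getFileMap().get("sample_table")) {
    // each entry is a read-only temp file ready to hand to the parquet reader
  }
  helper.deleteFileDirectory();
}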
TdrManifestQuartzJob.java
@@ -20,17 +20,14 @@
import java.util.Set;
import java.util.UUID;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;
import org.databiosphere.workspacedataservice.activitylog.ActivityLogger;
import org.databiosphere.workspacedataservice.dao.JobDao;
import org.databiosphere.workspacedataservice.dataimport.FileDownloadHelper;
import org.databiosphere.workspacedataservice.dataimport.WsmSnapshotSupport;
import org.databiosphere.workspacedataservice.jobexec.JobExecutionException;
import org.databiosphere.workspacedataservice.jobexec.QuartzJob;
@@ -106,16 +103,24 @@ protected void executeInternal(UUID jobId, JobExecutionContext context) {
List<TdrManifestImportTable> tdrManifestImportTables =
extractTableInfo(snapshotExportResponseModel);

// get all the parquet files from the manifests

Contributor commented:

I think this class should be responsible for creating the temp directory, and passing the Path to that directory down through getFilesForImport

Then, later on, when this class wants to delete all the downloaded files, it can just delete the directory it already has a reference to.
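A rough sketch of that suggestion (not part of this diff; the two-argument getFilesForImport signature is hypothetical):

// Sketch only: the quartz job owns the temp directory and passes it down.
java.nio.file.Path tempDir = Files.createTempDirectory("tdr-import-");
FileDownloadHelper fileDownloadHelper = getFilesForImport(tdrManifestImportTables, tempDir);
// ... base-attribute and relation imports run here ...
// cleanup later uses the directory reference the job already holds
FileUtils.deleteDirectory(tempDir.toFile());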

FileDownloadHelper fileDownloadHelper = getFilesForImport(tdrManifestImportTables);

// loop through the tables to be imported and upsert base attributes
var result =
importTables(
tdrManifestImportTables,
fileDownloadHelper.getFileMap(),
targetInstance,
TwoPassStreamingWriteHandler.ImportMode.BASE_ATTRIBUTES);
calypsomatic marked this conversation as resolved.

// add relations to the existing base attributes
importTables(
tdrManifestImportTables, targetInstance, TwoPassStreamingWriteHandler.ImportMode.RELATIONS);
tdrManifestImportTables,
fileDownloadHelper.getFileMap(),
targetInstance,
TwoPassStreamingWriteHandler.ImportMode.RELATIONS);

// activity logging for import status
// no specific activity logging for relations since main import is a superset
@@ -130,61 +135,38 @@ protected void executeInternal(UUID jobId, JobExecutionContext context) {
.withRecordType(entry.getKey())
.ofQuantity(entry.getValue()));
});
// delete temp files after everything else is completed
// Any files that fail to delete will be cleaned up if/when the pod restarts
fileDownloadHelper.deleteFileDirectory();
}

/**
* Given a single Parquet file to be imported, import it
*
* @param path path to Parquet file to be imported.
* @param inputFile Parquet file to be imported.
* @param table info about the table to be imported
* @param targetInstance instance into which to import
* @param importMode mode for this invocation
* @return statistics on what was imported
*/
@VisibleForTesting
BatchWriteResult importTable(
URL path,
InputFile inputFile,
TdrManifestImportTable table,
UUID targetInstance,
TwoPassStreamingWriteHandler.ImportMode importMode) {
try {
// download the file from the URL to a temp file on the local filesystem
// Azure urls, with SAS tokens, don't need any particular auth.
// TODO AJ-1517 can we access the URL directly, no temp file?
File tempFile = File.createTempFile("tdr-", "download");
logger.info("downloading to temp file {} ...", tempFile.getPath());
FileUtils.copyURLToFile(path, tempFile);
Path hadoopFilePath = new Path(tempFile.getPath());
// do we need any other config here?
Configuration configuration = new Configuration();

// In the TDR manifest, for Azure snapshots only,
// the first file in the list will always be a directory. Attempting to import that directory
// will fail; it has no content. To avoid those failures,
// check files for length and ignore any that are empty
FileSystem fileSystem = FileSystem.get(configuration);
FileStatus fileStatus = fileSystem.getFileStatus(hadoopFilePath);
if (fileStatus.getLen() == 0) {
logger.info("Empty file in parquet, skipping");
return BatchWriteResult.empty();
}

// generate the HadoopInputFile
InputFile inputFile = HadoopInputFile.fromPath(hadoopFilePath, configuration);

// upsert this parquet file's contents
try (ParquetReader<GenericRecord> avroParquetReader =
AvroParquetReader.<GenericRecord>builder(inputFile)
.set(READ_INT96_AS_FIXED, "true")
.build()) {
logger.info("batch-writing records for file ...");

BatchWriteResult result =
batchWriteService.batchWriteParquetStream(
avroParquetReader, targetInstance, table, importMode);

return result;
}
// upsert this parquet file's contents
try (ParquetReader<GenericRecord> avroParquetReader =
AvroParquetReader.<GenericRecord>builder(inputFile)
.set(READ_INT96_AS_FIXED, "true")
.build()) {
logger.info("batch-writing records for file ...");

BatchWriteResult result =
batchWriteService.batchWriteParquetStream(
avroParquetReader, targetInstance, table, importMode);

return result;
} catch (Throwable t) {
logger.error("Hit an error on file: {}", t.getMessage());
throw new TdrManifestImportException(t.getMessage());
@@ -200,6 +182,7 @@ BatchWriteResult importTable(
*/
private BatchWriteResult importTables(
List<TdrManifestImportTable> importTables,
Multimap<String, File> fileMap,
UUID targetInstance,
TwoPassStreamingWriteHandler.ImportMode importMode) {

@@ -209,27 +192,67 @@ private BatchWriteResult importTables(
importTable -> {
logger.info("Processing table '{}' ...", importTable.recordType().getName());

// find all Parquet files for this table
List<URL> paths = importTable.dataFiles();
logger.debug(
"Table '{}' has {} export file(s) ...",
importTable.recordType().getName(),
paths.size());

// loop through each parquet file
paths.forEach(
path -> {
var result = importTable(path, importTable, targetInstance, importMode);

if (result != null) {
combinedResult.merge(result);
}
});
fileMap
.get(importTable.recordType().getName())
.forEach(
file -> {
try {
org.apache.hadoop.fs.Path hadoopFilePath =
new org.apache.hadoop.fs.Path(file.toString());
Configuration configuration = new Configuration();

// generate the HadoopInputFile
InputFile inputFile = HadoopInputFile.fromPath(hadoopFilePath, configuration);
var result = importTable(inputFile, importTable, targetInstance, importMode);
if (result != null) {
combinedResult.merge(result);
}
} catch (IOException e) {
throw new TdrManifestImportException(e.getMessage(), e);
}
});
});

return combinedResult;
}

/**
* Given the list of tables/data files to be imported, loop through and download each one to a
* temporary file
*
* @param importTables tables to be imported
* @return helper that tracks the temp directory and the downloaded files, keyed by table name
*/
@VisibleForTesting
FileDownloadHelper getFilesForImport(List<TdrManifestImportTable> importTables) {
try {
FileDownloadHelper fileDownloadHelper = new FileDownloadHelper("tempParquetDir");

// loop through the tables that have data files.
importTables.forEach(
importTable -> {
logger.info("Fetching files for table '{}' ...", importTable.recordType().getName());

// find all Parquet files for this table
List<URL> paths = importTable.dataFiles();
logger.debug(
"Table '{}' has {} export file(s) ...",
importTable.recordType().getName(),
paths.size());

// loop through each parquet file
paths.forEach(
path -> {
fileDownloadHelper.downloadFileFromURL(importTable.recordType().getName(), path);
});
});

return fileDownloadHelper;
} catch (IOException e) {
throw new TdrManifestImportException("Error downloading temporary files", e);
}
}

/**
* Read the manifest from the user-specified URL into a SnapshotExportResponseModel java object
*
FileDownloadHelperTest.java (new file)
@@ -0,0 +1,23 @@
package org.databiosphere.workspacedataservice.dataimport;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

import java.io.IOException;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.io.Resource;

@SpringBootTest
public class FileDownloadHelperTest {

@Value("classpath:parquet/empty.parquet")
Resource emptyParquet;

@Test
void downloadEmptyFile() throws IOException {
Contributor (PR author) commented:

This test is a duplicate of the test in TdrManifestQuartzJobTest. This seems like the place for it, but should I alter the other test to verify the behavior of the actual TdrManifestQuartzJob with an empty file, or is this test sufficient?

Collaborator replied:

I think it's worth testing both TdrManifestQuartzJob.getFilesForImport() in TdrManifestQuartzJobTest and FileDownloadHelper.downloadFileFromURL() here, even though they're very similar.

FileDownloadHelper helper = new FileDownloadHelper("test");
assertDoesNotThrow(() -> helper.downloadFileFromURL("empty_table", emptyParquet.getURL()));
assert helper.getFileMap().isEmpty();
}
}
TdrManifestQuartzJobTest.java
@@ -1,6 +1,5 @@
package org.databiosphere.workspacedataservice.dataimport.tdr;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -12,13 +11,17 @@
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;
import org.databiosphere.workspacedataservice.activitylog.ActivityLogger;
import org.databiosphere.workspacedataservice.dao.JobDao;
import org.databiosphere.workspacedataservice.dataimport.FileDownloadHelper;
import org.databiosphere.workspacedataservice.dataimport.tdr.TdrManifestExemplarData.AzureSmall;
import org.databiosphere.workspacedataservice.recordstream.TwoPassStreamingWriteHandler;
import org.databiosphere.workspacedataservice.retry.RestClientRetry;
import org.databiosphere.workspacedataservice.service.BatchWriteService;
import org.databiosphere.workspacedataservice.service.model.BatchWriteResult;
import org.databiosphere.workspacedataservice.service.model.TdrManifestImportTable;
import org.databiosphere.workspacedataservice.service.model.exception.TdrManifestImportException;
import org.databiosphere.workspacedataservice.shared.model.RecordType;
@@ -113,15 +116,9 @@ void parseEmptyParquet() throws IOException {
List.of());

// An empty file should not throw any errors
BatchWriteResult actual =
assertDoesNotThrow(
() ->
tdrManifestQuartzJob.importTable(
emptyParquet.getURL(),
table,
workspaceId,
TwoPassStreamingWriteHandler.ImportMode.BASE_ATTRIBUTES));
assertThat(actual.entrySet()).isEmpty();
FileDownloadHelper fileMap =
assertDoesNotThrow(() -> tdrManifestQuartzJob.getFilesForImport(List.of(table)));
assert (fileMap.getFileMap().isEmpty());
}

/*
Expand Down Expand Up @@ -153,12 +150,16 @@ void parseMalformedParquet() throws IOException {
List.of(malformedParquet.getURL()),
List.of());

InputFile malformedFile =
HadoopInputFile.fromPath(
new Path(malformedParquet.getURL().toString()), new Configuration());

// Make sure real errors on parsing parquets are not swallowed
assertThrows(
TdrManifestImportException.class,
() ->
tdrManifestQuartzJob.importTable(
manifestAzure.getURL(),
malformedFile,
table,
workspaceId,
TwoPassStreamingWriteHandler.ImportMode.BASE_ATTRIBUTES));