AJ-1517 download files only once #448

Merged — 25 commits, Jan 26, 2024
Changes from 20 commits

Commits (25)
6ba24d4
first pass one download
calypsomatic Jan 9, 2024
d3a173c
merge main and fix test
calypsomatic Jan 9, 2024
c8f2e2f
some cleanup
calypsomatic Jan 9, 2024
04b1ba4
first pass attempt to use directories WIP
calypsomatic Jan 10, 2024
12d1310
Merge branch 'main' into aj-1517-direct-http
calypsomatic Jan 10, 2024
feb9021
merge main
calypsomatic Jan 11, 2024
7af31b8
clean up
calypsomatic Jan 17, 2024
6929e4c
Merge branch 'main' into aj-1517-direct-http
calypsomatic Jan 17, 2024
b00b902
spotless
calypsomatic Jan 17, 2024
4e6c17b
back to multimap
calypsomatic Jan 18, 2024
c938683
minor sonar improvements
calypsomatic Jan 18, 2024
7122290
try custom filedownload class
calypsomatic Jan 18, 2024
bbcfcd9
cleanup/pr comments
calypsomatic Jan 19, 2024
daaa3c1
Merge branch 'main' into aj-1517-direct-http
calypsomatic Jan 19, 2024
f5b2fae
add retries
calypsomatic Jan 19, 2024
7db74d6
Merge branch 'main' into aj-1517-direct-http
calypsomatic Jan 19, 2024
421bb39
Add DownloadHelper seam for test override. (#468)
jladieu Jan 22, 2024
e7554a5
downloadhelper retry test
calypsomatic Jan 22, 2024
c29481f
Merge branch 'main' into aj-1517-direct-http
calypsomatic Jan 23, 2024
3b1816d
Merge branch 'main' into aj-1517-direct-http
calypsomatic Jan 24, 2024
51e0682
remove retry
calypsomatic Jan 24, 2024
3e2e841
actually remove retry
calypsomatic Jan 24, 2024
31de46d
use permissions to satisfy sonar
calypsomatic Jan 25, 2024
140d817
minor sonar complaints
calypsomatic Jan 25, 2024
16daf59
fix spotless
calypsomatic Jan 25, 2024
New file: FileDownloadHelper.java
@@ -0,0 +1,94 @@
package org.databiosphere.workspacedataservice.dataimport;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.util.EnumSet;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.databiosphere.workspacedataservice.service.model.exception.TdrManifestImportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;

public class FileDownloadHelper {

private final DownloadHelper downloadHelper;

public interface DownloadHelper {
default void copyURLToFile(URL sourceUrl, File destinationFile) throws IOException {
Author (Contributor) commented:
Needed this method to not be abstract in order to mock it.

Contributor replied:
Very weird, I wasn't expecting that.

{
FileUtils.copyURLToFile(sourceUrl, destinationFile);
}
}
}
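The exchange above refers to how this seam gets stubbed in the new FileDownloadHelperTest later in this PR; a compact sketch of the pattern (Mockito is already used by that test, so nothing new is assumed beyond the hypothetical class name):

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;

import java.io.File;
import java.io.IOException;
import java.net.URL;

class DownloadHelperMockingSketch {
  void stubDownloads() throws IOException {
    FileDownloadHelper.DownloadHelper mockHelper = mock(FileDownloadHelper.DownloadHelper.class);

    // Fail the first call, then fall through to the real default body;
    // doCallRealMethod() only works here because copyURLToFile is not abstract.
    doThrow(new IOException("simulated connection error"))
        .doCallRealMethod()
        .when(mockHelper)
        .copyURLToFile(any(URL.class), any(File.class));
  }
}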

private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Path tempFileDir;
private final Multimap<String, File> fileMap;
private final Set<PosixFilePermission> permissions =
EnumSet.of(
PosixFilePermission.OWNER_READ,
PosixFilePermission.GROUP_READ,
PosixFilePermission.OTHERS_READ);
Collaborator commented:
Could this be locked down even further, to just OWNER_READ? Is there a need for group/others to also read?
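If that suggestion were adopted, the permission set would shrink to a single entry. A minimal sketch of the tighter field (the reviewer's proposal, not what this revision of the diff contains):

  // Sketch only: owner-read, with no group/other access, per the review question above.
  private final Set<PosixFilePermission> permissions = EnumSet.of(PosixFilePermission.OWNER_READ);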


public FileDownloadHelper(String dirName, DownloadHelper downloadHelper) throws IOException {
this.tempFileDir = Files.createTempDirectory(dirName);
this.downloadHelper = downloadHelper;
this.fileMap = HashMultimap.create();
}

public FileDownloadHelper(String dirName) throws IOException {
this(
dirName,
new DownloadHelper() {
@jladieu (Contributor) commented on Jan 24, 2024:
I think this doesn't need the anonymous subclass & override anymore since the default impl does the right thing. Just pass in new DownloadHelper() (see the sketch after this constructor).

@Override
public void copyURLToFile(URL sourceUrl, File destinationFile) throws IOException {
DownloadHelper.super.copyURLToFile(sourceUrl, destinationFile);
}
});
}
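A sketch of the simplification suggested in the comment above: because copyURLToFile has a default body, the convenience constructor could pass a bare DownloadHelper instead of an anonymous override. This is the reviewer's proposal, not the code as shown in this revision:

  // Sketch: rely on the interface's default copyURLToFile implementation.
  public FileDownloadHelper(String dirName) throws IOException {
    this(dirName, new DownloadHelper() {});
  }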

@Retryable(maxAttempts = 3, backoff = @Backoff(delay = 1000))
public void downloadFileFromURL(String tableName, URL pathToRemoteFile) {
try {
File tempFile =
File.createTempFile(/* prefix= */ "tdr-", /* suffix= */ "download", tempFileDir.toFile());
logger.info("downloading to temp file {} ...", tempFile.getPath());
downloadHelper.copyURLToFile(pathToRemoteFile, tempFile);
// In the TDR manifest, for Azure snapshots only,
// the first file in the list will always be a directory.
// Attempting to import that directory
// will fail; it has no content. To avoid those failures,
// check files for length and ignore any that are empty
if (tempFile.length() == 0) {
logger.info("Empty file in parquet, skipping");
Files.delete(tempFile.toPath());
} else {
// Once the remote file has been copied to the temp file, make it read-only
Files.setPosixFilePermissions(tempFile.toPath(), permissions);
fileMap.put(tableName, tempFile);
}
} catch (IOException e) {
throw new TdrManifestImportException(e.getMessage(), e);
}
}

public void deleteFileDirectory() {
try {
Files.delete(tempFileDir);
} catch (IOException e) {
logger.error("Error deleting temporary files: {}", e.getMessage());
}
}

public Multimap<String, File> getFileMap() {
return this.fileMap;
}
}
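For orientation, a minimal usage sketch of the new helper's lifecycle, as TdrManifestQuartzJob drives it in the next file; the table name and URL below are hypothetical placeholders, not values from this PR:

import com.google.common.collect.Multimap;
import java.io.File;
import java.io.IOException;
import java.net.URL;

class FileDownloadHelperUsageSketch {
  void run() throws IOException {
    // One temp directory holds every parquet file downloaded for the import.
    FileDownloadHelper files = new FileDownloadHelper("tempParquetDir");

    // Download each remote file exactly once, keyed by its target table
    // (hypothetical URL; the real caller passes the manifest's data-file URLs).
    files.downloadFileFromURL("sample", new URL("https://example.com/sample.parquet"));

    // Both import passes (BASE_ATTRIBUTES, then RELATIONS) read from this map.
    Multimap<String, File> fileMap = files.getFileMap();

    // Clean up the temp directory once the import has completed.
    files.deleteFileDirectory();
  }
}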
TdrManifestQuartzJob.java

@@ -20,21 +20,18 @@
import java.util.Set;
import java.util.UUID;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;
import org.databiosphere.workspacedataservice.activitylog.ActivityLogger;
import org.databiosphere.workspacedataservice.dao.JobDao;
import org.databiosphere.workspacedataservice.dataimport.FileDownloadHelper;
import org.databiosphere.workspacedataservice.dataimport.WsmSnapshotSupport;
import org.databiosphere.workspacedataservice.jobexec.JobExecutionException;
import org.databiosphere.workspacedataservice.jobexec.QuartzJob;
import org.databiosphere.workspacedataservice.recordstream.TwoPassStreamingWriteHandler;
import org.databiosphere.workspacedataservice.recordstream.TwoPassStreamingWriteHandler.ImportMode;
import org.databiosphere.workspacedataservice.retry.RestClientRetry;
import org.databiosphere.workspacedataservice.service.BatchWriteService;
import org.databiosphere.workspacedataservice.service.model.BatchWriteResult;
@@ -106,16 +103,19 @@ protected void executeInternal(UUID jobId, JobExecutionContext context) {
List<TdrManifestImportTable> tdrManifestImportTables =
extractTableInfo(snapshotExportResponseModel);

// get all the parquet files from the manifests
FileDownloadHelper files = getFilesForImport(tdrManifestImportTables);

// loop through the tables to be imported and upsert base attributes
var result =
importTables(
tdrManifestImportTables,
files.getFileMap(),
targetInstance,
TwoPassStreamingWriteHandler.ImportMode.BASE_ATTRIBUTES);
ImportMode.BASE_ATTRIBUTES);

// add relations to the existing base attributes
importTables(
tdrManifestImportTables, targetInstance, TwoPassStreamingWriteHandler.ImportMode.RELATIONS);
importTables(tdrManifestImportTables, files.getFileMap(), targetInstance, ImportMode.RELATIONS);

// activity logging for import status
// no specific activity logging for relations since main import is a superset
@@ -130,61 +130,38 @@ protected void executeInternal(UUID jobId, JobExecutionContext context) {
.withRecordType(entry.getKey())
.ofQuantity(entry.getValue()));
});
// delete temp files after everything else is completed
// Any failed deletions will be removed if/when pod restarts
files.deleteFileDirectory();
}

/**
* Given a single Parquet file to be imported, import it
*
* @param path path to Parquet file to be imported.
* @param inputFile Parquet file to be imported.
* @param table info about the table to be imported
* @param targetInstance instance into which to import
* @param importMode mode for this invocation
* @return statistics on what was imported
*/
@VisibleForTesting
BatchWriteResult importTable(
URL path,
InputFile inputFile,
TdrManifestImportTable table,
UUID targetInstance,
TwoPassStreamingWriteHandler.ImportMode importMode) {
try {
// download the file from the URL to a temp file on the local filesystem
// Azure urls, with SAS tokens, don't need any particular auth.
// TODO AJ-1517 can we access the URL directly, no temp file?
File tempFile = File.createTempFile("tdr-", "download");
logger.info("downloading to temp file {} ...", tempFile.getPath());
FileUtils.copyURLToFile(path, tempFile);
Path hadoopFilePath = new Path(tempFile.getPath());
// do we need any other config here?
Configuration configuration = new Configuration();

// In the TDR manifest, for Azure snapshots only,
// the first file in the list will always be a directory. Attempting to import that directory
// will fail; it has no content. To avoid those failures,
// check files for length and ignore any that are empty
FileSystem fileSystem = FileSystem.get(configuration);
FileStatus fileStatus = fileSystem.getFileStatus(hadoopFilePath);
if (fileStatus.getLen() == 0) {
logger.info("Empty file in parquet, skipping");
return BatchWriteResult.empty();
}

// generate the HadoopInputFile
InputFile inputFile = HadoopInputFile.fromPath(hadoopFilePath, configuration);

// upsert this parquet file's contents
try (ParquetReader<GenericRecord> avroParquetReader =
AvroParquetReader.<GenericRecord>builder(inputFile)
.set(READ_INT96_AS_FIXED, "true")
.build()) {
logger.info("batch-writing records for file ...");

BatchWriteResult result =
batchWriteService.batchWriteParquetStream(
avroParquetReader, targetInstance, table, importMode);

return result;
}
ImportMode importMode) {
// upsert this parquet file's contents
try (ParquetReader<GenericRecord> avroParquetReader =
AvroParquetReader.<GenericRecord>builder(inputFile)
.set(READ_INT96_AS_FIXED, "true")
.build()) {
logger.info("batch-writing records for file ...");

BatchWriteResult result =
batchWriteService.batchWriteParquetStream(
avroParquetReader, targetInstance, table, importMode);

return result;
} catch (Throwable t) {
logger.error("Hit an error on file: {}", t.getMessage());
throw new TdrManifestImportException(t.getMessage());
@@ -200,36 +177,77 @@ BatchWriteResult importTable(
*/
private BatchWriteResult importTables(
List<TdrManifestImportTable> importTables,
Multimap<String, File> fileMap,
UUID targetInstance,
TwoPassStreamingWriteHandler.ImportMode importMode) {
ImportMode importMode) {

var combinedResult = BatchWriteResult.empty();
// loop through the tables that have data files.
importTables.forEach(
importTable -> {
logger.info("Processing table '{}' ...", importTable.recordType().getName());

// find all Parquet files for this table
List<URL> paths = importTable.dataFiles();
logger.debug(
"Table '{}' has {} export file(s) ...",
importTable.recordType().getName(),
paths.size());

// loop through each parquet file
paths.forEach(
path -> {
var result = importTable(path, importTable, targetInstance, importMode);

if (result != null) {
combinedResult.merge(result);
}
});
fileMap
.get(importTable.recordType().getName())
.forEach(
file -> {
try {
org.apache.hadoop.fs.Path hadoopFilePath =
new org.apache.hadoop.fs.Path(file.toString());
Configuration configuration = new Configuration();

// generate the HadoopInputFile
InputFile inputFile = HadoopInputFile.fromPath(hadoopFilePath, configuration);
var result = importTable(inputFile, importTable, targetInstance, importMode);
if (result != null) {
combinedResult.merge(result);
}
} catch (IOException e) {
throw new TdrManifestImportException(e.getMessage(), e);
}
});
});

return combinedResult;
}

/**
* Given the list of tables/data files to be imported, loop through and download each one to a
* temporary file
*
* @param importTables tables to be imported
* @return path for the directory where downloaded files are located
*/
@VisibleForTesting
FileDownloadHelper getFilesForImport(List<TdrManifestImportTable> importTables) {
try {
FileDownloadHelper files = new FileDownloadHelper("tempParquetDir");

// loop through the tables that have data files.
importTables.forEach(
importTable -> {
logger.info("Fetching files for table '{}' ...", importTable.recordType().getName());

// find all Parquet files for this table
List<URL> paths = importTable.dataFiles();
logger.debug(
"Table '{}' has {} export file(s) ...",
importTable.recordType().getName(),
paths.size());

// loop through each parquet file
paths.forEach(
path -> {
files.downloadFileFromURL(importTable.recordType().getName(), path);
});
});

return files;
} catch (IOException e) {
throw new TdrManifestImportException("Error downloading temporary files", e);
}
}

/**
* Read the manifest from the user-specified URL into a SnapshotExportResponseModel java object
*
New file: FileDownloadHelperTest.java
@@ -0,0 +1,67 @@
package org.databiosphere.workspacedataservice.dataimport;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.io.Resource;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;

@SpringBootTest
public class FileDownloadHelperTest {

@Value("classpath:parquet/empty.parquet")
Resource emptyParquet;

@Value("classpath:parquet/v2f/all_data_types.parquet")
Resource allDataTypesParquet;

@Test
void downloadEmptyFile() throws IOException {
Author (Contributor) commented:
This test is a duplicate of the test in TdrManifestQuartzJobTest. This seems like the place for it, but should I alter the other test to verify the behavior of the actual TdrManifestQuartzJob with an empty file, or is this test sufficient?

Collaborator replied:
I think it's worth testing both TdrManifestQuartzJob.getFilesForImport() in TdrManifestQuartzJobTest and FileDownloadHelper.downloadFileFromURL() here, even though they're very similar.

FileDownloadHelper helper = new FileDownloadHelper("test");
assertDoesNotThrow(() -> helper.downloadFileFromURL("empty_table", emptyParquet.getURL()));
assert helper.getFileMap().isEmpty();
}

@Test
void testRetry() throws Exception {
FileDownloadHelper.DownloadHelper mockDownloadHelper =
Mockito.mock(FileDownloadHelper.DownloadHelper.class);
doThrow(new IOException("Simulated connection error"))
.doCallRealMethod() // Succeed on the second attempt
.when(mockDownloadHelper)
.copyURLToFile(any(URL.class), any(File.class));

// Create a RetryTemplate to set off Spring's retryable
Author (Contributor) commented:
Spring's @Retryable wasn't working, and among the several hideous options I could find to try to rig it to trigger, this one seemed the least convoluted.

Collaborator replied:
The fact that you need to include a RetryTemplate here is an indication that @Retryable is not working correctly in FileDownloadHelper. From this test, you should be able to just call helper.downloadFileFromURL() outside of a RetryTemplate and it will work.

I am pretty certain the reason that @Retryable is not working is that FileDownloadHelper is not a Spring bean, so Spring can't set up any proxying for it and therefore can't implement retries. To use @Retryable you'll have to lean into Spring-ness. I can see a few options (a rough sketch of option 1 follows after this file):

  1. FileDownloadHelper is a singleton bean (which is what we use everywhere else), and you move all statefulness about the temp dir into DownloadHelper.
  2. FileDownloadHelper is a prototype bean which gets created on demand, including its temp-dir statefulness. You'll probably need a separate singleton factory bean to do the creation.

There are probably other ways to do it. (I could pair on this / contribute code if my explanations don't make sense.)

RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());

FileDownloadHelper helper = new FileDownloadHelper("test", mockDownloadHelper);

// A single connectivity error should not throw
assertDoesNotThrow(
() ->
retryTemplate.execute(
context -> {
helper.downloadFileFromURL("table", allDataTypesParquet.getURL());
return null;
}));

// Make sure there actually was a connectivity problem
verify(mockDownloadHelper, times(2)).copyURLToFile(any(URL.class), any(File.class));

// File should successfully download on second attempt
assert helper.getFileMap().containsKey("table");
assert helper.getFileMap().get("table").size() == 1;
}
}
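Following up on the collaborator's comment in the test above: a rough sketch of option 1, where the retryable download call lives on a Spring-managed singleton so the @Retryable proxy actually applies. The bean name and wiring are hypothetical, retry is assumed to be enabled via @EnableRetry in configuration, and this is not code from this PR:

import java.io.File;
import java.io.IOException;
import java.net.URL;
import org.apache.commons.io.FileUtils;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Component;

// Sketch of option 1: a stateless singleton bean owns the download-with-retry,
// so Spring can proxy it; FileDownloadHelper (or its callers) would inject this
// bean and keep the temp-directory bookkeeping elsewhere, per the review thread.
@Component
public class RetryingDownloadHelper {

  @Retryable(maxAttempts = 3, backoff = @Backoff(delay = 1000))
  public void copyURLToFile(URL sourceUrl, File destinationFile) throws IOException {
    FileUtils.copyURLToFile(sourceUrl, destinationFile);
  }
}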