Skip to content

Commit

Permalink
filter out non-covid results from bulk uploader csv (#6192)
Browse files Browse the repository at this point in the history
* filter out non-covid results from bulk uploader csv

* add support to flu only files

* cleanup EmptyCsvException

* addess sonar feedback

* correct test file

* address sonar feedback
  • Loading branch information
zdeveloper authored Jul 26, 2023
1 parent 4235b55 commit 653bb30
Show file tree
Hide file tree
Showing 9 changed files with 318 additions and 84 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package gov.cdc.usds.simplereport.api.model.errors;

import java.io.Serial;

public class EmptyCsvException extends Exception {
@Serial private static final long serialVersionUID = 1L;

public EmptyCsvException() {
super("CSV file has no rows");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ public class TestResultRow implements FileRow {
static final String PATIENT_RACE = "patient_race";
static final String PATIENT_ETHNICITY = "patient_ethnicity";
static final String ACCESSION_NUMBER = "accession_number";
static final String EQUIPMENT_MODEL_NAME = "equipment_model_name";
static final String TEST_PERFORMED_CODE = "test_performed_code";
public static final String EQUIPMENT_MODEL_NAME = "equipment_model_name";
public static final String TEST_PERFORMED_CODE = "test_performed_code";
static final String TEST_RESULT = "test_result";
static final String ORDER_TEST_DATE = "order_test_date";
static final String TEST_RESULT_DATE = "test_result_date";
Expand Down Expand Up @@ -387,7 +387,7 @@ private boolean validModelTestPerformedCombination(
&& resultsUploaderCachingService
.getModelAndTestPerformedCodeToDeviceMap()
.containsKey(
ResultsUploaderCachingService.getMapKey(
ResultsUploaderCachingService.getKey(
removeTrailingAsterisk(equipmentModelName), testPerformedCode));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
@EnableCaching
public class CachingConfig {

public static final String COVID_EQUIPMENT_MODEL_AND_TEST_PERFORMED_CODE_SET =
"covidEquipmentModelAndTestPerformedCodeSet";
public static final String DEVICE_MODEL_AND_TEST_PERFORMED_CODE_MAP =
"deviceModelAndTestPerformedCodeMap";
public static final String SPECIMEN_NAME_AND_SNOMED_MAP = "specimenTypeNameSNOMEDMap";
Expand All @@ -18,6 +20,7 @@ public class CachingConfig {
@Bean
public CacheManager cacheManager() {
return new ConcurrentMapCacheManager(
COVID_EQUIPMENT_MODEL_AND_TEST_PERFORMED_CODE_SET,
DEVICE_MODEL_AND_TEST_PERFORMED_CODE_MAP,
SPECIMEN_NAME_AND_SNOMED_MAP,
ADDRESS_TIMEZONE_LOOKUP_MAP);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gov.cdc.usds.simplereport.service;

import static gov.cdc.usds.simplereport.config.CachingConfig.ADDRESS_TIMEZONE_LOOKUP_MAP;
import static gov.cdc.usds.simplereport.config.CachingConfig.COVID_EQUIPMENT_MODEL_AND_TEST_PERFORMED_CODE_SET;
import static gov.cdc.usds.simplereport.config.CachingConfig.DEVICE_MODEL_AND_TEST_PERFORMED_CODE_MAP;
import static gov.cdc.usds.simplereport.config.CachingConfig.SPECIMEN_NAME_AND_SNOMED_MAP;

Expand All @@ -11,7 +12,9 @@
import gov.cdc.usds.simplereport.db.repository.SpecimenTypeRepository;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -98,7 +101,7 @@ public Map<String, DeviceType> getModelAndTestPerformedCodeToDeviceMap() {
String model = deviceType.getModel();
String testPerformedCode = deviceTypeDisease.getTestPerformedLoincCode();
if (model != null && testPerformedCode != null) {
resultMap.put(getMapKey(model, testPerformedCode), deviceType);
resultMap.put(getKey(model, testPerformedCode), deviceType);
}
}));

Expand All @@ -113,6 +116,46 @@ public void cacheModelAndTestPerformedCodeToDeviceMap() {
getModelAndTestPerformedCodeToDeviceMap();
}

@Cacheable(COVID_EQUIPMENT_MODEL_AND_TEST_PERFORMED_CODE_SET)
public Set<String> getCovidEquipmentModelAndTestPerformedCodeSet() {
log.info("generating covidEquipmentModelAndTestPerformedCodeSet cache");

Set<String> resultSet = new HashSet<>();

deviceTypeRepository
.findAllRecords()
.forEach(
deviceType ->
deviceType
.getSupportedDiseaseTestPerformed()
.forEach(
deviceTypeDisease -> {
if (deviceTypeDisease
.getSupportedDisease()
.getName()
.equals("COVID-19")) {
String model = deviceType.getModel();
String testPerformedCode =
deviceTypeDisease.getTestPerformedLoincCode();
if (model != null && testPerformedCode != null) {
resultSet.add(getKey(model, testPerformedCode));
}
}
}));

return resultSet;
}

@Scheduled(fixedRate = 1, timeUnit = TimeUnit.HOURS)
@Caching(
evict = {
@CacheEvict(value = COVID_EQUIPMENT_MODEL_AND_TEST_PERFORMED_CODE_SET, allEntries = true)
})
public void cacheCovidEquipmentModelAndTestPerformedCodeSet() {
log.info("clear and generate covidEquipmentModelAndTestPerformedCodeSet cache");
getCovidEquipmentModelAndTestPerformedCodeSet();
}

@Cacheable(SPECIMEN_NAME_AND_SNOMED_MAP)
public Map<String, String> getSpecimenTypeNameToSNOMEDMap() {
log.info("generating getSpecimenTypeNameToSNOMEDMap cache");
Expand All @@ -137,7 +180,7 @@ public void cacheSpecimenTypeNameToSNOMEDMap() {
getSpecimenTypeNameToSNOMEDMap();
}

public static String getMapKey(String model, String testPerformedCode) {
public static String getKey(String model, String testPerformedCode) {
return model.toLowerCase() + "|" + testPerformedCode.toLowerCase();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package gov.cdc.usds.simplereport.service;

import static gov.cdc.usds.simplereport.api.model.filerow.TestResultRow.EQUIPMENT_MODEL_NAME;
import static gov.cdc.usds.simplereport.api.model.filerow.TestResultRow.TEST_PERFORMED_CODE;
import static gov.cdc.usds.simplereport.utils.AsyncLoggingUtils.withMDC;
import static gov.cdc.usds.simplereport.utils.DateTimeUtils.convertToZonedDateTime;
import static gov.cdc.usds.simplereport.validators.CsvValidatorUtils.getIteratorForCsv;
Expand All @@ -15,6 +17,7 @@
import feign.FeignException;
import gov.cdc.usds.simplereport.api.model.errors.CsvProcessingException;
import gov.cdc.usds.simplereport.api.model.errors.DependencyFailureException;
import gov.cdc.usds.simplereport.api.model.errors.EmptyCsvException;
import gov.cdc.usds.simplereport.api.model.filerow.TestResultRow;
import gov.cdc.usds.simplereport.config.AuthorizationConfiguration;
import gov.cdc.usds.simplereport.db.model.Organization;
Expand Down Expand Up @@ -43,10 +46,13 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import lombok.Builder;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -138,33 +144,41 @@ public TestResultUpload processResultCSV(InputStream csvStream) {
}

TestResultUpload csvResult = null;
Future<UploadResponse> csvResponse;
TestResultUpload fhirResult = null;
Future<FutureResult<UploadResponse, Exception>> csvResponse;
Future<UploadResponse> fhirResponse = null;

if (content.length > 0) {
csvResponse = submitResultsAsCsv(content);

csvResponse = submitResultsAsCsv(content);
if (fhirEnabled) {
fhirResponse = submitResultsAsFhir(new ByteArrayInputStream(content), org);
}

try {
if (csvResponse.get() == null) {
throw new DependencyFailureException("Unable to parse Report Stream response.");
if (csvResponse.get().getError() instanceof DependencyFailureException) {
throw (DependencyFailureException) csvResponse.get().getError();
}
csvResult = saveSubmissionToDb(csvResponse.get(), org, submissionId);
if (fhirResponse != null) {
saveSubmissionToDb(fhirResponse.get(), org, submissionId);

if (csvResponse.get().getValue() != null) {
csvResult = saveSubmissionToDb(csvResponse.get().getValue(), org, submissionId);
}
} catch (CsvProcessingException e) {
log.error("Error processing CSV in Bulk Result Upload", e);

} catch (CsvProcessingException | ExecutionException | InterruptedException e) {
log.error("Error processing csv in bulk result upload", e);
Thread.currentThread().interrupt();
} catch (ExecutionException | InterruptedException e) {
log.error("Error Processing Bulk Result Upload.", e);
}

try {
if (fhirResponse != null && fhirResponse.get() != null) {
fhirResult = saveSubmissionToDb(fhirResponse.get(), org, submissionId);
}
} catch (CsvProcessingException | ExecutionException | InterruptedException e) {
log.error("Error processing FHIR in bulk result upload", e);
Thread.currentThread().interrupt();
}
}
return csvResult;
return Optional.ofNullable(csvResult).orElse(fhirResult);
}

private byte[] transformCsvContent(byte[] content) {
Expand All @@ -180,8 +194,16 @@ private byte[] transformCsvContent(byte[] content) {
log.error("Unable to parse csv.", ex);
continue;
}
updatedRows.add(transformCsvRow(row));

if (isCovidResult(row)) {
updatedRows.add(transformCsvRow(row));
}
}

if (updatedRows.isEmpty()) {
return new byte[0];
}

var headers = updatedRows.stream().flatMap(row -> row.keySet().stream()).distinct().toList();
var csvMapper =
new CsvMapper()
Expand All @@ -202,6 +224,14 @@ private byte[] transformCsvContent(byte[] content) {
return csvContent.getBytes(StandardCharsets.UTF_8);
}

private boolean isCovidResult(Map<String, String> row) {
String equipmentModelName = row.get(EQUIPMENT_MODEL_NAME);
String testPerformedCode = row.get(TEST_PERFORMED_CODE);
return resultsUploaderCachingService
.getCovidEquipmentModelAndTestPerformedCodeSet()
.contains(ResultsUploaderCachingService.getKey(equipmentModelName, testPerformedCode));
}

private Map<String, String> transformCsvRow(Map<String, String> row) {

if (!"P".equals(processingModeCodeValue)
Expand Down Expand Up @@ -347,22 +377,42 @@ private Future<UploadResponse> submitResultsAsFhir(
}));
}

private Future<UploadResponse> submitResultsAsCsv(byte[] content) {
private Future<FutureResult<UploadResponse, Exception>> submitResultsAsCsv(byte[] content) {
return CompletableFuture.supplyAsync(
withMDC(
() -> {
long start = System.currentTimeMillis();
UploadResponse response;
FutureResult<UploadResponse, Exception> result;
var csvContent = transformCsvContent(content);
if (csvContent.length == 0) {
return FutureResult.<UploadResponse, Exception>builder()
.error(new EmptyCsvException())
.build();
}
try {
response = _client.uploadCSV(csvContent);
result =
FutureResult.<UploadResponse, Exception>builder()
.value(_client.uploadCSV(csvContent))
.build();
} catch (FeignException e) {
log.info("RS CSV API Error " + e.status() + " Response: " + e.contentUTF8());
response = parseFeignException(e);
try {
UploadResponse value = mapper.readValue(e.contentUTF8(), UploadResponse.class);
result = FutureResult.<UploadResponse, Exception>builder().value(value).build();

} catch (JsonProcessingException ex) {
log.error("Unable to parse Report Stream response.", ex);
result =
FutureResult.<UploadResponse, Exception>builder()
.error(
new DependencyFailureException(
"Unable to parse Report Stream response."))
.build();
}
}
log.info(
"CSV submitted in " + (System.currentTimeMillis() - start) + " milliseconds");
return response;
return result;
}));
}

Expand Down Expand Up @@ -415,4 +465,11 @@ public TestResultUpload processHIVResultCSV(InputStream csvStream) {
return new TestResultUpload(
UUID.randomUUID(), UUID.randomUUID(), UploadStatus.PENDING, 0, null, empty, empty);
}

@Getter
@Builder
public static class FutureResult<V, E> {
private V value;
private E error;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ private Bundle convertRowToFhirBundle(TestResultRow row, UUID orgId) {
var matchingDevice =
resultsUploaderCachingService
.getModelAndTestPerformedCodeToDeviceMap()
.get(ResultsUploaderCachingService.getMapKey(modelName, testPerformedCode));
.get(ResultsUploaderCachingService.getKey(modelName, testPerformedCode));

if (matchingDevice != null) {
List<DeviceTypeDisease> deviceTypeDiseaseEntries =
Expand Down
Loading

0 comments on commit 653bb30

Please sign in to comment.