Skip to content

Commit

Permalink
Fix PlainText data source making bad spectra count estimate
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielsherry committed Jan 11, 2024
1 parent 2467e11 commit 705022d
Showing 1 changed file with 44 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,24 @@ public String pluginUUID() {

@Override
public void read(DataSourceContext ctx) throws DataSourceReadException, IOException, InterruptedException {
List<DataInputAdapter> files = ctx.inputs();
List<DataInputAdapter> inputs = ctx.inputs();

if (files == null) throw new UnsupportedOperationException();
if (files.size() == 0) throw new UnsupportedOperationException();
if (files.size() > 1) throw new UnsupportedOperationException();

DataInputAdapter file = files.get(0);

var filesize = file.size();
int lineEstimate = -1;
if (inputs == null) throw new UnsupportedOperationException();
if (inputs.size() == 0) throw new UnsupportedOperationException();
if (inputs.size() > 1) throw new UnsupportedOperationException();

DataInputAdapter file = inputs.get(0);
scandata = new PipelineScanData(file.getBasename());



// Variables for getting a linecount estimate
final int LINE_ESTIMATE_SAMPLE_SIZE = 5;
var filesize = file.size();
Spectrum linesizes = new ArraySpectrum(LINE_ESTIMATE_SAMPLE_SIZE);
int linecountEstimate = -1;
boolean hasLinecountEstimate = false;


// CSV parser used to read this file
CsvParserSettings settings = new CsvParserSettings();
//Note: the order of the delimiters here matters (!) because it will
//determine the delimiter chosen in the event of a tie. I believe that
Expand All @@ -99,23 +102,33 @@ public void read(DataSourceContext ctx) throws DataSourceReadException, IOExcept
settings.setMaxCharsPerColumn(24);
CsvParser parser = new CsvParser(settings);

InputStream instream = file.getInputStream();;
InputStream instream = file.getInputStream();

int index = 0;
int readcount = 0;
int notificationInterval = 50;
for (String[] row : parser.iterate(instream)) {
int scanIndex = index++;

if (lineEstimate == -1 && filesize.isPresent()) {
lineEstimate = ((int)filesize.get().longValue()) / (String.join(" ", row).length());
getInteraction().notifyScanCount(lineEstimate);
}

if (getInteraction().checkReadAborted()) {
scandata.abort();
break;
// Estimating the number of rows by the length of the first n rows against the
// length of the whole file
if (!hasLinecountEstimate && filesize.isPresent()) {
// Track each linesize
linesizes.add(String.join(" ", row).length());

// After we have a few data points, calculate an average linesize and do the
// calculation.
if (readcount == LINE_ESTIMATE_SAMPLE_SIZE) {
hasLinecountEstimate = true;
float averageLinesize = linesizes.sum() / linesizes.size();
float bytes = filesize.get().floatValue();
linecountEstimate = (int)Math.round(bytes / averageLinesize);
getInteraction().notifyScanCount(linecountEstimate);
notificationInterval = (int) Math.min(Math.max(notificationInterval, linecountEstimate / 100f), 1000);
}
}



char delim = parser.getDetectedFormat().getDelimiter();
ScanEntry entry = new PlainTextScanEntry(scanIndex, row, delim, sizes);
Expand All @@ -124,9 +137,19 @@ public void read(DataSourceContext ctx) throws DataSourceReadException, IOExcept
scandata.submit(entry);
readcount++;

if (readcount == 50) {
// Every so often we check in with a listener-ish component
if (readcount == notificationInterval) {

// Progress
getInteraction().notifyScanRead(readcount);
readcount = 0;

// Check for abort request
if (getInteraction().checkReadAborted()) {
scandata.abort();
break;
}

}

}
Expand Down

0 comments on commit 705022d

Please sign in to comment.