Skip to content

Commit

Permalink
Return interfaces in Segment processor framework classes instead of I…
Browse files Browse the repository at this point in the history
…mplementations (#14252)

* change abstraction for writer.

* fix segment fetcher for localfs.

---------

Co-authored-by: Aishik <aishik@startree.ai>
  • Loading branch information
KKcorps and aishikbh authored Oct 18, 2024
1 parent 76b219b commit 9046af2
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public class PinotFSSegmentFetcher extends BaseSegmentFetcher {
@Override
protected void fetchSegmentToLocalWithoutRetry(URI uri, File dest)
throws Exception {
PinotFSFactory.create(uri.getScheme()).copyToLocalFile(uri, dest);
if (uri.getScheme() == null) {
PinotFSFactory.create(PinotFSFactory.LOCAL_PINOT_FS_SCHEME).copyToLocalFile(uri, dest);
} else {
PinotFSFactory.create(uri.getScheme()).copyToLocalFile(uri, dest);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ public static SegmentFetcher getSegmentFetcher(String protocol) {
return segmentFetcher;
} else {
LOGGER.info("Segment fetcher is not configured for protocol: {}, using default", protocol);
if (protocol == null) {
return PINOT_FS_SEGMENT_FETCHER;
}
switch (protocol) {
case CommonConstants.HTTP_PROTOCOL:
case CommonConstants.HTTPS_PROTOCOL:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.pinot.spi.data.readers.GenericRow;


public class AdaptiveSizeBasedWriter implements AdaptiveConstraintsWriter<GenericRowFileWriter, GenericRow> {
public class AdaptiveSizeBasedWriter implements AdaptiveConstraintsWriter<FileWriter<GenericRow>, GenericRow> {

private final long _bytesLimit;
private long _numBytesWritten;
Expand All @@ -45,7 +45,7 @@ public boolean canWrite() {
}

@Override
public void write(GenericRowFileWriter writer, GenericRow row) throws IOException {
public void write(FileWriter<GenericRow> writer, GenericRow row) throws IOException {
_numBytesWritten += writer.writeData(row);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.core.segment.processing.genericrow.AdaptiveSizeBasedWriter;
import org.apache.pinot.core.segment.processing.genericrow.FileWriter;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter;
import org.apache.pinot.core.segment.processing.partitioner.Partitioner;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
Expand Down Expand Up @@ -245,7 +245,7 @@ private void writeRecord(GenericRow row)
}

// Get the file writer.
GenericRowFileWriter fileWriter = fileManager.getFileWriter();
FileWriter<GenericRow> fileWriter = fileManager.getFileWriter();

// Write the row.
_adaptiveSizeBasedWriter.write(fileWriter, row);
Expand Down

0 comments on commit 9046af2

Please sign in to comment.