Skip to content

Commit

Permalink
Modify segment metadata call
Browse files Browse the repository at this point in the history
  • Loading branch information
cypherean committed Oct 17, 2024
1 parent b556b37 commit 191ce5d
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -400,18 +400,26 @@ public ValidDocIdsBitmapResponse getValidDocIdsBitmapFromServer(String tableName
/**
 * Builds the table-level metadata URL for a single server endpoint.
 *
 * <p>The result has the form {@code <endpoint>/tables/<encodedTable>/metadata?<columns query>}, where the query
 * string is produced by {@code generateColumnsParam("columns", columns)}.
 *
 * @param tableNameWithType table name; URL-encoded (UTF-8) here before being placed in the path
 * @param columns column names forwarded as repeated {@code columns=} query parameters; passed through to
 *     {@code generateColumnsParam} unchanged
 * @param endpoint base URL of the server, used as the prefix of the result
 * @return the formatted URL string
 */
private String generateAggregateSegmentMetadataServerURL(String tableNameWithType, List<String> columns,
    String endpoint) {
  tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
  // Single declaration using the (paramName, values) helper signature; the stale one-argument call is dropped.
  String paramsStr = generateColumnsParam("columns", columns);
  return String.format("%s/tables/%s/metadata?%s", endpoint, tableNameWithType, paramsStr);
}

/**
 * Builds the per-segment metadata URL for a single server endpoint.
 *
 * <p>The result has the form
 * {@code <endpoint>/tables/<encodedTable>/segments/<encodedSegment>/metadata?<columns query>}, where the query
 * string is produced by {@code generateColumnsParam("columns", columns)}.
 *
 * @param tableNameWithType table name; URL-encoded (UTF-8) here before being placed in the path
 * @param segmentName segment name; URL-encoded (UTF-8) here before being placed in the path
 * @param columns column names forwarded as repeated {@code columns=} query parameters; passed through to
 *     {@code generateColumnsParam} unchanged
 * @param endpoint base URL of the server, used as the prefix of the result
 * @return the formatted URL string
 */
private String generateSegmentMetadataServerURL(String tableNameWithType, String segmentName, List<String> columns,
    String endpoint) {
  tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
  segmentName = URLEncoder.encode(segmentName, StandardCharsets.UTF_8);
  // Single declaration using the (paramName, values) helper signature; the stale one-argument call is dropped.
  String paramsStr = generateColumnsParam("columns", columns);
  return String.format("%s/tables/%s/segments/%s/metadata?%s", endpoint, tableNameWithType, segmentName, paramsStr);
}

/**
 * Builds the table metadata URL for a single server endpoint, optionally restricted to a set of segments.
 *
 * <p>The result has the form {@code <endpoint>/tables/<encodedTable>/metadata?<query>}. The query consists of
 * repeated {@code columns=} parameters followed by repeated {@code segmentsToInclude=} parameters, joined with
 * {@code &}. A group is omitted entirely when its input is {@code null} or empty, so the two groups never run
 * together and no dangling separator is produced.
 *
 * @param tableNameWithType table name; URL-encoded (UTF-8) here before being placed in the path
 * @param columns column names, passed through to {@code generateColumnsParam} unchanged; may be {@code null}
 *     or empty
 * @param segmentsToInclude segment names to restrict the request to; may be {@code null} or empty, in which case
 *     no {@code segmentsToInclude} parameter is sent
 * @param endpoint base URL of the server, used as the prefix of the result
 * @return the formatted URL string
 */
public String generateTableMetadataServerURL(String tableNameWithType, List<String> columns,
    Set<String> segmentsToInclude, String endpoint) {
  tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);

  List<String> queryGroups = new ArrayList<>(2);

  String columnsParam = generateColumnsParam("columns", columns);
  if (!columnsParam.isEmpty()) {
    queryGroups.add(columnsParam);
  }

  // Guard against null before copying: callers treat a null set as "no segment filter".
  if (segmentsToInclude != null && !segmentsToInclude.isEmpty()) {
    // Encode each segment name so characters such as '&', '=' or spaces cannot corrupt the query string,
    // mirroring how segment names are encoded when they are placed in a URL path.
    List<String> encodedSegments = new ArrayList<>(segmentsToInclude.size());
    for (String segment : segmentsToInclude) {
      encodedSegments.add(URLEncoder.encode(segment, StandardCharsets.UTF_8));
    }
    queryGroups.add(generateColumnsParam("segmentsToInclude", encodedSegments));
  }

  // Join the groups explicitly; plain concatenation would fuse the last column with the first segment param.
  String paramsStr = String.join("&", queryGroups);
  return String.format("%s/tables/%s/metadata?%s", endpoint, tableNameWithType, paramsStr);
}

private String generateCheckReloadSegmentsServerURL(String tableNameWithType, String endpoint) {
tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
return String.format("%s/tables/%s/segments/needReload", endpoint, tableNameWithType);
Expand Down Expand Up @@ -458,14 +466,14 @@ private Pair<String, String> generateValidDocIdsMetadataURL(String tableNameWith
return Pair.of(url, jsonTableSegments);
}

private String generateColumnsParam(List<String> columns) {
private String generateColumnsParam(String param, List<String> values) {
String paramsStr = "";
if (columns == null || columns.isEmpty()) {
if (values == null || values.isEmpty()) {
return paramsStr;
}
List<String> params = new ArrayList<>(columns.size());
for (String column : columns) {
params.add(String.format("columns=%s", column));
List<String> params = new ArrayList<>(values.size());
for (String value : values) {
params.add(String.format("%s=%s", param, value));
}
paramsStr = String.join("&", params);
return paramsStr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,21 +100,40 @@ private JsonNode getSegmentsMetadataInternal(String tableNameWithType, List<Stri
ServerSegmentMetadataReader serverSegmentMetadataReader =
new ServerSegmentMetadataReader(_executor, _connectionManager);

// Filter segments that we need
List<String> serverURL = new java.util.ArrayList<>(List.of());
for (Map.Entry<String, List<String>> serverToSegment : serverToSegmentsMap.entrySet()) {
List<String> segments = serverToSegment.getValue();
if (segmentsToInclude != null && !segmentsToInclude.isEmpty()) {
segments.retainAll(segmentsToInclude);
}
String serverInstance = serverToSegment.getKey();
serverURL.add(serverSegmentMetadataReader.generateTableMetadataServerURL(tableNameWithType,
columns, segmentsToInclude, endpoints.get(serverInstance)));
}

List<String> segmentsMetadata =
serverSegmentMetadataReader.getSegmentMetadataFromServer(tableNameWithType, serverToSegmentsMap, endpoints,
columns, timeoutMs);
CompletionServiceHelper completionServiceHelper =
new CompletionServiceHelper(_executor, _connectionManager, endpoints);
CompletionServiceHelper.CompletionServiceResponse serviceResponse =
completionServiceHelper.doMultiGetRequest(serverURL, tableNameWithType, false, timeoutMs);

Map<String, JsonNode> response = new HashMap<>();
for (String segmentMetadata : segmentsMetadata) {
JsonNode responseJson = JsonUtils.stringToJsonNode(segmentMetadata);
response.put(responseJson.get("segmentName").asText(), responseJson);
for (Map.Entry<String, String> serverToSegmentsMetadata : serviceResponse._httpResponses.entrySet()) {
JsonNode responseJson = JsonUtils.stringToJsonNode(serverToSegmentsMetadata.getValue());
Set<String> segmentNames = new HashSet<>();
responseJson.fieldNames().forEachRemaining(segmentNames::add);
if (segmentsToInclude != null && !segmentsToInclude.isEmpty()) {
segmentNames.retainAll(segmentsToInclude);
}
for (String segmentName : segmentNames) {
JsonNode segmentJson = responseJson.get(segmentName);
response.put(segmentName, segmentJson);
}

// Iterator<Map.Entry<String, JsonNode>> fields = responseJson.fields();
// while (fields.hasNext()) {
// Map.Entry<String, JsonNode> field = fields.next();
// String segmentName = field.getKey();
// JsonNode segmentJson = field.getValue();
// if (segmentsToInclude == null || segmentsToInclude.isEmpty() || segmentsToInclude.contains(segmentName)) {
// response.put(segmentName, segmentJson);
// }
// }
}
return JsonUtils.objectToJsonNode(response);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
Expand Down Expand Up @@ -169,6 +171,37 @@ public void testSegmentCrcApi()
checkCrcRequest(rawTableName, segmentMetadataTable, 9);
}

/**
 * Creates an OFFLINE table with two mocked segments and checks that the controller's
 * "segments metadata from server" endpoint returns one top-level entry per uploaded segment.
 */
@Test
public void testGetServerMetadataAPI()
    throws Exception {
  // Adding table and schema
  String rawTableName = "serverMetadataTestTable";
  Schema schema = new Schema.SchemaBuilder()
      .setSchemaName(rawTableName)
      .addSingleValueDimension("dimension1", FieldSpec.DataType.STRING)
      .addMetric("metric1", FieldSpec.DataType.INT)
      .build();
  PinotHelixResourceManager resourceManager = TEST_INSTANCE.getHelixResourceManager();
  resourceManager.addSchema(schema, true, false);
  String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
  TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).setNumReplicas(1)
      .setDeletedSegmentsRetentionPeriod("0d").build();
  resourceManager.addTable(tableConfig);

  // Upload segments
  int numSegments = 2;
  for (int i = 0; i < numSegments; i++) {
    SegmentMetadata segmentMetadata = SegmentMetadataMockUtils.mockSegmentMetadata(rawTableName);
    resourceManager.addNewSegment(offlineTableName, segmentMetadata, "downloadUrl");
  }

  // Check that the response contains one entry per uploaded segment
  String resp = ControllerTest.sendGetRequest(
      TEST_INSTANCE.getControllerRequestURLBuilder().forSegmentsMetadataFromServer(rawTableName));
  // Only the number of top-level entries is inspected, so no value type is asserted for the parsed map.
  Map<?, ?> fetchedMetadata = JsonUtils.stringToObject(resp, Map.class);
  assertEquals(fetchedMetadata.size(), numSegments);
}

@Test
public void testDeleteSegmentsWithTimeWindow()
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,9 @@ public String listTableSegments(
public String getSegmentMetadata(
@ApiParam(value = "Table Name with type", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "Column name", allowMultiple = true) @QueryParam("columns") @DefaultValue("")
List<String> columns, @Context HttpHeaders headers)
List<String> columns,
@ApiParam(value = "List of segments to fetch metadata for") @QueryParam("segmentsToInclude") @DefaultValue("")
List<String> segmentsToInclude, @Context HttpHeaders headers)
throws WebApplicationException {
tableName = DatabaseUtils.translateTableName(tableName, headers);
InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager();
Expand Down Expand Up @@ -219,8 +221,12 @@ public String getSegmentMetadata(
}
}
Set<String> columnSet = allColumns ? null : new HashSet<>(decodedColumns);

List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireAllSegments();
List<SegmentDataManager> segmentDataManagers;
if (segmentsToInclude != null && !segmentsToInclude.isEmpty()) {
segmentDataManagers = tableDataManager.acquireSegments(segmentsToInclude, new ArrayList<>());
} else {
segmentDataManagers = tableDataManager.acquireAllSegments();
}
long totalSegmentSizeBytes = 0;
long totalNumRows = 0;
Map<String, Double> columnLengthMap = new HashMap<>();
Expand Down

0 comments on commit 191ce5d

Please sign in to comment.