Skip to content

Commit

Permalink
Fix issues, rebase and resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
lnbest0707-uber committed Oct 17, 2024
1 parent 48dc8dc commit 1af31b2
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ static long computeTotalDocumentCount(List<SegmentZKMetadata> segmentsZKMetadata
@Override
public void cleanUpTask() {
LOGGER.info("Unregister all the validation metrics.");
_validationMetrics.unregisterAllMetrics();/PinotLLCRealtimeSegmentManagerTest
_validationMetrics.unregisterAllMetrics();
}

public static final class Context {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ public static void validate(TableConfig tableConfig, @Nullable Schema schema, @N
// Only allow realtime tables with non-null stream.type and LLC consumer.type
if (tableConfig.getTableType() == TableType.REALTIME) {
List<Map<String, String>> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(tableConfig);
if (streamConfigMaps.size() > 1) {
Preconditions.checkArgument(!tableConfig.isUpsertEnabled(),
"Multiple stream configs are not supported for upsert tables");
}
// TODO: validate stream configs in the map are identical in most fields
StreamConfig streamConfig;
for (Map<String, String> streamConfigMap : streamConfigMaps) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ public void ingestionStreamConfigsTest() {
new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
.setIngestionConfig(ingestionConfig).build();

// only 1 stream config allowed
// Multiple stream configs is allowed
try {
TableConfigUtils.validateIngestionConfig(tableConfig, null);
} catch (IllegalStateException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,24 @@ public Exception getException() {
@Override
public Boolean call()
throws Exception {
_newPartitionGroupMetadataList.clear();
for (int i = 0; i < _streamConfigs.size(); i++) {
String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-"
+ _streamConfigs.get(i).getTableNameWithType() + "-" + _topicNames.get(i);
StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfigs.get(i));
final int index = i;
List<PartitionGroupConsumptionStatus> topicPartitionGroupConsumptionStatusList =
_partitionGroupConsumptionStatusList.stream()
.filter(partitionGroupConsumptionStatus ->
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
partitionGroupConsumptionStatus.getPartitionGroupId()) == index)
.collect(Collectors.toList());
try (
StreamMetadataProvider streamMetadataProvider =
streamConsumerFactory.createStreamMetadataProvider(clientId)) {
final int index = i;
_newPartitionGroupMetadataList.addAll(streamMetadataProvider.computePartitionGroupMetadata(clientId,
_streamConfigs.get(i),
_partitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/5000).stream().map(
topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/5000).stream().map(
metadata -> new PartitionGroupMetadata(
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(
metadata.getPartitionGroupId(), index),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ default List<PartitionGroupMetadata> computePartitionGroupMetadata(String client
// If partition group is still in progress, this value will be null
for (PartitionGroupConsumptionStatus currentPartitionGroupConsumptionStatus : partitionGroupConsumptionStatuses) {
newPartitionGroupMetadataList.add(
new PartitionGroupMetadata(currentPartitionGroupConsumptionStatus.getPartitionGroupId(),
new PartitionGroupMetadata(currentPartitionGroupConsumptionStatus.getStreamPartitionGroupId(),
currentPartitionGroupConsumptionStatus.getEndOffset()));
}
// Add PartitionGroupMetadata for new partitions
Expand Down

0 comments on commit 1af31b2

Please sign in to comment.