Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rohityadav1993 committed May 1, 2024
1 parent 2b1a750 commit b46ba2d
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.upsert;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
Expand Down Expand Up @@ -47,12 +48,24 @@ public class PartialUpsertHandler {
private final TreeMap<String, FieldSpec> _fieldSpecMap;
private final PartialUpsertMerger _partialUpsertMerger;

private final Map<String, Object> _defaultNullValues = new HashMap<>();

public PartialUpsertHandler(Schema schema, List<String> comparisonColumns, UpsertConfig upsertConfig) {
_primaryKeyColumns = schema.getPrimaryKeyColumns();
_comparisonColumns = comparisonColumns;
_fieldSpecMap = schema.getFieldSpecMap();
_partialUpsertMerger =
PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig);
// cache default null values to handle null merger results
for (Map.Entry<String, FieldSpec> entry : schema.getFieldSpecMap().entrySet()) {
String column = entry.getKey();
FieldSpec fieldSpec = entry.getValue();
if (fieldSpec.isSingleValueField()) {
_defaultNullValues.put(column, fieldSpec.getDefaultNullValue());
} else {
_defaultNullValues.put(column, new Object[]{fieldSpec.getDefaultNullValue()});
}
}
}

public void merge(LazyRow previousRow, GenericRow newRow, Map<String, Object> resultHolder) {
Expand Down Expand Up @@ -83,14 +96,7 @@ private void setMergedValue(GenericRow row, String column, @Nullable Object merg
row.removeNullValueField(column);
row.putValue(column, mergedValue);
} else {
// if column exists but mapped to a null value then merger result was a null value
if (_fieldSpecMap.get(column).isSingleValueField()) {
row.putDefaultNullValue(column, _fieldSpecMap.get(column).getDefaultNullValue());
} else {
// multivalue must necessarily use null or MutableSegmentImpl.updateDictionary fails due to typecasting
// primitive to array
row.putDefaultNullValue(column, null);
}
row.putDefaultNullValue(column, _defaultNullValues.get(column));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,12 @@ public void testCustomPartialUpsertMergerWithNullResult() {
LazyRow prevRecord = mock(LazyRow.class);
mockLazyRow(prevRecord,
Map.of("pk", "pk1", "field1", 5L, "field2", "set", "field3", new Integer[]{0}, "hoursSinceEpoch", 2L));
Map<String, Object> expectedData = new HashMap<>(Map.of("pk", "pk1", "field2", "reset", "hoursSinceEpoch", 2L));
expectedData.put("field3", null);
Map<String, Object> expectedData = new HashMap<>(
Map.of("pk", "pk1", "field2", "reset", "hoursSinceEpoch", 2L));
expectedData.put("field1", Long.MIN_VALUE);
GenericRow expectedRecord = initGenericRow(new GenericRow(), expectedData);
expectedRecord.addNullValueField("field1");
expectedRecord.putDefaultNullValue("field3", new Object[]{Integer.MIN_VALUE});

testCustomMerge(prevRecord, newRecord, expectedRecord, getCustomMerger());
}
Expand Down

0 comments on commit b46ba2d

Please sign in to comment.