From 2b1a750e18a279c266e663c4fcd79a306dd46205 Mon Sep 17 00:00:00 2001 From: Rohit Yadav Date: Tue, 30 Apr 2024 12:52:03 +0530 Subject: [PATCH 1/2] fix merging null multi value in partial upsert Update PartialUpsertHandler.java --- .../pinot/segment/local/upsert/PartialUpsertHandler.java | 8 +++++++- .../segment/local/upsert/PartialUpsertHandlerTest.java | 6 +++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java index 118412ab772..51f3886a8c5 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java @@ -84,7 +84,13 @@ private void setMergedValue(GenericRow row, String column, @Nullable Object merg row.putValue(column, mergedValue); } else { // if column exists but mapped to a null value then merger result was a null value - row.putDefaultNullValue(column, _fieldSpecMap.get(column).getDefaultNullValue()); + if (_fieldSpecMap.get(column).isSingleValueField()) { + row.putDefaultNullValue(column, _fieldSpecMap.get(column).getDefaultNullValue()); + } else { + // multivalue must necessarily use null or MutableSegmentImpl.updateDictionary fails due to typecasting + // primitive to array + row.putDefaultNullValue(column, null); + } } } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java index 4b954aa1400..7c18dc0b1f8 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java @@ -86,8 +86,10 @@ public void testCustomPartialUpsertMergerWithNullResult() { newRowData.put("hoursSinceEpoch", null); // testing null comparison column GenericRow newRecord = initGenericRow(new GenericRow(), newRowData); LazyRow prevRecord = mock(LazyRow.class); - mockLazyRow(prevRecord, Map.of("pk", "pk1", "field1", 5L, "field2", "set", "hoursSinceEpoch", 2L)); + mockLazyRow(prevRecord, + Map.of("pk", "pk1", "field1", 5L, "field2", "set", "field3", new Integer[]{0}, "hoursSinceEpoch", 2L)); Map expectedData = new HashMap<>(Map.of("pk", "pk1", "field2", "reset", "hoursSinceEpoch", 2L)); + expectedData.put("field3", null); expectedData.put("field1", Long.MIN_VALUE); GenericRow expectedRecord = initGenericRow(new GenericRow(), expectedData); expectedRecord.addNullValueField("field1"); @@ -138,6 +140,7 @@ private void testCustomMerge(LazyRow prevRecord, GenericRow newRecord, GenericRo Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("pk", FieldSpec.DataType.STRING) .addSingleValueDimension("field1", FieldSpec.DataType.LONG) .addSingleValueDimension("field2", FieldSpec.DataType.STRING) + .addMultiValueDimension("field3", FieldSpec.DataType.INT) .addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS") .setPrimaryKeyColumns(Arrays.asList("pk")).build(); @@ -169,6 +172,7 @@ public PartialUpsertMerger getCustomMerger() { } if ((newRow.getValue("field2")).equals("reset")) { resultHolder.put("field1", null); + resultHolder.put("field3", null); } }; } From b46ba2d1b8bdb665102a929de6d989d185cea7b9 Mon Sep 17 00:00:00 2001 From: Rohit Yadav Date: Thu, 2 May 2024 00:42:28 +0530 Subject: [PATCH 2/2] address review comments --- .../local/upsert/PartialUpsertHandler.java | 22 ++++++++++++------- .../upsert/PartialUpsertHandlerTest.java | 5 +++-- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java index 51f3886a8c5..ad73de9d70f 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.segment.local.upsert; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -47,12 +48,24 @@ public class PartialUpsertHandler { private final TreeMap _fieldSpecMap; private final PartialUpsertMerger _partialUpsertMerger; + private final Map _defaultNullValues = new HashMap<>(); + public PartialUpsertHandler(Schema schema, List comparisonColumns, UpsertConfig upsertConfig) { _primaryKeyColumns = schema.getPrimaryKeyColumns(); _comparisonColumns = comparisonColumns; _fieldSpecMap = schema.getFieldSpecMap(); _partialUpsertMerger = PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig); + // cache default null values to handle null merger results + for (Map.Entry entry : schema.getFieldSpecMap().entrySet()) { + String column = entry.getKey(); + FieldSpec fieldSpec = entry.getValue(); + if (fieldSpec.isSingleValueField()) { + _defaultNullValues.put(column, fieldSpec.getDefaultNullValue()); + } else { + _defaultNullValues.put(column, new Object[]{fieldSpec.getDefaultNullValue()}); + } + } } public void merge(LazyRow previousRow, GenericRow newRow, Map resultHolder) { @@ -83,14 +96,7 @@ private void setMergedValue(GenericRow row, String column, @Nullable Object merg row.removeNullValueField(column); row.putValue(column, mergedValue); } else { - // if column exists but mapped to a null value then merger result was a null value - if (_fieldSpecMap.get(column).isSingleValueField()) { - row.putDefaultNullValue(column, _fieldSpecMap.get(column).getDefaultNullValue()); - } else { - // multivalue must necessarily use null or MutableSegmentImpl.updateDictionary fails due to typecasting - // primitive to array - row.putDefaultNullValue(column, null); - } + row.putDefaultNullValue(column, _defaultNullValues.get(column)); } } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java index 7c18dc0b1f8..fc8fdbdefbe 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java @@ -88,11 +88,12 @@ public void testCustomPartialUpsertMergerWithNullResult() { LazyRow prevRecord = mock(LazyRow.class); mockLazyRow(prevRecord, Map.of("pk", "pk1", "field1", 5L, "field2", "set", "field3", new Integer[]{0}, "hoursSinceEpoch", 2L)); - Map expectedData = new HashMap<>(Map.of("pk", "pk1", "field2", "reset", "hoursSinceEpoch", 2L)); - expectedData.put("field3", null); + Map expectedData = new HashMap<>( + Map.of("pk", "pk1", "field2", "reset", "hoursSinceEpoch", 2L)); expectedData.put("field1", Long.MIN_VALUE); GenericRow expectedRecord = initGenericRow(new GenericRow(), expectedData); expectedRecord.addNullValueField("field1"); + expectedRecord.putDefaultNullValue("field3", new Object[]{Integer.MIN_VALUE}); testCustomMerge(prevRecord, newRecord, expectedRecord, getCustomMerger()); }