Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pluggable partial upsert merger #11983

Merged
merged 9 commits into from
Apr 5, 2024

Conversation

rohityadav1993
Copy link
Contributor

@rohityadav1993 rohityadav1993 commented Nov 9, 2023

feature refactor release-notes

Implements: #11174
This PR introduces the possibility to define row mergers(currently only per column merging happens) for partial upsert. This allows a table to define complex logic for merging in partial upsert table. An analogy is painless script in ES.
This is the second PR for the feature described in this design doc.

Makes PartialUpsertHandler do row merging using PartialUpsertMerger.

Based on previous feedback comment.

  • Introduces new contract to merge rows by refactoring PartialUpsertMerger as:
    public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, Object> mergerResult)
  • LazyRow introduced in add LazyRow abstraction for previously indexed record #11826 enables lazily reading previous column value only if needed by PartialUpsertMerger
  • PartialUpsertHandler initiliazes an implementation of PartialUpsertMerger using PartialUpsertMergerFactory.
  • Existing column level merger behaviour is unchanged.

Adds the possibility to define custom merging per table and specify as table config.

  • PartialUpsertMergerFactory initializes PartialUpsertColumnarMerger (this is existing columnar merging logic extracted out of PartialUpsertHandler) if column level merger strategies are defined.
  • If UpsertConfig. partialUpsertMergerClass is defined then PartialUpsertMergerFactory provides the implementation for same. e.g.
{
  "upsertConfig": {
    "mode": "PARTIAL",
    "partialUpsertMergerClass": "org.apache.pinot.segment.local.upsert.merger.PartialUpsertMyCustomMerger",
  },
  "tableIndexConfig": {
    "nullHandlingEnabled": true
  }
}

_Note: When custom partialUpsertMergerClasscu is provided then the column merger strategies will not be applied. The defaultPartialUpsertStrategy is also not applicable for any column and should be handled in the custom merge logic.

Refactors existing column mergers as implementations of PartialUpsertColumnMerger.

Based on previous feedback comment.

  • Current merger logic, which is per column merging of previous and new value is still the default behaviour and backward compatible with configs.
  • The existing mergers now implement PartialUpsertColumnMerger to differentiate from row merger: PartialUpsertMerger

Changes based on the last review:

  1. Renamed the merger class config name to: partialUpsertMergerClass
  2. Modified PartialUpsertHandler to iterate over merger result when partialUpsertMergerClass is provided.

Tests:
Unit test: Added

Functional test:

Created a test setup by replicating the quick start example: upsertPartialMeetupRsvp i.e. the partial upsert strategies were implemented as a custom partialUpsertMergerClass:

public class CustomMeetUpRSVPRowMerger extends BasePartialUpsertMerger {
  private UnionMerger _unionMerger;
  private AppendMerger _appendMerger;
  private IncrementMerger _incrementMerger;

  public CustomMeetUpRSVPRowMerger(List<String> primaryKeyColumns, List<String> comparisonColumns,
      UpsertConfig upsertConfig) {
    super(primaryKeyColumns, comparisonColumns, upsertConfig);
    _unionMerger = new UnionMerger();
    _appendMerger = new AppendMerger();
    _incrementMerger = new IncrementMerger();
  }

  @Override
  public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, Object> mergerResult) {
    // Merge the rsvp_count
    mergerResult.put("rsvp_count",
        _incrementMerger.merge(prevRecord.getValue("rsvp_count"), newRecord.getValue("rsvp_count")));

    // union group_name string array

    mergerResult.put("group_name",
        _unionMerger.merge(prevRecord.getValue("group_name"), newRecord.getValue("group_name")));

    // append venue_name string
    mergerResult.put("venue_name",
        _appendMerger.merge(prevRecord.getValue("venue_name"), newRecord.getValue("venue_name")));
  }
}

Table config:
upsertPartialCustomMergerMeetupRsvp_realtime_table_config.json

Observation:
Both partial upsert table with columnar strategies and custom merger class had same data:
upsertPartialMeetupRsvp_quer.txt
upsertPartialCustomMergerMeetupRsvp_query.txt

@codecov-commenter
Copy link

codecov-commenter commented Nov 9, 2023

Codecov Report

Attention: Patch coverage is 86.11111% with 15 lines in your changes are missing coverage. Please review.

Project coverage is 62.08%. Comparing base (59551e4) to head (edf3ab8).
Report is 212 commits behind head on master.

Files Patch % Lines
...he/pinot/segment/local/utils/TableConfigUtils.java 68.75% 1 Missing and 4 partials ⚠️
...ocal/upsert/merger/PartialUpsertMergerFactory.java 66.66% 3 Missing ⚠️
...not/segment/local/upsert/PartialUpsertHandler.java 90.47% 1 Missing and 1 partial ⚠️
...ger/columnar/PartialUpsertColumnMergerFactory.java 87.50% 1 Missing and 1 partial ⚠️
...rg/apache/pinot/spi/config/table/UpsertConfig.java 33.33% 2 Missing ⚠️
...cal/upsert/merger/PartialUpsertColumnarMerger.java 96.29% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #11983      +/-   ##
============================================
+ Coverage     61.75%   62.08%   +0.33%     
+ Complexity      207      198       -9     
============================================
  Files          2436     2465      +29     
  Lines        133233   134846    +1613     
  Branches      20636    20834     +198     
============================================
+ Hits          82274    83720    +1446     
- Misses        44911    44954      +43     
- Partials       6048     6172     +124     
Flag Coverage Δ
custom-integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration <0.01% <0.00%> (-0.01%) ⬇️
integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration2 0.00% <0.00%> (ø)
java-11 61.98% <86.11%> (+0.27%) ⬆️
java-21 61.93% <86.11%> (+0.30%) ⬆️
skip-bytebuffers-false 62.04% <86.11%> (+0.29%) ⬆️
skip-bytebuffers-true 61.87% <86.11%> (+34.14%) ⬆️
temurin 62.08% <86.11%> (+0.33%) ⬆️
unittests 62.08% <86.11%> (+0.33%) ⬆️
unittests1 46.71% <0.92%> (-0.18%) ⬇️
unittests2 28.08% <85.18%> (+0.34%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@Jackie-Jiang Jackie-Jiang added feature release-notes Referenced by PRs that need attention when compiling the next release notes Configuration Config changes (addition/deletion/change in behavior) upsert labels Nov 14, 2023
@Jackie-Jiang
Copy link
Contributor

Please add some description to the PR, including what support is added, how to configure it etc.

@rohityadav1993 rohityadav1993 marked this pull request as ready for review November 14, 2023 08:14
@rohityadav1993 rohityadav1993 marked this pull request as draft November 15, 2023 11:01
@rohityadav1993 rohityadav1993 force-pushed the custom_merger branch 2 times, most recently from 3e4f005 to 7ec1c8a Compare November 15, 2023 14:47
@rohityadav1993 rohityadav1993 changed the title Custom merger pluggable partial upsert merger Nov 17, 2023
@rohityadav1993 rohityadav1993 marked this pull request as ready for review November 17, 2023 18:34
@rohityadav1993
Copy link
Contributor Author

Please add some description to the PR, including what support is added, how to configure it etc.

@Jackie-Jiang, please review, added the necessary details.

@@ -106,8 +106,8 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
Preconditions.checkArgument(partialUpsertStrategies != null,
"Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType);
_partialUpsertHandler =
new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(),
_comparisonColumns);
new PartialUpsertHandler(schema, tableConfig.getValidationConfig().getTimeColumnName(), _comparisonColumns,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we get _comparisonColumns from upsertConfig?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not pass in time column. We can pass in _comparisonColumns and upsertConfig

@@ -106,8 +106,8 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
Preconditions.checkArgument(partialUpsertStrategies != null,
"Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType);
_partialUpsertHandler =
new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(),
_comparisonColumns);
new PartialUpsertHandler(schema, tableConfig.getValidationConfig().getTimeColumnName(), _comparisonColumns,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not pass in time column. We can pass in _comparisonColumns and upsertConfig

@@ -307,7 +309,7 @@ protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) {
int currentDocId = recordLocation.getDocId();
if (currentQueryableDocIds == null || currentQueryableDocIds.contains(currentDocId)) {
_reusePreviousRow.init(currentSegment, currentDocId);
_partialUpsertHandler.merge(_reusePreviousRow, record);
_partialUpsertHandler.merge(_reusePreviousRow, record, _reuseMergerResult);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel this reusable map should be maintained within the PartialUpsertHandler. I don't see why it should be part of the interface

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The_reuseMergerResult map is defined in the context of a partition for a consuming segment. If we move it to PartialUpsertHandler then we need to modify PartialUpsertHandler to be initialised in BasePartitionUpsertMetadataManager instead of BaseTableUpsertMetadataManager. Moreover the _reuseMergerResult would have to be made threadsafe across consuming segments.

@rohityadav1993
Copy link
Contributor Author

The integration test failure is in delete table flow which is unrelated to the proposed changes.

// PartialUpsertColumnMerger already handles default merger but for any custom implementations
// non merged columns need to be applied with default merger
newRecord.putValue(column,
_defaultPartialUpsertMerger.merge(prevRecord.getValue(column), newRecord.getValue(column)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default strategy is columnar based. Do we want to apply it for other mergers? I feel it is more intuitive if all the merge logic is handled within the merger

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do not handle default merge here, we may iterate over the merger result to reduce map lookups

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, this was done with intention to keep the logic in the merger class minimal. But the optmization on map iteration is a better option. Let me update this.

@@ -918,32 +918,43 @@ static void validatePartialUpsertStrategies(TableConfig tableConfig, Schema sche
UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
assert upsertConfig != null;
Map<String, UpsertConfig.Strategy> partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies();
String partialUpsertMergerClass = upsertConfig.getPartialUpsertMergerClass();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Jackie-Jiang , @deemoliu , we may want to disallow defualtMergerStrategy to be defined when using partialUpsertMergerClass. This will require us to modify UpsertConfig.class default initialization: private Strategy _defaultPartialUpsertStrategy = Strategy.OVERWRITE; And handle this initilization in POST /tableConfigs

@rohityadav1993
Copy link
Contributor Author

@Jackie-Jiang @deemoliu , please review the updated changes.

"Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType);
partialUpsertHandler =
new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(),
comparisonColumns);
new PartialUpsertHandler(schema, comparisonColumns, upsertConfig);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(format) Please auto reformat all the changes with Pinot Style

"Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType);
partialUpsertHandler =
new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(),
comparisonColumns);
new PartialUpsertHandler(schema, comparisonColumns, upsertConfig);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(format) Please auto-reformat all the changes with Pinot Style

Comment on lines 65 to 67
String rowMergerCustomImplementation = upsertConfig.getPartialUpsertMergerClass();
Preconditions.checkArgument(
StringUtils.isNotBlank(rowMergerCustomImplementation) || partialUpsertStrategies != null,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(minor) These checks can be pushed down to the constructor of PartialUpsertHandler

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the checks are moved down to PartialUpsertHandler, the create table API response is success but the table goes in error state. This check would pro-actively fail table creation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've applied a commit to demonstrate the idea. The exception will be thrown from the constructor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thank you Jackie.

}
}
public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, Object> reuseMergerResult) {
reuseMergerResult.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(minor) Clear the result on the caller side

// merger current row with previously indexed row
_partialUpsertMerger.merge(prevRecord, newRecord, reuseMergerResult);

if (_partialUpsertMerger instanceof PartialUpsertColumnarMerger) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to differentiate PartialUpsertColumnarMerger and custom merger? The logic should be the same?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to keep the logic for PartialUpsertColumnarMerger(behaviour so far) to be unmodified and was doing null handling wrongly for custom merger. I have removed the differentiation and using putDefaultNullValue() for null handling based on another review comment. We will never have a null merger result from PartialUpsertColumnarMerger.

} else {
// if column exists but mapped to a null value then merger result was a null value
newRecord.addNullValueField(column);
newRecord.putValue(column, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(MAJOR) This won't work. In order to set a value to null, you'll need to call putDefaultNullValue() with a default null value. Do you see a scenario where you want to explicitly set a value to null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might not be fully aware of the null value support feature, let me update it with putDefaultNullValue(). Thank you. I don't forsee a scenario for null value.

Comment on lines 55 to 57
} catch (ClassNotFoundException
| NoSuchMethodException | InstantiationException | IllegalAccessException
| InvocationTargetException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(minor) These can be merged as Exception e

if (StringUtils.isNotBlank(customRowMerger)) {
try {
Class<?> partialUpsertMergerClass = Class.forName(customRowMerger);
if (!BasePartialUpsertMerger.class.isAssignableFrom(partialUpsertMergerClass)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does it have to extend BasePartialUpsertMerger? Implementing PartialUpsertMerger should be good right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BasePartialUpsertMerger defines a constructor which is be used to initialize a custom merger class by overriding the constructor:
(PartialUpsertMerger) partialUpsertMergerClass.getConstructor(List.class, List.class, UpsertConfig.class) .newInstance(primaryKeyColumns, comparisonColumns, upsertConfig);

Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Applied a commit for some minor cleanups

@Jackie-Jiang Jackie-Jiang merged commit 01df14b into apache:master Apr 5, 2024
19 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Configuration Config changes (addition/deletion/change in behavior) feature release-notes Referenced by PRs that need attention when compiling the next release notes upsert
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants