Skip to content

Commit

Permalink
Record enricher (#12243)
Browse files Browse the repository at this point in the history
* Enricher interface

* Support in realtime ingestion

* Fix offline segment builder

* Add enricher to mapper

* Add columnsToExtract config

* Complete enricher pipeline usage

* Enrichment configs

* Add transform function based enrichment

* Change package name

* Change package name

* Review comments

* Add table config validation

* Review comments

* Change to fieldToFunctionMap

---------

Co-authored-by: Saurabh Dubey <saurabh.dubey@saurabhs-macbook-pro-1.tail8a064.ts.net>
Co-authored-by: Saurabh Dubey <saurabh.dubey@Saurabhs-MacBook-Pro.local>
  • Loading branch information
3 people authored Feb 27, 2024
1 parent 3267a74 commit 07daa7b
Show file tree
Hide file tree
Showing 25 changed files with 777 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -77,6 +78,7 @@ public class FlinkSegmentWriter implements SegmentWriter {
private String _outputDirURI;
private Schema _schema;
private Set<String> _fieldsToRead;
private RecordEnricherPipeline _recordEnricherPipeline;
private RecordTransformer _recordTransformer;

private File _stagingDir;
Expand Down Expand Up @@ -137,6 +139,7 @@ public void init(TableConfig tableConfig, Schema schema, Map<String, String> bat

_schema = schema;
_fieldsToRead = _schema.getColumnNames();
_recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(_tableConfig);
_recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
_avroSchema = SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(_schema);
_reusableRecord = new GenericData.Record(_avroSchema);
Expand Down Expand Up @@ -172,6 +175,7 @@ private void resetBuffer()
public void collect(GenericRow row)
throws IOException {
long startTime = System.currentTimeMillis();
_recordEnricherPipeline.run(row);
GenericRow transform = _recordTransformer.transform(row);
SegmentProcessorAvroUtils.convertGenericRowToAvroRecord(transform, _reusableRecord, _fieldsToRead);
_rowCount++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.metrics.PinotMeter;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
Expand Down Expand Up @@ -272,6 +273,7 @@ public void deleteSegmentFile() {
private final int _partitionGroupId;
private final PartitionGroupConsumptionStatus _partitionGroupConsumptionStatus;
final String _clientId;
private final RecordEnricherPipeline _recordEnricherPipeline;
private final TransformPipeline _transformPipeline;
private PartitionGroupConsumer _partitionGroupConsumer = null;
private StreamMetadataProvider _partitionMetadataProvider = null;
Expand Down Expand Up @@ -575,6 +577,7 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
_numRowsErrored++;
} else {
try {
_recordEnricherPipeline.run(decodedRow.getResult());
_transformPipeline.processRow(decodedRow.getResult(), reusedResult);
} catch (Exception e) {
_numRowsErrored++;
Expand Down Expand Up @@ -1480,6 +1483,14 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
new SegmentErrorInfo(now(), "Failed to initialize the StreamMessageDecoder", e));
throw e;
}

try {
_recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig);
} catch (Exception e) {
_realtimeTableDataManager.addSegmentError(_segmentNameStr,
new SegmentErrorInfo(now(), "Failed to initialize the RecordEnricherPipeline", e));
throw e;
}
_transformPipeline = new TransformPipeline(tableConfig, schema);
// Acquire semaphore to create stream consumers
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -289,6 +290,7 @@ private List<File> generateSegment(Map<String, GenericRowFileManager> partitionT
GenericRowFileRecordReader recordReaderForRange = recordReader.getRecordReaderForRange(startRowId, endRowId);
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
driver.init(generatorConfig, new RecordReaderSegmentCreationDataSource(recordReaderForRange),
RecordEnricherPipeline.getPassThroughPipeline(),
TransformPipeline.getPassThroughPipeline());
driver.build();
outputSegmentDirs.add(driver.getOutputDirectory());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.apache.pinot.spi.utils.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -68,6 +69,7 @@ public class SegmentMapper {
private final List<FieldSpec> _fieldSpecs;
private final boolean _includeNullFields;
private final int _numSortFields;
private final RecordEnricherPipeline _recordEnricherPipeline;
private final CompositeTransformer _recordTransformer;
private final TimeHandler _timeHandler;
private final Partitioner[] _partitioners;
Expand All @@ -92,6 +94,7 @@ public SegmentMapper(List<RecordReaderFileConfig> recordReaderFileConfigs,
_fieldSpecs = pair.getLeft();
_numSortFields = pair.getRight();
_includeNullFields = tableConfig.getIndexingConfig().isNullHandlingEnabled();
_recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig);
_recordTransformer = CompositeTransformer.composeAllTransformers(_customRecordTransformers, tableConfig, schema);
_timeHandler = TimeHandlerFactory.getTimeHandler(processorConfig);
List<PartitionerConfig> partitionerConfigs = processorConfig.getPartitionerConfigs();
Expand Down Expand Up @@ -166,6 +169,7 @@ private boolean completeMapAndTransformRow(RecordReader recordReader, GenericRow
observer.accept(String.format("Doing map phase on data from RecordReader (%d out of %d)", count, totalCount));
while (recordReader.hasNext() && (_adaptiveSizeBasedWriter.canWrite())) {
reuse = recordReader.next(reuse);
_recordEnricherPipeline.run(reuse);

// TODO: Add ComplexTypeTransformer here. Currently it is not idempotent so cannot add it

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ protected void buildSegment()

TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME)
.setIngestionConfig(new IngestionConfig(null, null, null,
.setIngestionConfig(new IngestionConfig(null, null, null, null,
Arrays.asList(new TransformConfig(M1_V2, "Groovy({INT_COL1_V3 == null || "
+ "INT_COL1_V3 == Integer.MIN_VALUE ? INT_COL1 : INT_COL1_V3 }, INT_COL1, INT_COL1_V3)")),
null, null, null))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.plugin.record.enricher.clp;

import com.fasterxml.jackson.databind.JsonNode;
import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
import com.yscope.clp.compressorfrontend.EncodedMessage;
import com.yscope.clp.compressorfrontend.MessageEncoder;
import java.io.IOException;
import java.util.List;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.recordenricher.RecordEnricher;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Enriches the record with CLP encoded fields.
 * For a column 'x', it adds three new columns to the record, each named by appending the matching
 * {@link ClpRewriter} column suffix to 'x':
 * 1. the logtype of the encoded message ({@link ClpRewriter#LOGTYPE_COLUMN_SUFFIX})
 * 2. the dictionary variables of the encoded message ({@link ClpRewriter#DICTIONARY_VARS_COLUMN_SUFFIX})
 * 3. the encoded variables of the encoded message ({@link ClpRewriter#ENCODED_VARS_COLUMN_SUFFIX})
 *
 * <p>A single {@link EncodedMessage} instance is reused for every call to the encoder.
 * NOTE(review): because of that shared buffer this class is presumably not safe for concurrent
 * {@link #enrich(GenericRow)} calls - confirm against how the enricher pipeline is driven.
 */
public class CLPEncodingEnricher implements RecordEnricher {
  private static final Logger LOGGER = LoggerFactory.getLogger(CLPEncodingEnricher.class);
  private final ClpEnricherConfig _config;
  private final EncodedMessage _clpEncodedMessage;
  private final MessageEncoder _clpMessageEncoder;

  /**
   * Builds the enricher from its JSON properties.
   *
   * @param enricherProperties JSON node that is deserialized into a {@link ClpEnricherConfig}
   * @throws IOException if the properties cannot be converted into a {@link ClpEnricherConfig}
   */
  public CLPEncodingEnricher(JsonNode enricherProperties) throws IOException {
    _config = JsonUtils.jsonNodeToObject(enricherProperties, ClpEnricherConfig.class);
    _clpEncodedMessage = new EncodedMessage();
    _clpMessageEncoder = new MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
  }

  /**
   * @return the configured fields that this enricher reads from each record
   */
  @Override
  public List<String> getInputColumns() {
    return _config.getFields();
  }

  /**
   * Adds the CLP encoded columns for every configured field that has a non-null value in the record.
   * Failures are logged and not propagated, so enrichment stays best-effort; a failure on one field
   * stops processing of the remaining fields of that record.
   *
   * @param record the row to read the configured fields from and to write the encoded columns into
   */
  @Override
  public void enrich(GenericRow record) {
    try {
      for (String field : _config.getFields()) {
        Object value = record.getValue(field);
        if (value != null) {
          enrichWithClpEncodedFields(field, value, record);
        }
      }
    } catch (Exception e) {
      // Pass the exception as the trailing argument so the cause and stack trace are logged
      // instead of being silently dropped.
      LOGGER.error("Failed to enrich record: {}", record, e);
    }
  }

  /**
   * Encodes a single field value with CLP and writes the three derived columns into {@code to}.
   * When the value is not a {@link String}, or encoding fails, the problem is logged and the derived
   * columns that could not be computed are written as {@code null}.
   *
   * @param key name of the source field; used as the prefix of the derived column names
   * @param value value of the source field
   * @param to row that receives the derived columns
   */
  private void enrichWithClpEncodedFields(String key, Object value, GenericRow to) {
    String logtype = null;
    Object[] dictVars = null;
    Object[] encodedVars = null;
    if (value instanceof String) {
      String valueAsString = (String) value;
      try {
        _clpMessageEncoder.encodeMessage(valueAsString, _clpEncodedMessage);
        logtype = _clpEncodedMessage.getLogTypeAsString();
        encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
        dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
      } catch (IOException e) {
        // Log the exception itself (not just its message) so the stack trace is preserved.
        LOGGER.error("Can't encode field with CLP. name: '{}', value: '{}'", key, valueAsString, e);
      }
    } else if (value != null) {
      LOGGER.error("Can't encode value of type {} with CLP. name: '{}', value: '{}'",
          value.getClass().getSimpleName(), key, value);
    }

    to.putValue(key + ClpRewriter.LOGTYPE_COLUMN_SUFFIX, logtype);
    to.putValue(key + ClpRewriter.DICTIONARY_VARS_COLUMN_SUFFIX, dictVars);
    to.putValue(key + ClpRewriter.ENCODED_VARS_COLUMN_SUFFIX, encodedVars);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.plugin.record.enricher.clp;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.auto.service.AutoService;
import java.io.IOException;
import org.apache.pinot.spi.recordenricher.RecordEnricher;
import org.apache.pinot.spi.recordenricher.RecordEnricherFactory;
import org.apache.pinot.spi.recordenricher.RecordEnricherValidationConfig;
import org.apache.pinot.spi.utils.JsonUtils;

/**
 * Factory for {@link CLPEncodingEnricher}, registered as a {@link RecordEnricherFactory} service
 * through {@link AutoService}.
 */
@AutoService(RecordEnricherFactory.class)
public class CLPEncodingEnricherFactory implements RecordEnricherFactory {
  private static final String ENRICHER_TYPE = "clpEnricher";

  /**
   * @return the enricher type name handled by this factory ({@code "clpEnricher"})
   */
  @Override
  public String getEnricherType() {
    return ENRICHER_TYPE;
  }

  /**
   * Creates a {@link CLPEncodingEnricher} from the given properties.
   *
   * @param enricherProps JSON properties of the enricher
   * @return the newly created enricher
   * @throws IOException if the properties cannot be parsed by the enricher
   */
  @Override
  public RecordEnricher createEnricher(JsonNode enricherProps)
      throws IOException {
    return new CLPEncodingEnricher(enricherProps);
  }

  /**
   * Validates the enricher properties by checking that they deserialize into a
   * {@link ClpEnricherConfig}. The {@code validationConfig} argument is not consulted.
   *
   * @param enricherProps JSON properties of the enricher
   * @param validationConfig validation settings (unused by this factory)
   * @throws IllegalArgumentException if the properties cannot be parsed
   */
  @Override
  public void validateEnrichmentConfig(JsonNode enricherProps, RecordEnricherValidationConfig validationConfig) {
    try {
      // Successful parsing is the whole validation; the parsed object itself is not needed.
      JsonUtils.jsonNodeToObject(enricherProps, ClpEnricherConfig.class);
    } catch (IOException e) {
      throw new IllegalArgumentException("Failed to parse clp enricher config", e);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.plugin.record.enricher.clp;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;


/**
 * Configuration for the CLP enricher: holds the list of record fields to encode.
 * Instances are created by Jackson from the {@code "fields"} JSON property.
 */
public class ClpEnricherConfig {
  /** Field names exactly as supplied in the {@code "fields"} property. */
  private final List<String> _fields;

  /**
   * @param fields value of the {@code "fields"} JSON property; stored as given
   */
  @JsonCreator
  public ClpEnricherConfig(@JsonProperty("fields") List<String> fields) {
    this._fields = fields;
  }

  /**
   * @return the field list passed to the constructor
   */
  public List<String> getFields() {
    return this._fields;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pinot.plugin.record.enricher.function;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.segment.local.function.FunctionEvaluator;
import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.recordenricher.RecordEnricher;
import org.apache.pinot.spi.utils.JsonUtils;


/**
 * Enriches the record with custom functions.
 * Each configured target field is assigned the result of evaluating its function expression
 * against the record; fields are processed in the iteration order of the configured map.
 */
public class CustomFunctionEnricher implements RecordEnricher {
  private final Map<String, FunctionEvaluator> _fieldToFunctionEvaluator;
  private final List<String> _fieldsToExtract;

  /**
   * Builds one evaluator per configured field and collects the arguments of every evaluator as
   * the input columns of this enricher.
   *
   * @param enricherProps JSON properties, deserialized into a {@link CustomFunctionEnricherConfig}
   * @throws IOException if the properties cannot be deserialized
   */
  public CustomFunctionEnricher(JsonNode enricherProps) throws IOException {
    CustomFunctionEnricherConfig config = JsonUtils.jsonNodeToObject(enricherProps, CustomFunctionEnricherConfig.class);
    _fieldToFunctionEvaluator = new LinkedHashMap<>();
    _fieldsToExtract = new ArrayList<>();
    for (Map.Entry<String, String> fieldAndFunction : config.getFieldToFunctionMap().entrySet()) {
      FunctionEvaluator evaluator = FunctionEvaluatorFactory.getExpressionEvaluator(fieldAndFunction.getValue());
      _fieldToFunctionEvaluator.put(fieldAndFunction.getKey(), evaluator);
      _fieldsToExtract.addAll(evaluator.getArguments());
    }
  }

  /**
   * @return the arguments of all configured function evaluators, in configuration order
   */
  @Override
  public List<String> getInputColumns() {
    return _fieldsToExtract;
  }

  /**
   * Evaluates every configured function against the record and stores each result under its
   * target field. Results written earlier are visible to functions evaluated later.
   *
   * @param record the row to evaluate against and to write the results into
   */
  @Override
  public void enrich(GenericRow record) {
    for (Map.Entry<String, FunctionEvaluator> fieldAndEvaluator : _fieldToFunctionEvaluator.entrySet()) {
      Object result = fieldAndEvaluator.getValue().evaluate(record);
      record.putValue(fieldAndEvaluator.getKey(), result);
    }
  }
}
Loading

0 comments on commit 07daa7b

Please sign in to comment.