arguments = function.getOperands();
+
+ // Validate clpDecode's arguments
+ int numArgs = arguments.size();
+ if (numArgs < 1 || numArgs > 2) {
+ // Too few/many args for this rewriter, so do nothing and let it pass through to the clpDecode transform function
+ return;
+ }
+
+ Expression arg0 = arguments.get(0);
+ if (ExpressionType.IDENTIFIER != arg0.getType()) {
+ throw new SqlCompilationException("clpDecode: 1st argument must be a column group name (identifier).");
+ }
+ String columnGroupName = arg0.getIdentifier().getName();
+
+ Literal defaultValueLiteral = null;
+ if (numArgs > 1) {
+ Expression arg1 = arguments.get(1);
+ if (ExpressionType.LITERAL != arg1.getType()) {
+ throw new SqlCompilationException("clpDecode: 2nd argument must be a default value (literal).");
+ }
+ defaultValueLiteral = arg1.getLiteral();
+ }
+
+ // Replace the columnGroup with the individual columns
+ arguments.clear();
+ addCLPDecodeOperands(columnGroupName, defaultValueLiteral, function);
+ }
+
+ private ClpSqlSubqueryGenerationResult convertSubqueryToSql(String logtypeColumnName, String dictionaryVarsColumnName,
+ String encodedVarsColumnName, String wildcardQuery, int subqueryIdx, EightByteClpEncodedSubquery[] subqueries) {
+ EightByteClpEncodedSubquery subquery = subqueries[subqueryIdx];
+
+ if (!subquery.containsVariables()) {
+ Function f = createLogtypeMatchFunction(logtypeColumnName, subquery.getLogtypeQueryAsString(),
+ subquery.logtypeQueryContainsWildcards());
+ return new ClpSqlSubqueryGenerationResult(false, f);
+ }
+
+ Function subqueryFunc = new Function(SqlKind.AND.name());
+
+ Expression e;
+
+ // Add logtype query
+ Function f = createLogtypeMatchFunction(logtypeColumnName, subquery.getLogtypeQueryAsString(),
+ subquery.logtypeQueryContainsWildcards());
+ e = new Expression(ExpressionType.FUNCTION);
+ e.setFunctionCall(f);
+ subqueryFunc.addToOperands(e);
+
+ // Add any dictionary variables
+ int numDictVars = 0;
+ for (ByteSegment dictVar : subquery.getDictVars()) {
+ f = createStringColumnMatchFunction(SqlKind.EQUALS.name(), dictionaryVarsColumnName, dictVar.toString());
+ e = new Expression(ExpressionType.FUNCTION);
+ e.setFunctionCall(f);
+ subqueryFunc.addToOperands(e);
+
+ ++numDictVars;
+ }
+
+ // Add any encoded variables
+ int numEncodedVars = 0;
+ for (long encodedVar : subquery.getEncodedVars()) {
+ f = new Function(SqlKind.EQUALS.name());
+ f.addToOperands(RequestUtils.getIdentifierExpression(encodedVarsColumnName));
+ f.addToOperands(RequestUtils.getLiteralExpression(encodedVar));
+
+ e = new Expression(ExpressionType.FUNCTION);
+ e.setFunctionCall(f);
+ subqueryFunc.addToOperands(e);
+
+ ++numEncodedVars;
+ }
+
+ // Add any wildcard dictionary variables
+ for (VariableWildcardQuery varWildcardQuery : subquery.getDictVarWildcardQueries()) {
+ f = createStringColumnMatchFunction(_REGEXP_LIKE_LOWERCASE_FUNCTION_NAME, dictionaryVarsColumnName,
+ wildcardQueryToRegex(varWildcardQuery.getQuery().toString()));
+ e = new Expression(ExpressionType.FUNCTION);
+ e.setFunctionCall(f);
+ subqueryFunc.addToOperands(e);
+
+ ++numDictVars;
+ }
+
+ // Add any wildcard encoded variables
+ int numEncodedVarWildcardQueries = subquery.getNumEncodedVarWildcardQueries();
+ numEncodedVars += numEncodedVarWildcardQueries;
+ if (numEncodedVarWildcardQueries > 0) {
+ // Create call to clpEncodedVarsMatch
+ Expression clpEncodedVarsExp = RequestUtils.getFunctionExpression(
+ RequestUtils.canonicalizeFunctionNamePreservingSpecialKey(
+ TransformFunctionType.CLP_ENCODED_VARS_MATCH.getName()));
+ f = clpEncodedVarsExp.getFunctionCall();
+ f.addToOperands(RequestUtils.getIdentifierExpression(logtypeColumnName));
+ f.addToOperands(RequestUtils.getIdentifierExpression(encodedVarsColumnName));
+ f.addToOperands(RequestUtils.getLiteralExpression(wildcardQuery));
+ f.addToOperands(RequestUtils.getLiteralExpression(subqueryIdx));
+
+ // Create `clpEncodedVarsMatch(...) = true`
+ e = RequestUtils.getFunctionExpression(SqlKind.EQUALS.name());
+ f = e.getFunctionCall();
+ f.addToOperands(clpEncodedVarsExp);
+ f.addToOperands(RequestUtils.getLiteralExpression(true));
+
+ subqueryFunc.addToOperands(e);
+ }
+
+ // We require a decompress and match in the following cases:
+ // 1. There are >1 variables of a specific type (dict/encoded) in the query. Consider this query: " dv123 dv456 ".
+ // The corresponding SQL will look like:
+ // "x_logtype = '...' AND x_dictionaryVars = 'dv123' AND x_dictionaryVars = 'dv456'"
+ // This SQL will indeed match values which also match the query; but this SQL will also match values which
+ // *don't* match the query, like " dv456 dv123 ". This is because the SQL query doesn't encode the position of
+ // the variables.
+ // 2. There is no more than 1 variable of each type, but the logtype query contains wildcards. Consider this query:
+ // "user dv123 *". The corresponding SQL will look like:
+ // "REGEXP_LIKE(x_logtype, "user: \dv .*") AND x_dictionaryVars = 'dv123'",
+ // where "\dv" is a dictionary variable placeholder. This SQL could match the
+ // value "user dv123 joined" but it could also match "user dv456 joined dv123".
+ boolean requiresDecompAndMatch =
+ !(numDictVars < 2 && numEncodedVars < 2 && !subquery.logtypeQueryContainsWildcards());
+ return new ClpSqlSubqueryGenerationResult(requiresDecompAndMatch, subqueryFunc);
+ }
+
+ private Function createLogtypeMatchFunction(String columnName, String query, boolean containsWildcards) {
+ String funcName;
+ String funcQuery;
+ if (containsWildcards) {
+ funcName = _REGEXP_LIKE_LOWERCASE_FUNCTION_NAME;
+ funcQuery = wildcardQueryToRegex(query);
+ } else {
+ funcName = SqlKind.EQUALS.name();
+ funcQuery = query;
+ }
+ return createStringColumnMatchFunction(funcName, columnName, funcQuery);
+ }
+
+ private Function createStringColumnMatchFunction(String canonicalName, String columnName, String query) {
+ Function func = new Function(canonicalName);
+ func.addToOperands(RequestUtils.getIdentifierExpression(columnName));
+ func.addToOperands(RequestUtils.getLiteralExpression(query));
+ return func;
+ }
+
+ /**
+ * Converts a CLP-encoded column group into physical column names and adds them to the CLPDecode transform function's
+ * operands.
+ * @param columnGroupName Name of the CLP-encoded column group
+ * @param defaultValueLiteral Optional default value to pass through to the transform function
+ * @param clpDecode The function to add the operands to
+ */
+ private void addCLPDecodeOperands(String columnGroupName, @Nullable Literal defaultValueLiteral, Function clpDecode) {
+ addCLPDecodeOperands(columnGroupName + LOGTYPE_COLUMN_SUFFIX, columnGroupName + DICTIONARY_VARS_COLUMN_SUFFIX,
+ columnGroupName + ENCODED_VARS_COLUMN_SUFFIX, defaultValueLiteral, clpDecode);
+ }
+
+ /**
+ * Adds the given operands to the given CLPDecode transform function.
+ * @param logtypeColumnName
+ * @param dictionaryVarsColumnName
+ * @param encodedVarsColumnName
+ * @param defaultValueLiteral
+ * @param clpDecode
+ */
+ private void addCLPDecodeOperands(String logtypeColumnName, String dictionaryVarsColumnName,
+ String encodedVarsColumnName, @Nullable Literal defaultValueLiteral, Function clpDecode) {
+ clpDecode.addToOperands(RequestUtils.getIdentifierExpression(logtypeColumnName));
+ clpDecode.addToOperands(RequestUtils.getIdentifierExpression(dictionaryVarsColumnName));
+ clpDecode.addToOperands(RequestUtils.getIdentifierExpression(encodedVarsColumnName));
+ if (null != defaultValueLiteral) {
+ Expression e = new Expression(ExpressionType.LITERAL);
+ e.setLiteral(defaultValueLiteral);
+ clpDecode.addToOperands(e);
+ }
+ }
+
+ /**
+ * Converts a wildcard query into a regular expression. The wildcard query is a string which may contain two possible
+ * wildcards:
+ * 1. '*' that matches zero or more characters.
+ * 2. '?' that matches any single character.
+ * @param wildcardQuery
+ * @return The regular expression which matches the same values as the wildcard query.
+ */
+ private static String wildcardQueryToRegex(String wildcardQuery) {
+ boolean isEscaped = false;
+ StringBuilder queryWithSqlWildcards = new StringBuilder();
+
+ // Add begin anchor if necessary
+ if (!wildcardQuery.isEmpty() && '*' != wildcardQuery.charAt(0)) {
+ queryWithSqlWildcards.append('^');
+ }
+
+ int uncopiedIdx = 0;
+ for (int queryIdx = 0; queryIdx < wildcardQuery.length(); queryIdx++) {
+ char queryChar = wildcardQuery.charAt(queryIdx);
+ if (isEscaped) {
+ isEscaped = false;
+ } else {
+ if ('\\' == queryChar) {
+ isEscaped = true;
+ } else if (isWildcard(queryChar)) {
+ queryWithSqlWildcards.append(wildcardQuery, uncopiedIdx, queryIdx);
+ queryWithSqlWildcards.append('.');
+ uncopiedIdx = queryIdx;
+ } else {
+ for (final char metaChar : _NON_WILDCARD_REGEX_META_CHARACTERS) {
+ if (metaChar == queryChar) {
+ queryWithSqlWildcards.append(wildcardQuery, uncopiedIdx, queryIdx);
+ queryWithSqlWildcards.append('\\');
+ uncopiedIdx = queryIdx;
+ break;
+ }
+ }
+ }
+ }
+ }
+ if (uncopiedIdx < wildcardQuery.length()) {
+ queryWithSqlWildcards.append(wildcardQuery, uncopiedIdx, wildcardQuery.length());
+ }
+
+ // Add end anchor if necessary
+ if (!wildcardQuery.isEmpty() && '*' != wildcardQuery.charAt(wildcardQuery.length() - 1)) {
+ queryWithSqlWildcards.append('$');
+ }
+
+ return queryWithSqlWildcards.toString();
+ }
+
+ /**
+ * @param c
+ * @return Whether the given character is a wildcard.
+ */
+ private static boolean isWildcard(char c) {
+ return '*' == c || '?' == c;
+ }
+
+ /**
+ * Simple class to hold the result of turning a CLP subquery into SQL.
+ */
+ private static class ClpSqlSubqueryGenerationResult {
+ private final boolean _requiresDecompAndMatch;
+ private final Function _sqlFunc;
+
+ ClpSqlSubqueryGenerationResult(boolean requiresDecompAndMatch, Function sqlFunc) {
+ _requiresDecompAndMatch = requiresDecompAndMatch;
+ _sqlFunc = sqlFunc;
+ }
+
+ public boolean requiresDecompAndMatch() {
+ return _requiresDecompAndMatch;
+ }
+
+ public Function getSqlFunc() {
+ return _sqlFunc;
+ }
+ }
+}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java b/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java
index ba4ba42a3ce..819c8b84c21 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java
@@ -47,9 +47,9 @@ public class FunctionDefinitionRegistryTest {
// Scalar function
"scalar",
// Functions without scalar function counterpart as of now
- "arraylength", "arrayaverage", "arraymin", "arraymax", "arraysum", "clpdecode", "groovy", "inidset",
- "jsonextractscalar", "jsonextractindex", "jsonextractkey", "lookup", "mapvalue", "timeconvert", "valuein",
- "datetimeconvertwindowhop",
+ "arraylength", "arrayaverage", "arraymin", "arraymax", "arraysum", "clpdecode", "clpencodedvarsmatch", "groovy",
+ "inidset", "jsonextractscalar", "jsonextractindex", "jsonextractkey", "lookup", "mapvalue", "timeconvert",
+ "valuein", "datetimeconvertwindowhop",
// functions not needed for register b/c they are in std sql table or they will not be composed directly.
"in", "not_in", "and", "or", "range", "extract", "is_true", "is_not_true", "is_false", "is_not_false"
);
diff --git a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/rewriter/CLPDecodeRewriterTest.java b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/rewriter/CLPDecodeRewriterTest.java
deleted file mode 100644
index e6bdb8dff6a..00000000000
--- a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/rewriter/CLPDecodeRewriterTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.sql.parsers.rewriter;
-
-import org.apache.pinot.sql.parsers.CalciteSqlParser;
-import org.apache.pinot.sql.parsers.SqlCompilationException;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertThrows;
-
-
-public class CLPDecodeRewriterTest {
- private static final QueryRewriter _QUERY_REWRITER = new CLPDecodeRewriter();
-
- @Test
- public void testCLPDecodeRewrite() {
- // clpDecode rewrite from column group to individual columns
- testQueryRewrite("SELECT clpDecode(message) FROM clpTable",
- "SELECT clpDecode(message_logtype, message_dictionaryVars, message_encodedVars) FROM clpTable");
- testQueryRewrite("SELECT clpDecode(message, 'null') FROM clpTable",
- "SELECT clpDecode(message_logtype, message_dictionaryVars, message_encodedVars, 'null') FROM clpTable");
-
- // clpDecode passthrough
- testQueryRewrite("SELECT clpDecode(message_logtype, message_dictionaryVars, message_encodedVars) FROM clpTable",
- "SELECT clpDecode(message_logtype, message_dictionaryVars, message_encodedVars) FROM clpTable");
- testQueryRewrite(
- "SELECT clpDecode(message_logtype, message_dictionaryVars, message_encodedVars, 'null') FROM clpTable",
- "SELECT clpDecode(message_logtype, message_dictionaryVars, message_encodedVars, 'null') FROM clpTable");
- }
-
- @Test
- public void testUnsupportedCLPDecodeQueries() {
- testUnsupportedQuery("SELECT clpDecode('message') FROM clpTable");
- testUnsupportedQuery("SELECT clpDecode('message', 'default') FROM clpTable");
- testUnsupportedQuery("SELECT clpDecode('message', default) FROM clpTable");
- testUnsupportedQuery("SELECT clpDecode(message, default) FROM clpTable");
- }
-
- private void testQueryRewrite(String original, String expected) {
- assertEquals(_QUERY_REWRITER.rewrite(CalciteSqlParser.compileToPinotQuery(original)),
- CalciteSqlParser.compileToPinotQuery(expected));
- }
-
- private void testUnsupportedQuery(String query) {
- assertThrows(SqlCompilationException.class,
- () -> _QUERY_REWRITER.rewrite(CalciteSqlParser.compileToPinotQuery(query)));
- }
-}
diff --git a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/rewriter/ClpRewriterTest.java b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/rewriter/ClpRewriterTest.java
new file mode 100644
index 00000000000..987f4e449bb
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/rewriter/ClpRewriterTest.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.rewriter;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.sql.parsers.SqlCompilationException;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.fail;
+
+
+public class ClpRewriterTest {
+ private static final QueryRewriter _QUERY_REWRITER = new ClpRewriter();
+
+ @Test
+ public void testCLPDecodeRewrite() {
+ // clpDecode rewrite from column group to individual columns
+ testQueryRewrite("SELECT clpDecode(message) FROM clpTable",
+ "SELECT clpDecode(message_logtype, message_dictionaryVars, message_encodedVars) FROM clpTable");
+ testQueryRewrite("SELECT clpDecode(message, 'null') FROM clpTable",
+ "SELECT clpDecode(message_logtype, message_dictionaryVars, message_encodedVars, 'null') FROM clpTable");
+
+ // clpDecode passthrough
+ testQueryRewrite("SELECT clpDecode(message_logtype, message_dictionaryVars, message_encodedVars) FROM clpTable",
+ "SELECT clpDecode(message_logtype, message_dictionaryVars, message_encodedVars) FROM clpTable");
+ testQueryRewrite(
+ "SELECT clpDecode(message_logtype, message_dictionaryVars, message_encodedVars, 'null') FROM clpTable",
+ "SELECT clpDecode(message_logtype, message_dictionaryVars, message_encodedVars, 'null') FROM clpTable");
+ }
+
+ @Test
+ public void testClpMatchRewrite() {
+ MessageEncoder encoder = new MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+ BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+ EncodedMessage encodedMessage = new EncodedMessage();
+ try {
+ String message;
+ String[] dictionaryVars;
+ Long[] encodedVars;
+
+ // Query with no wildcards and no variables
+ message = " INFO container ";
+ encoder.encodeMessage(message, encodedMessage);
+ testQueryRewrite(
+ String.format("SELECT * FROM clpTable WHERE clpMatch(message, '%s')", message),
+ String.format("SELECT * FROM clpTable WHERE message_logtype = '%s'", encodedMessage.getLogTypeAsString())
+ );
+
+ // Query with no wildcards and no variables using individual column names
+ testQueryRewrite(
+ String.format("SELECT * FROM clpTable WHERE clpMatch(message_logtype, message_dictionaryVars,"
+ + " message_encodedVars, '%s')", message),
+ String.format("SELECT * FROM clpTable WHERE message_logtype = '%s'", encodedMessage.getLogTypeAsString())
+ );
+
+ // Query with wildcards and no variables
+ message = " INFO container ";
+ encoder.encodeMessage(message, encodedMessage);
+ testQueryRewrite(
+ String.format("SELECT * FROM clpTable WHERE clpMatch(message, '*%s*')", message),
+ String.format("SELECT * FROM clpTable WHERE REGEXP_LIKE(message_logtype, '.*%s.*')",
+ encodedMessage.getLogTypeAsString())
+ );
+
+ // Query with no wildcards and a single dictionary var
+ message = " var123 ";
+ encoder.encodeMessage(message, encodedMessage);
+ dictionaryVars = encodedMessage.getDictionaryVarsAsStrings();
+ testQueryRewrite(
+ String.format("SELECT * FROM clpTable WHERE clpMatch(message, '%s')", message),
+ String.format("SELECT * FROM clpTable WHERE message_logtype = '%s' AND message_dictionaryVars = '%s'",
+ encodedMessage.getLogTypeAsString(), dictionaryVars[0])
+ );
+
+ // Query with no wildcards and a single encoded var
+ message = " 123 ";
+ encoder.encodeMessage(message, encodedMessage);
+ encodedVars = encodedMessage.getEncodedVarsAsBoxedLongs();
+ testQueryRewrite(
+ String.format("SELECT * FROM clpTable WHERE clpMatch(message, '%s')", message),
+ String.format("SELECT * FROM clpTable WHERE message_logtype = '%s' AND message_encodedVars = %s",
+ encodedMessage.getLogTypeAsString(), encodedVars[0])
+ );
+
+ // Query with no wildcards and multiple dictionary vars
+ message = " var123 var456 ";
+ encoder.encodeMessage(message, encodedMessage);
+ dictionaryVars = encodedMessage.getDictionaryVarsAsStrings();
+ testQueryRewrite(
+ String.format("SELECT * FROM clpTable WHERE clpMatch(message, '%s')", message),
+ String.format("SELECT * FROM clpTable WHERE message_logtype = '%s' AND message_dictionaryVars = '%s'"
+ + " AND message_dictionaryVars = '%s'"
+ + " AND REGEXP_LIKE(clpdecode(message_logtype, message_dictionaryVars, message_encodedVars, ''),"
+ + " '%s')",
+ encodedMessage.getLogTypeAsString(), dictionaryVars[0], dictionaryVars[1], String.format("^%s$", message))
+ );
+
+ // Query with no wildcards and multiple encoded vars
+ message = " 123 456 ";
+ encoder.encodeMessage(message, encodedMessage);
+ encodedVars = encodedMessage.getEncodedVarsAsBoxedLongs();
+ testQueryRewrite(
+ String.format("SELECT * FROM clpTable WHERE clpMatch(message, '%s')", message),
+ String.format("SELECT * FROM clpTable WHERE message_logtype = '%s' AND message_encodedVars = %s"
+ + " AND message_encodedVars = %s"
+ + " AND REGEXP_LIKE(clpdecode(message_logtype, message_dictionaryVars, message_encodedVars, ''),"
+ + " '%s')",
+ encodedMessage.getLogTypeAsString(), encodedVars[0], encodedVars[1], String.format("^%s$", message))
+ );
+
+ // Query with no wildcards, a dictionary var, and an encoded var
+ message = " var123 456 ";
+ encoder.encodeMessage(message, encodedMessage);
+ dictionaryVars = encodedMessage.getDictionaryVarsAsStrings();
+ encodedVars = encodedMessage.getEncodedVarsAsBoxedLongs();
+ testQueryRewrite(
+ String.format("SELECT * FROM clpTable WHERE clpMatch(message, '%s')", message),
+ String.format("SELECT * FROM clpTable WHERE message_logtype = '%s' AND message_dictionaryVars = '%s'"
+ + " AND message_encodedVars = %s", encodedMessage.getLogTypeAsString(), dictionaryVars[0],
+ encodedVars[0])
+ );
+
+ // Query with wildcards for a single dictionary var
+ message = "var123";
+ encoder.encodeMessage(message, encodedMessage);
+ dictionaryVars = encodedMessage.getDictionaryVarsAsStrings();
+ testQueryRewrite(
+ String.format("SELECT * FROM clpTable WHERE clpMatch(message, '*%s*')", message),
+ String.format("SELECT * FROM clpTable WHERE REGEXP_LIKE(message_logtype, '.*%s.*')"
+ + " AND REGEXP_LIKE(message_dictionaryVars, '.*%s.*')"
+ + " AND REGEXP_LIKE(clpdecode(message_logtype, message_dictionaryVars, message_encodedVars, ''),"
+ + " '%s')",
+ encodedMessage.getLogTypeAsString(), dictionaryVars[0], String.format(".*%s.*", message))
+ );
+
+ // Query with wildcards for a single var which could be a float encoded var, int encoded var, or dictionary var
+ encoder.encodeMessage("123", encodedMessage);
+ String subquery1Logtype = encodedMessage.getLogTypeAsString();
+ encoder.encodeMessage("123.0", encodedMessage);
+ String subquery2Logtype = encodedMessage.getLogTypeAsString();
+ encoder.encodeMessage("var123", encodedMessage);
+ String subquery3Logtype = encodedMessage.getLogTypeAsString();
+ message = "123";
+ testQueryRewrite(
+ String.format("SELECT * FROM clpTable WHERE clpMatch(message, '*%s*')", message),
+ String.format("SELECT * FROM clpTable WHERE ("
+ + "(REGEXP_LIKE(message_logtype, '.*%s.*')"
+ + " AND clpEncodedVarsMatch(message_logtype, message_encodedVars, '*%s*', 0))"
+ + " OR (REGEXP_LIKE(message_logtype, '.*%s.*')"
+ + " AND clpEncodedVarsMatch(message_logtype, message_encodedVars, '*%s*', 1))"
+ + " OR (REGEXP_LIKE(message_logtype, '.*%s.*') AND REGEXP_LIKE(message_dictionaryVars, '.*%s.*'))"
+ + ") AND REGEXP_LIKE(clpdecode(message_logtype, message_dictionaryVars, message_encodedVars, ''),"
+ + " '.*%s.*')",
+ subquery1Logtype, message, subquery2Logtype, message, subquery3Logtype, message, message)
+ );
+ } catch (IOException e) {
+ fail("Failed to encode message", e);
+ }
+ }
+
+ /**
+ * Flattens an AND expression such that any of its children that are AND ops are elided by adding their operands to
+ * the given expression.
+ *
+ * Ex: "x = '1' AND ('y' = 2 AND NOT 'z' = 3)" would be flattened to
+ * "x '1' AND 'y' = 2 AND NOT 'z' = 3"
+ * @param expr
+ */
+ private void flattenAndExpression(Expression expr) {
+ List newOperands = new ArrayList<>();
+ Function func = expr.getFunctionCall();
+ for (Expression childOp : func.getOperands()) {
+ if (!childOp.isSetFunctionCall()) {
+ newOperands.add(childOp);
+ continue;
+ }
+
+ Function childFunc = childOp.getFunctionCall();
+ if (childFunc.getOperator().equals(SqlKind.AND.name())) {
+ flattenAndExpression(childOp);
+ newOperands.addAll(childOp.getFunctionCall().getOperands());
+ } else {
+ flattenAllAndExpressions(childOp);
+ newOperands.add(childOp);
+ }
+ }
+ func.setOperands(newOperands);
+ }
+
+ /**
+ * Flattens all AND expressions in a given expression.
+ *
+ * Ex: "a = 0 OR (x = '1' AND ('y' = 2 AND NOT 'z' = 3))" would be flattened to
+ * "a = 0 OR (x '1' AND 'y' = 2 AND NOT 'z' = 3)"
+ * @param expr
+ */
+ private void flattenAllAndExpressions(Expression expr) {
+ if (!expr.isSetFunctionCall()) {
+ return;
+ }
+
+ List newOperands = new ArrayList<>();
+
+ Function func = expr.getFunctionCall();
+ if (func.getOperator().equals(SqlKind.AND.name())) {
+ flattenAndExpression(expr);
+ return;
+ }
+
+ // Recursively handle the expression's operands
+ for (Expression childOp : func.getOperands()) {
+ if (!childOp.isSetFunctionCall()) {
+ newOperands.add(childOp);
+ continue;
+ }
+
+ Function childFunc = childOp.getFunctionCall();
+ if (childFunc.getOperator().equals(SqlKind.AND.name())) {
+ flattenAndExpression(childOp);
+ } else {
+ flattenAllAndExpressions(childOp);
+ }
+ newOperands.add(childOp);
+ }
+
+ func.setOperands(newOperands);
+ }
+
+ @Test
+ public void testUnsupportedCLPDecodeQueries() {
+ testUnsupportedQuery("SELECT clpDecode('message') FROM clpTable");
+ testUnsupportedQuery("SELECT clpDecode('message', 'default') FROM clpTable");
+ testUnsupportedQuery("SELECT clpDecode('message', default) FROM clpTable");
+ testUnsupportedQuery("SELECT clpDecode(message, default) FROM clpTable");
+ }
+
+ private void testQueryRewrite(String original, String expected) {
+ PinotQuery originalQuery = _QUERY_REWRITER.rewrite(CalciteSqlParser.compileToPinotQuery(original));
+ PinotQuery expectedQuery = CalciteSqlParser.compileToPinotQuery(expected);
+ // Flatten any AND expressions in the rewritten query.
+ // NOTE: The rewritten query may have nested AND conditions of the form (A AND (B AND C)). If we don't flatten them,
+ // comparison with the expected query will fail.
+ Expression origQueryFilterExpr = originalQuery.getFilterExpression();
+ if (null != origQueryFilterExpr) {
+ flattenAllAndExpressions(origQueryFilterExpr);
+ }
+ assertEquals(originalQuery, expectedQuery);
+ }
+
+ private void testUnsupportedQuery(String query) {
+ assertThrows(SqlCompilationException.class,
+ () -> _QUERY_REWRITER.rewrite(CalciteSqlParser.compileToPinotQuery(query)));
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ClpEncodedVarsMatchTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ClpEncodedVarsMatchTransformFunction.java
new file mode 100644
index 00000000000..ba116e8eb53
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ClpEncodedVarsMatchTransformFunction.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import com.google.common.base.Preconditions;
+import com.yscope.clp.compressorfrontend.AbstractClpEncodedSubquery;
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EightByteClpEncodedSubquery;
+import com.yscope.clp.compressorfrontend.EightByteClpWildcardQueryEncoder;
+import com.yscope.clp.compressorfrontend.MessageDecoder;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.pinot.common.function.TransformFunctionType;
+import org.apache.pinot.core.operator.ColumnContext;
+import org.apache.pinot.core.operator.blocks.ValueBlock;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Performs a wildcard match on the encoded variables of a CLP-encoded column group. This is used by the clpMatch
+ * function (implemented using {@link org.apache.pinot.sql.parsers.rewriter.ClpRewriter}) and likely wouldn't be called
+ * manually by a user.
+ *
+ * Syntax:
+ *
+ * clpEncodedVarsMatch(columnGroupName_logtype, columnGroupName_encodedVars, wildcardQuery, subQueryIndex)
+ *
+ */
+public class ClpEncodedVarsMatchTransformFunction extends BaseTransformFunction {
+ private static final Logger _logger = LoggerFactory.getLogger(ClpEncodedVarsMatchTransformFunction.class);
+
+ private final List _transformFunctions = new ArrayList<>();
+ private byte[] _serializedVarTypes;
+ private byte[] _serializedVarWildcardQueries;
+ private int[] _varWildcardQueryEndIndexes;
+
+ @Override
+ public String getName() {
+ return TransformFunctionType.CLP_ENCODED_VARS_MATCH.getName();
+ }
+
+ @Override
+ public void init(List arguments, Map columnContextMap) {
+ Preconditions.checkArgument(arguments.size() == 4, "Syntax error: clpEncodedVarsMatch takes 4 arguments - "
+ + "clpEncodedVarsMatch(columnGroupName_logtype, columnGroupName_encodedVars, wildcardQuery, subQueryIndex");
+
+ Iterator argsIter = arguments.iterator();
+
+ TransformFunction f = argsIter.next();
+ Preconditions.checkArgument(f instanceof IdentifierTransformFunction, "1st argument must be an identifier");
+ _transformFunctions.add(f);
+
+ f = argsIter.next();
+ Preconditions.checkArgument(f instanceof IdentifierTransformFunction, "2nd argument must be an identifier");
+ _transformFunctions.add(f);
+
+ f = argsIter.next();
+ Preconditions.checkArgument(f instanceof LiteralTransformFunction, "3rd argument must be a string literal");
+ String wildcardQuery = ((LiteralTransformFunction) f).getStringLiteral();
+
+ f = argsIter.next();
+ Preconditions.checkArgument(f instanceof LiteralTransformFunction, "4th argument must be a long literal");
+ long subqueryIndex = ((LiteralTransformFunction) f).getLongLiteral();
+
+ EightByteClpWildcardQueryEncoder queryEncoder =
+ new EightByteClpWildcardQueryEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+ BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+ EightByteClpEncodedSubquery[] subqueries = queryEncoder.encode(wildcardQuery);
+ if (subqueryIndex < 0 || subqueryIndex > subqueries.length) {
+ throw new IllegalArgumentException("Invalid subquery index.");
+ }
+ EightByteClpEncodedSubquery subquery = subqueries[(int) subqueryIndex];
+ int numEncodedVarWildcardQueries = subquery.getNumEncodedVarWildcardQueries();
+ if (0 == numEncodedVarWildcardQueries) {
+ throw new IllegalArgumentException("Subquery doesn't contain any wildcard queries for encoded variables.");
+ }
+
+ try {
+ ByteArrayOutputStream serializedVarTypes = new ByteArrayOutputStream();
+ ByteArrayOutputStream serializedWildcardQueries = new ByteArrayOutputStream();
+ List serializedWildcardQueryEndIndices = new ArrayList<>();
+ for (AbstractClpEncodedSubquery.VariableWildcardQuery q : subquery.getEncodedVarWildcardQueries()) {
+ serializedVarTypes.write(q.getType());
+ serializedWildcardQueries.write(q.getQuery().toByteArray());
+ serializedWildcardQueryEndIndices.add(serializedWildcardQueries.size());
+ }
+ _serializedVarTypes = serializedVarTypes.toByteArray();
+ _serializedVarWildcardQueries = serializedWildcardQueries.toByteArray();
+ _varWildcardQueryEndIndexes = ArrayUtils.toPrimitive(serializedWildcardQueryEndIndices.toArray(new Integer[0]));
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Wildcard query could not be serialized", e);
+ }
+ }
+
+ @Override
+ public TransformResultMetadata getResultMetadata() {
+ return new TransformResultMetadata(FieldSpec.DataType.BOOLEAN, true, false);
+ }
+
+ @Override
+ public int[] transformToIntValuesSV(ValueBlock valueBlock) {
+ int length = valueBlock.getNumDocs();
+ if (null == _intValuesSV) {
+ _intValuesSV = new int[length];
+ }
+
+ int functionIdx = 0;
+ TransformFunction logtypeTransformFunction = _transformFunctions.get(functionIdx++);
+ TransformFunction encodedVarsTransformFunction = _transformFunctions.get(functionIdx++);
+ byte[][] logtypes = logtypeTransformFunction.transformToBytesValuesSV(valueBlock);
+ long[][] encodedVars = encodedVarsTransformFunction.transformToLongValuesMV(valueBlock);
+
+ MessageDecoder clpMessageDecoder = new MessageDecoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+ BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+ try {
+ clpMessageDecoder.batchEncodedVarsWildcardMatch(logtypes, encodedVars, _serializedVarTypes,
+ _serializedVarWildcardQueries, _varWildcardQueryEndIndexes, _intValuesSV);
+ } catch (IOException ex) {
+ _logger.error("Failed to perform wildcard match on (CLP) encoded variables field.", ex);
+ Arrays.fill(_intValuesSV, 0);
+ }
+
+ return _intValuesSV;
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
index 82afb6dbeb2..d5e4d9d481d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
@@ -146,6 +146,7 @@ private static Map> createRegistry()
typeToImplementation.put(TransformFunctionType.IN_ID_SET, InIdSetTransformFunction.class);
typeToImplementation.put(TransformFunctionType.LOOKUP, LookupTransformFunction.class);
typeToImplementation.put(TransformFunctionType.CLP_DECODE, CLPDecodeTransformFunction.class);
+ typeToImplementation.put(TransformFunctionType.CLP_ENCODED_VARS_MATCH, ClpEncodedVarsMatchTransformFunction.class);
typeToImplementation.put(TransformFunctionType.EXTRACT, ExtractTransformFunction.class);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CLPDecodeTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/ClpTransformFunctionsTest.java
similarity index 69%
rename from pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CLPDecodeTransformFunctionTest.java
rename to pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/ClpTransformFunctionsTest.java
index e0d419dca0e..67c1a6ef089 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CLPDecodeTransformFunctionTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/ClpTransformFunctionsTest.java
@@ -63,7 +63,7 @@
import static org.testng.Assert.fail;
-public class CLPDecodeTransformFunctionTest {
+public class ClpTransformFunctionsTest {
private static final String SEGMENT_NAME = "testSegmentForClpDecode";
private static final String INDEX_DIR_PATH = FileUtils.getTempDirectoryPath() + File.separator + SEGMENT_NAME;
private static final String TIMESTAMP_COLUMN = "timestampColumn";
@@ -119,9 +119,10 @@ public void setup()
_logtypeValues[NUM_ROWS - 1] = clpEncodedMessage.getLogTypeAsString();
_dictVarValues[NUM_ROWS - 1] = clpEncodedMessage.getDictionaryVarsAsStrings();
_encodedVarValues[NUM_ROWS - 1] = clpEncodedMessage.getEncodedVarsAsBoxedLongs();
- // Corrupt the previous two rows, so we can test the default value
+ // Corrupt a row, so we can test the default value
+ // NOTE: We don't corrupt the encoded variables column since that would cause clpEncodedVarsMatch to detect an
+ // error and abandon the batch, rendering the test useless.
_dictVarValues[NUM_ROWS - 2] = null;
- _encodedVarValues[NUM_ROWS - 3] = null;
List rows = new ArrayList<>(NUM_ROWS);
for (int i = 0; i < NUM_ROWS; i++) {
@@ -159,7 +160,7 @@ public void deleteOldIndex() {
}
@Test
- public void testTransform() {
+ public void testClpDecode() {
ExpressionContext expression = RequestContextUtils.getExpression(
String.format("%s(%s,%s,%s)", TransformFunctionType.CLP_DECODE.getName(), LOGTYPE_COLUMN, DICT_VARS_COLUMN,
ENCODED_VARS_COLUMN));
@@ -168,14 +169,13 @@ public void testTransform() {
String[] expectedValues = new String[NUM_ROWS];
Arrays.fill(expectedValues, TEST_MESSAGE);
- expectedValues[NUM_ROWS - 3] = DEFAULT_DIMENSION_NULL_VALUE_OF_STRING;
expectedValues[NUM_ROWS - 2] = DEFAULT_DIMENSION_NULL_VALUE_OF_STRING;
expectedValues[NUM_ROWS - 1] = DEFAULT_DIMENSION_NULL_VALUE_OF_STRING;
- testTransformFunction(transformFunction, expectedValues);
+ testStringTransformFunc(transformFunction, expectedValues);
}
@Test
- public void testTransformWithDefaultValue() {
+ public void testClpDecodeWithDefaultValue() {
String defaultValue = "default";
ExpressionContext expression = RequestContextUtils.getExpression(
String.format("%s(%s,%s,%s,'%s')", TransformFunctionType.CLP_DECODE.getName(), LOGTYPE_COLUMN, DICT_VARS_COLUMN,
@@ -185,14 +185,13 @@ public void testTransformWithDefaultValue() {
String[] expectedValues = new String[NUM_ROWS];
Arrays.fill(expectedValues, TEST_MESSAGE);
- expectedValues[NUM_ROWS - 3] = defaultValue;
expectedValues[NUM_ROWS - 2] = defaultValue;
expectedValues[NUM_ROWS - 1] = DEFAULT_DIMENSION_NULL_VALUE_OF_STRING;
- testTransformFunction(transformFunction, expectedValues);
+ testStringTransformFunc(transformFunction, expectedValues);
}
@Test
- public void testInvalidArgs() {
+ public void testClpDecodeWithInvalidArg() {
String defaultValue = "default";
// 1st parameter literal
@@ -235,10 +234,100 @@ public void testInvalidArgs() {
});
}
- private void testTransformFunction(TransformFunction transformFunction, String[] expectedValues) {
+ @Test
+ public void testClpEncodedVarsMatch() {
+ String wildcardQuery;
+
+ // Test query which will match
+ wildcardQuery = "*51*";
+ // The query should generate three subqueries: One for an encoded integer var, one for an encoded float var, and one
+ // for a dictionary var, in that order.
+ testClpEncodedVarsMatch(wildcardQuery, 0, false);
+ testClpEncodedVarsMatch(wildcardQuery, 1, true);
+ }
+
+ @Test
+ public void testClpEncodedVarsMatchWithInvalidArg() {
+ String wildcardQuery = "*123*";
+ long subqueryIdx = 0;
+
+ // 1st parameter literal
+ assertThrows(BadQueryRequestException.class, () -> {
+ ExpressionContext expression = RequestContextUtils.getExpression(
+ String.format("%s('%s',%s,'%s',%s)", TransformFunctionType.CLP_ENCODED_VARS_MATCH.getName(), LOGTYPE_COLUMN,
+ ENCODED_VARS_COLUMN, wildcardQuery, subqueryIdx));
+ TransformFunctionFactory.get(expression, _dataSourceMap);
+ });
+
+ // 2nd parameter literal
+ assertThrows(BadQueryRequestException.class, () -> {
+ ExpressionContext expression = RequestContextUtils.getExpression(
+ String.format("%s(%s,'%s','%s',%s)", TransformFunctionType.CLP_ENCODED_VARS_MATCH.getName(), LOGTYPE_COLUMN,
+ ENCODED_VARS_COLUMN, wildcardQuery, subqueryIdx));
+ TransformFunctionFactory.get(expression, _dataSourceMap);
+ });
+
+ // 3rd parameter identifier
+ assertThrows(BadQueryRequestException.class, () -> {
+ ExpressionContext expression = RequestContextUtils.getExpression(
+ String.format("%s(%s,%s,%s,%s)", TransformFunctionType.CLP_ENCODED_VARS_MATCH.getName(), LOGTYPE_COLUMN,
+ ENCODED_VARS_COLUMN, ENCODED_VARS_COLUMN, subqueryIdx));
+ TransformFunctionFactory.get(expression, _dataSourceMap);
+ });
+
+ // 4th parameter identifier
+ assertThrows(BadQueryRequestException.class, () -> {
+ ExpressionContext expression = RequestContextUtils.getExpression(
+ String.format("%s('%s',%s,'%s',%s)", TransformFunctionType.CLP_ENCODED_VARS_MATCH.getName(),
+ LOGTYPE_COLUMN, ENCODED_VARS_COLUMN, wildcardQuery, ENCODED_VARS_COLUMN));
+ TransformFunctionFactory.get(expression, _dataSourceMap);
+ });
+
+ // Missing args
+ assertThrows(BadQueryRequestException.class, () -> {
+ ExpressionContext expression = RequestContextUtils.getExpression(
+ String.format("%s(%s)", TransformFunctionType.CLP_ENCODED_VARS_MATCH.getName(), LOGTYPE_COLUMN));
+ TransformFunctionFactory.get(expression, _dataSourceMap);
+ });
+ assertThrows(BadQueryRequestException.class, () -> {
+ ExpressionContext expression = RequestContextUtils.getExpression(
+ String.format("%s(%s,%s)", TransformFunctionType.CLP_ENCODED_VARS_MATCH.getName(), LOGTYPE_COLUMN,
+ ENCODED_VARS_COLUMN));
+ TransformFunctionFactory.get(expression, _dataSourceMap);
+ });
+ assertThrows(BadQueryRequestException.class, () -> {
+ ExpressionContext expression = RequestContextUtils.getExpression(
+ String.format("%s(%s,%s,'%s')", TransformFunctionType.CLP_ENCODED_VARS_MATCH.getName(), LOGTYPE_COLUMN,
+ ENCODED_VARS_COLUMN, wildcardQuery));
+ TransformFunctionFactory.get(expression, _dataSourceMap);
+ });
+ }
+
+ private void testClpEncodedVarsMatch(String wildcardQuery, int subqueryIdx, boolean shouldMatch) {
+ ExpressionContext expression = RequestContextUtils.getExpression(
+ String.format("%s(%s,%s,'%s',%s)", TransformFunctionType.CLP_ENCODED_VARS_MATCH.getName(), LOGTYPE_COLUMN,
+ ENCODED_VARS_COLUMN, wildcardQuery, subqueryIdx));
+ TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+ Assert.assertTrue(transformFunction instanceof ClpEncodedVarsMatchTransformFunction);
+
+ int[] expectedValues = new int[NUM_ROWS];
+ Arrays.fill(expectedValues, shouldMatch ? 1 : 0);
+ // The last row won't match since it's a null
+ expectedValues[NUM_ROWS - 1] = 0;
+ testIntTransformFunc(transformFunction, expectedValues);
+ }
+
+ private void testStringTransformFunc(TransformFunction transformFunction, String[] expectedValues) {
String[] values = transformFunction.transformToStringValuesSV(_projectionBlock);
for (int i = 0; i < NUM_ROWS; i++) {
assertEquals(values[i], expectedValues[i]);
}
}
+
+ private void testIntTransformFunc(TransformFunction transformFunction, int[] expectedValues) {
+ int[] values = transformFunction.transformToIntValuesSV(_projectionBlock);
+ for (int i = 0; i < NUM_ROWS; i++) {
+ assertEquals(values[i], expectedValues[i]);
+ }
+ }
}
diff --git a/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java
index 355b7e7ee15..16ca7749ba4 100644
--- a/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java
@@ -30,7 +30,7 @@
import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
-import org.apache.pinot.sql.parsers.rewriter.CLPDecodeRewriter;
+import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -159,8 +159,8 @@ private void encodeFieldWithClp(String key, Object value, GenericRow to) {
}
}
- to.putValue(key + CLPDecodeRewriter.LOGTYPE_COLUMN_SUFFIX, logtype);
- to.putValue(key + CLPDecodeRewriter.DICTIONARY_VARS_COLUMN_SUFFIX, dictVars);
- to.putValue(key + CLPDecodeRewriter.ENCODED_VARS_COLUMN_SUFFIX, encodedVars);
+ to.putValue(key + ClpRewriter.LOGTYPE_COLUMN_SUFFIX, logtype);
+ to.putValue(key + ClpRewriter.DICTIONARY_VARS_COLUMN_SUFFIX, dictVars);
+ to.putValue(key + ClpRewriter.ENCODED_VARS_COLUMN_SUFFIX, encodedVars);
}
}
diff --git a/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java b/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java
index 5008b5af432..72058be3451 100644
--- a/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java
@@ -27,7 +27,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.sql.parsers.rewriter.CLPDecodeRewriter;
+import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
import org.testng.annotations.Test;
import static org.apache.pinot.plugin.inputformat.clplog.CLPLogRecordExtractorConfig.FIELDS_FOR_CLP_ENCODING_CONFIG_KEY;
@@ -136,9 +136,9 @@ public void testEmptyCLPEncodingConfig() {
}
private void addCLPEncodedField(String fieldName, Set fields) {
- fields.add(fieldName + CLPDecodeRewriter.LOGTYPE_COLUMN_SUFFIX);
- fields.add(fieldName + CLPDecodeRewriter.DICTIONARY_VARS_COLUMN_SUFFIX);
- fields.add(fieldName + CLPDecodeRewriter.ENCODED_VARS_COLUMN_SUFFIX);
+ fields.add(fieldName + ClpRewriter.LOGTYPE_COLUMN_SUFFIX);
+ fields.add(fieldName + ClpRewriter.DICTIONARY_VARS_COLUMN_SUFFIX);
+ fields.add(fieldName + ClpRewriter.ENCODED_VARS_COLUMN_SUFFIX);
}
private GenericRow extract(Map props, Set fieldsToRead) {
@@ -163,12 +163,12 @@ private void validateClpEncodedField(GenericRow row, String fieldName, String ex
try {
// Decode and validate field
assertNull(row.getValue(fieldName));
- String logtype = (String) row.getValue(fieldName + CLPDecodeRewriter.LOGTYPE_COLUMN_SUFFIX);
+ String logtype = (String) row.getValue(fieldName + ClpRewriter.LOGTYPE_COLUMN_SUFFIX);
assertNotEquals(logtype, null);
String[] dictionaryVars =
- (String[]) row.getValue(fieldName + CLPDecodeRewriter.DICTIONARY_VARS_COLUMN_SUFFIX);
+ (String[]) row.getValue(fieldName + ClpRewriter.DICTIONARY_VARS_COLUMN_SUFFIX);
assertNotEquals(dictionaryVars, null);
- Long[] encodedVars = (Long[]) row.getValue(fieldName + CLPDecodeRewriter.ENCODED_VARS_COLUMN_SUFFIX);
+ Long[] encodedVars = (Long[]) row.getValue(fieldName + ClpRewriter.ENCODED_VARS_COLUMN_SUFFIX);
assertNotEquals(encodedVars, null);
long[] encodedVarsAsPrimitives = Arrays.stream(encodedVars).mapToLong(Long::longValue).toArray();