Skip to content

Commit

Permalink
Fixing the bytes literal usage in leaf stage
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 committed Oct 4, 2023
1 parent c96221f commit a52c914
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.ByteString;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -39,6 +38,7 @@
import org.apache.pinot.common.request.Literal;
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.sql.FilterKind;
Expand Down Expand Up @@ -175,6 +175,11 @@ public static Expression getLiteralExpression(String value) {
return expression;
}

/**
* TODO: We should use {@link #getLiteralExpression(ByteArray)} instead of this method to handle binary.
*
*/
@Deprecated
public static Expression getLiteralExpression(byte[] value) {
Expression expression = createNewLiteralExpression();
// TODO(After 1.0.0): This is for backward-compatibility, we can set the binary value directly instead of
Expand All @@ -183,9 +188,9 @@ public static Expression getLiteralExpression(byte[] value) {
return expression;
}

public static Expression getLiteralExpression(ByteString value) {
public static Expression getLiteralExpression(ByteArray value) {
Expression expression = createNewLiteralExpression();
expression.getLiteral().setBinaryValue(value.toByteArray());
expression.getLiteral().setBinaryValue(value.getBytes());
return expression;
}

Expand All @@ -201,6 +206,9 @@ public static Expression getNullLiteralExpression() {
return expression;
}

/**
* The input object should only be internal data type representation.
*/
public static Expression getLiteralExpression(Object object) {
if (object == null) {
return getNullLiteralExpression();
Expand All @@ -218,16 +226,20 @@ public static Expression getLiteralExpression(Object object) {
if (object instanceof Double) {
return RequestUtils.getLiteralExpression(((Double) object).doubleValue());
}
// byte[] is not an internal data type, put it here for backward compatibility, we should use ByteArray instead.
if (object instanceof byte[]) {
return RequestUtils.getLiteralExpression((byte[]) object);
}
if (object instanceof ByteString) {
return RequestUtils.getLiteralExpression((ByteString) object);
if (object instanceof ByteArray) {
return RequestUtils.getLiteralExpression((ByteArray) object);
}
if (object instanceof Boolean) {
return RequestUtils.getLiteralExpression(((Boolean) object).booleanValue());
}
return RequestUtils.getLiteralExpression(object.toString());
if (object instanceof String) {
return RequestUtils.getLiteralExpression((String) object);
}
throw new UnsupportedOperationException("Unsupported data type: " + object.getClass() + " with value: " + object);
}

public static Expression getFunctionExpression(String canonicalName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,4 +447,29 @@ public void testStUnionQuery(boolean useMultiStageQueryEngine)
+ "05e89a7503b81b64042bddabe27179cc05e89a85caafbc24042be215336deb9c05e899ba1b196104042be385c67dfe3";
Assert.assertEquals(actualResult, expectedResult);
}

@Test(dataProvider = "useV2QueryEngine")
public void testStPointWithLiteralWithV2(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);

String query =
String.format("Select "
+ "ST_Point(1,2) "
+ "FROM %s a "
+ "JOIN %s b "
+ "ON a.wkt1=b.wkt1 "
+ "LIMIT 10",
getTableName(),
getTableName());
JsonNode pinotResponse = postQuery(query);
JsonNode rows = pinotResponse.get("resultTable").get("rows");
for (int i = 0; i < rows.size(); i++) {
JsonNode record = rows.get(i);
Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new Coordinate(1, 2));
byte[] expectedValue = GeometrySerializer.serialize(point);
byte[] actualValue = BytesUtils.toBytes(record.get(0).asText());
assertEquals(actualValue, expectedValue);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.calcite.util.Sarg;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.spi.utils.BooleanUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.checkerframework.checker.nullness.qual.Nullable;


Expand All @@ -66,6 +67,12 @@ public static RexExpression.InputRef fromRexInputRef(RexInputRef rexInputRef) {

public static RexExpression.Literal fromRexLiteral(RexLiteral rexLiteral) {
ColumnDataType dataType = RelToPlanNodeConverter.convertToColumnDataType(rexLiteral.getType());
// Calcite may parse the string literal to OBJECT type, e.g. TIMEUNIT.HOUR/YEAR etc.
// Here we convert it to back to STRING type, so there shouldn't be any unexpected RexExpression.Literal.
if (dataType == ColumnDataType.OBJECT) {
dataType = ColumnDataType.STRING;
return new RexExpression.Literal(dataType, rexLiteral.getValue().toString());
}
return new RexExpression.Literal(dataType, convertValue(dataType, rexLiteral.getValue()));
}

Expand All @@ -90,7 +97,7 @@ private static Object convertValue(ColumnDataType dataType, @Nullable Comparable
case STRING:
return ((NlsString) value).getValue();
case BYTES:
return ((ByteString) value).getBytes();
return new ByteArray(((ByteString) value).getBytes());
default:
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.pinot.common.proto.Plan;
import org.apache.pinot.spi.utils.ByteArray;


/**
Expand Down Expand Up @@ -129,10 +130,13 @@ private static Plan.LiteralField stringField(String val) {
return Plan.LiteralField.newBuilder().setStringField(val).build();
}

private static Plan.LiteralField bytesField(ByteString val) {
return Plan.LiteralField.newBuilder().setBytesField(val).build();
private static Plan.LiteralField bytesField(ByteArray val) {
return Plan.LiteralField.newBuilder().setBytesField(ByteString.copyFrom(val.getBytes())).build();
}

/**
* Serialize an internal data type value
*/
private static Plan.MemberVariableField serializeMemberVariable(Object fieldObject) {
Plan.MemberVariableField.Builder builder = Plan.MemberVariableField.newBuilder();
if (fieldObject instanceof Boolean) {
Expand All @@ -148,9 +152,11 @@ private static Plan.MemberVariableField serializeMemberVariable(Object fieldObje
} else if (fieldObject instanceof String) {
builder.setLiteralField(stringField((String) fieldObject));
} else if (fieldObject instanceof byte[]) {
builder.setLiteralField(bytesField(ByteString.copyFrom((byte[]) fieldObject)));
throw new IllegalStateException("byte[] is not supported, use ByteArray instead");
} else if (fieldObject instanceof GregorianCalendar) {
builder.setLiteralField(longField(((GregorianCalendar) fieldObject).getTimeInMillis()));
throw new IllegalStateException("GregorianCalendar is not supported, use long instead");
} else if (fieldObject instanceof ByteArray) {
builder.setLiteralField(bytesField((ByteArray) fieldObject));
} else if (fieldObject instanceof List) {
builder.setListField(serializeListMemberVariable(fieldObject));
} else if (fieldObject instanceof Map) {
Expand Down Expand Up @@ -215,10 +221,11 @@ private static Object constructLiteral(Plan.LiteralField literalField) {
case STRINGFIELD:
return literalField.getStringField();
case BYTESFIELD:
return literalField.getBytesField();
return new ByteArray(literalField.getBytesField().toByteArray());
case LITERALFIELD_NOT_SET:
default:
return null;
default:
throw new IllegalStateException("Unknown LiteralField type: " + literalField.getLiteralFieldCase());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ public void setUp()
_mailboxService.start();

QueryServerEnclosure server1 = new QueryServerEnclosure(factory1);
QueryServerEnclosure server2 = new QueryServerEnclosure(factory2);
server1.start();
// Start server1 to ensure the next server will have a different port.
QueryServerEnclosure server2 = new QueryServerEnclosure(factory2);
server2.start();
// this doesn't test the QueryServer functionality so the server port can be the same as the mailbox port.
// this is only use for test identifier purpose.
Expand Down

0 comments on commit a52c914

Please sign in to comment.