Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gapfill: Add support for lowercase datatypes #11722

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FunctionContext;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
Expand Down Expand Up @@ -134,6 +136,9 @@ protected void replaceColumnNameWithAlias(DataSchema dataSchema) {
for (int i = 0; i < dataSchema.getColumnNames().length; i++) {
if (columnNameToAliasMap.containsKey(dataSchema.getColumnNames()[i])) {
dataSchema.getColumnNames()[i] = columnNameToAliasMap.get(dataSchema.getColumnNames()[i]);
} else if (columnNameToAliasMap.containsKey(caseInsensitiveTypeString(dataSchema.getColumnNames()[i]))) {
dataSchema.getColumnNames()[i] =
columnNameToAliasMap.get(caseInsensitiveTypeString(dataSchema.getColumnNames()[i]));
Comment on lines +139 to +141
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. With this code, I think we can remove the first if statement to simplify the code (even though it would then always run caseInsensitiveTypeString and may be a bit slower). What do you think?
if (columnNameToAliasMap.containsKey(dataSchema.getColumnNames()[i])) {
    dataSchema.getColumnNames()[i] = columnNameToAliasMap.get(dataSchema.getColumnNames()[i]);
}
  2. We can define a variable lowerCaseColumnName = caseInsensitiveTypeString(columnName) to avoid executing the same code twice.

}
}
}
Expand Down Expand Up @@ -226,4 +231,16 @@ protected List<Object[]> gapFillAndAggregate(List<Object[]> rows, DataSchema dat
DataSchema resultTableSchema) {
throw new UnsupportedOperationException("Not supported");
}

protected String caseInsensitiveTypeString(String columnName) {
Copy link
Contributor

@zhtaoxiang zhtaoxiang Oct 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I haven't check the details of the logic, so I may be wrong):
the logic will also replace INT_column to int_column (similar for other types) if the parameter contains such string, is this intended?

String dataTypePattern = "(BOOLEAN|INT|LONG|FLOAT|DOUBLE|STRING)";
Matcher matcher = Pattern.compile(dataTypePattern, Pattern.CASE_INSENSITIVE).matcher(columnName);
StringBuffer result = new StringBuffer();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to make the code thread-safe? If not, StringBuilder will be faster

while (matcher.find()) {
String dataType = matcher.group().toLowerCase();
matcher.appendReplacement(result, dataType);
}
matcher.appendTail(result);
return result.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4071,6 +4071,89 @@ public void datetimeconvertGapfillTestAggregateAggregateOutOfBoundary() {
}
}

/**
 * Runs two aggregate-over-gapfill queries whose inner {@code lastWithTime} call names its data
 * type as {@code 'int'}, and checks the hourly buckets and summed occupied counts of each result.
 * The second query differs from the first only by the extra {@code WHERE occupied = 1} filter.
 *
 * <p>NOTE(review): the method name says "UpperCase", yet both queries spell the data type in
 * lowercase; presumably the lowercase path is what is being exercised - consider renaming.
 */
@Test
public void gapfillTestAggregateUpperCaseDataType() {
  DateTimeFormatSpec dateTimeFormatter =
      new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS");
  DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");

  String gapfillQuery1 = "SELECT "
      + "time_col, SUM(occupied) as occupied_slots_count, time_col "
      + "FROM ("
      + " SELECT GapFill(time_col, "
      + " '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
      + " '2021-11-07 8:00:00.000', '2021-11-07 10:00:00.000', '1:HOURS',"
      + " FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
      + " occupied, lotId, levelId"
      + " FROM ("
      + " SELECT DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
      + " '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col,"
      + " lastWithTime(isOccupied, eventTime, 'int') as occupied, lotId, levelId"
      + " FROM parkingData "
      + " WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
      + " GROUP BY time_col, levelId, lotId "
      + " LIMIT 200 "
      + " ) "
      + " LIMIT 200 "
      + ") "
      + " GROUP BY time_col "
      + " LIMIT 200 ";
  assertHourlyOccupiedSlotsCounts(gapfillQuery1, new double[]{6, 4}, dateTimeFormatter, dateTimeGranularity);

  String gapfillQuery2 = "SELECT "
      + "time_col, SUM(occupied) as occupied_slots_count, time_col "
      + "FROM ("
      + " SELECT GapFill(time_col, "
      + " '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
      + " '2021-11-07 8:00:00.000', '2021-11-07 10:00:00.000', '1:HOURS',"
      + " FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
      + " occupied, lotId, levelId"
      + " FROM ("
      + " SELECT DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
      + " '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col,"
      + " lastWithTime(isOccupied, eventTime, 'int') as occupied, lotId, levelId"
      + " FROM parkingData "
      + " WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
      + " GROUP BY time_col, levelId, lotId "
      + " LIMIT 200 "
      + " ) "
      + " LIMIT 200 "
      + ") "
      + " WHERE occupied = 1 "
      + " GROUP BY time_col "
      + " LIMIT 200 ";
  assertHourlyOccupiedSlotsCounts(gapfillQuery2, new double[]{6, 4}, dateTimeFormatter, dateTimeGranularity);
}

/**
 * Executes {@code query} and asserts that the result has one row per expected count, that the
 * first column of row {@code i} parses to 2021-11-07 08:00:00.000 plus {@code i} granularity
 * steps, and that the second column of row {@code i} equals {@code expectedCounts[i]}.
 *
 * @param query the gapfill query to run
 * @param expectedCounts the expected value of the second result column, in row order
 * @param dateTimeFormatter the format spec used to parse the first result column
 * @param dateTimeGranularity the spacing expected between consecutive rows
 */
private void assertHourlyOccupiedSlotsCounts(String query, double[] expectedCounts,
    DateTimeFormatSpec dateTimeFormatter, DateTimeGranularitySpec dateTimeGranularity) {
  BrokerResponseNative brokerResponse = getBrokerResponse(query);
  ResultTable resultTable = brokerResponse.getResultTable();
  List<Object[]> rows = resultTable.getRows();
  Assert.assertEquals(rows.size(), expectedCounts.length);

  long expectedTimestamp = dateTimeFormatter.fromFormatToMillis("2021-11-07 08:00:00.000");
  for (int i = 0; i < expectedCounts.length; i++) {
    String firstTimeCol = (String) rows.get(i)[0];
    long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
    Assert.assertEquals(timeStamp, expectedTimestamp);
    // Observed value first, expected second - the same order as the other assertions here,
    // so a failure message labels the two values correctly.
    Assert.assertEquals(rows.get(i)[1], expectedCounts[i]);
    expectedTimestamp += dateTimeGranularity.granularityToMillis();
  }
}

@AfterClass
public void tearDown()
throws IOException {
Expand Down
Loading