Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add hyperLogLogPlus aggregation function for distinct count #11346

Merged
merged 3 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.core.common;

import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.clearspring.analytics.stream.cardinality.RegisterSet;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -139,7 +140,9 @@ public enum ObjectType {
KllDataSketch(36),
IntegerTupleSketch(37),
FrequentStringsSketch(38),
FrequentLongsSketch(39);
FrequentLongsSketch(39),
HyperLogLogPlus(40);


private final int _value;

Expand Down Expand Up @@ -235,6 +238,8 @@ public static ObjectType getObjectType(Object value) {
return ObjectType.FrequentStringsSketch;
} else if (value instanceof LongsSketch) {
return ObjectType.FrequentLongsSketch;
} else if (value instanceof HyperLogLogPlus) {
return ObjectType.HyperLogLogPlus;
} else {
throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName());
}
Expand Down Expand Up @@ -563,6 +568,34 @@ private HyperLogLog deserialize(IntBuffer intBuffer) {
}
};

public static final ObjectSerDe<HyperLogLogPlus> HYPER_LOG_LOG_PLUS_SER_DE = new ObjectSerDe<HyperLogLogPlus>() {

@Override
public byte[] serialize(HyperLogLogPlus hyperLogLogPlus) {
try {
return hyperLogLogPlus.getBytes();
} catch (IOException e) {
throw new RuntimeException("Caught exception while serializing HyperLogLogPlus", e);
}
}

@Override
public HyperLogLogPlus deserialize(byte[] bytes) {
try {
return HyperLogLogPlus.Builder.build(bytes);
} catch (IOException e) {
throw new RuntimeException("Caught exception while serializing HyperLogLogPlus", e);
}
}

@Override
public HyperLogLogPlus deserialize(ByteBuffer byteBuffer) {
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
return deserialize(bytes);
}
};

public static final ObjectSerDe<DistinctTable> DISTINCT_TABLE_SER_DE = new ObjectSerDe<DistinctTable>() {

@Override
Expand Down Expand Up @@ -1377,6 +1410,7 @@ public LongsSketch deserialize(ByteBuffer byteBuffer) {
DATA_SKETCH_INT_TUPLE_SER_DE,
FREQUENT_STRINGS_SKETCH_SER_DE,
FREQUENT_LONGS_SKETCH_SER_DE,
HYPER_LOG_LOG_PLUS_SER_DE,
};
//@formatter:on

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.core.operator.query;

import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
Expand All @@ -36,7 +37,9 @@
import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.DistinctCountHLLAggregationFunction;
import org.apache.pinot.core.query.aggregation.function.DistinctCountHLLPlusAggregationFunction;
import org.apache.pinot.core.query.aggregation.function.DistinctCountRawHLLAggregationFunction;
import org.apache.pinot.core.query.aggregation.function.DistinctCountRawHLLPlusAggregationFunction;
import org.apache.pinot.core.query.aggregation.function.DistinctCountSmartHLLAggregationFunction;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
Expand All @@ -50,8 +53,8 @@
* Aggregation operator that utilizes dictionary or column metadata for serving aggregation queries to avoid scanning.
* The scanless operator is selected in the plan maker, if the query is of aggregation type min, max, minmaxrange,
* distinctcount, distinctcounthll, distinctcountrawhll, segmentpartitioneddistinctcount, distinctcountsmarthll,
* and the column has a dictionary, or has column metadata with min and max value defined. It also supports count(*) if
* the query has no filter.
* distinctcounthllplus, distinctcountrawhllplus, and the column has a dictionary, or has column metadata with min and
* max value defined. It also supports count(*) if the query has no filter.
* We don't use this operator if the segment has star tree,
* as the dictionary will have aggregated values for the metrics, and dimensions will have star node value.
*
Expand Down Expand Up @@ -118,6 +121,17 @@ protected AggregationResultsBlock getNextBlock() {
result = getDistinctCountHLLResult(Objects.requireNonNull(dataSource.getDictionary()),
((DistinctCountRawHLLAggregationFunction) aggregationFunction).getDistinctCountHLLAggregationFunction());
break;
case DISTINCTCOUNTHLLPLUS:
case DISTINCTCOUNTHLLPLUSMV:
result = getDistinctCountHLLPlusResult(Objects.requireNonNull(dataSource.getDictionary()),
(DistinctCountHLLPlusAggregationFunction) aggregationFunction);
break;
case DISTINCTCOUNTRAWHLLPLUS:
case DISTINCTCOUNTRAWHLLPLUSMV:
result = getDistinctCountHLLPlusResult(Objects.requireNonNull(dataSource.getDictionary()),
((DistinctCountRawHLLPlusAggregationFunction) aggregationFunction)
.getDistinctCountHLLPlusAggregationFunction());
break;
case SEGMENTPARTITIONEDDISTINCTCOUNT:
result = (long) Objects.requireNonNull(dataSource.getDictionary()).length();
break;
Expand Down Expand Up @@ -215,6 +229,15 @@ private static HyperLogLog getDistinctValueHLL(Dictionary dictionary, int log2m)
return hll;
}

private static HyperLogLogPlus getDistinctValueHLLPlus(Dictionary dictionary, int p, int sp) {
HyperLogLogPlus hllPlus = new HyperLogLogPlus(p, sp);
int length = dictionary.length();
for (int i = 0; i < length; i++) {
hllPlus.offer(dictionary.get(i));
}
return hllPlus;
}

private static HyperLogLog getDistinctCountHLLResult(Dictionary dictionary,
DistinctCountHLLAggregationFunction function) {
if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
Expand All @@ -234,6 +257,25 @@ private static HyperLogLog getDistinctCountHLLResult(Dictionary dictionary,
}
}

private static HyperLogLogPlus getDistinctCountHLLPlusResult(Dictionary dictionary,
DistinctCountHLLPlusAggregationFunction function) {
if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
// Treat BYTES value as serialized HyperLogLogPlus
try {
HyperLogLogPlus hllplus = ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(dictionary.getBytesValue(0));
int length = dictionary.length();
for (int i = 1; i < length; i++) {
hllplus.addAll(ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(dictionary.getBytesValue(i)));
}
return hllplus;
} catch (Exception e) {
throw new RuntimeException("Caught exception while merging HyperLogLogPluses", e);
}
} else {
return getDistinctValueHLLPlus(dictionary, function.getP(), function.getSp());
}
}

private static Object getDistinctCountSmartHLLResult(Dictionary dictionary,
DistinctCountSmartHLLAggregationFunction function) {
if (dictionary.length() > function.getThreshold()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public class AggregationPlanNode implements PlanNode {
private static final EnumSet<AggregationFunctionType> DICTIONARY_BASED_FUNCTIONS =
EnumSet.of(MIN, MINMV, MAX, MAXMV, MINMAXRANGE, MINMAXRANGEMV, DISTINCTCOUNT, DISTINCTCOUNTMV, DISTINCTCOUNTHLL,
DISTINCTCOUNTHLLMV, DISTINCTCOUNTRAWHLL, DISTINCTCOUNTRAWHLLMV, SEGMENTPARTITIONEDDISTINCTCOUNT,
DISTINCTCOUNTSMARTHLL, DISTINCTSUM, DISTINCTAVG, DISTINCTSUMMV, DISTINCTAVGMV);
DISTINCTCOUNTSMARTHLL, DISTINCTSUM, DISTINCTAVG, DISTINCTSUMMV, DISTINCTAVGMV, DISTINCTCOUNTHLLPLUS,
DISTINCTCOUNTHLLPLUSMV, DISTINCTCOUNTRAWHLLPLUS, DISTINCTCOUNTRAWHLLPLUSMV);

// DISTINCTCOUNT excluded because consuming segment metadata contains unknown cardinality when there is no dictionary
private static final EnumSet<AggregationFunctionType> METADATA_BASED_FUNCTIONS =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,14 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
return new DistinctCountHLLMVAggregationFunction(arguments);
case DISTINCTCOUNTRAWHLLMV:
return new DistinctCountRawHLLMVAggregationFunction(arguments);
case DISTINCTCOUNTHLLPLUS:
return new DistinctCountHLLPlusAggregationFunction(arguments);
case DISTINCTCOUNTRAWHLLPLUS:
return new DistinctCountRawHLLPlusAggregationFunction(arguments);
case DISTINCTCOUNTHLLPLUSMV:
return new DistinctCountHLLPlusMVAggregationFunction(arguments);
case DISTINCTCOUNTRAWHLLPLUSMV:
return new DistinctCountRawHLLPlusMVAggregationFunction(arguments);
case DISTINCTSUMMV:
return new DistinctSumMVAggregationFunction(arguments);
case DISTINCTAVGMV:
Expand Down
Loading
Loading