Skip to content

Commit

Permalink
Add hyperLogLogPlus aggregation function for distinct count (#11346)
Browse files Browse the repository at this point in the history
* Add hyperLogLogPlus aggregation function for distinct count

* address code comments

* address code comments
  • Loading branch information
deemoliu authored Sep 21, 2023
1 parent 3dce808 commit 3501b86
Show file tree
Hide file tree
Showing 21 changed files with 1,613 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.core.common;

import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.clearspring.analytics.stream.cardinality.RegisterSet;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -139,7 +140,9 @@ public enum ObjectType {
KllDataSketch(36),
IntegerTupleSketch(37),
FrequentStringsSketch(38),
FrequentLongsSketch(39);
FrequentLongsSketch(39),
HyperLogLogPlus(40);


private final int _value;

Expand Down Expand Up @@ -235,6 +238,8 @@ public static ObjectType getObjectType(Object value) {
return ObjectType.FrequentStringsSketch;
} else if (value instanceof LongsSketch) {
return ObjectType.FrequentLongsSketch;
} else if (value instanceof HyperLogLogPlus) {
return ObjectType.HyperLogLogPlus;
} else {
throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName());
}
Expand Down Expand Up @@ -563,6 +568,34 @@ private HyperLogLog deserialize(IntBuffer intBuffer) {
}
};

/**
 * SerDe for {@link HyperLogLogPlus}: serializes via the sketch's native byte format
 * ({@code getBytes()}) and deserializes with {@link HyperLogLogPlus.Builder#build(byte[])}.
 */
public static final ObjectSerDe<HyperLogLogPlus> HYPER_LOG_LOG_PLUS_SER_DE = new ObjectSerDe<HyperLogLogPlus>() {

  @Override
  public byte[] serialize(HyperLogLogPlus hyperLogLogPlus) {
    try {
      return hyperLogLogPlus.getBytes();
    } catch (IOException e) {
      throw new RuntimeException("Caught exception while serializing HyperLogLogPlus", e);
    }
  }

  @Override
  public HyperLogLogPlus deserialize(byte[] bytes) {
    try {
      return HyperLogLogPlus.Builder.build(bytes);
    } catch (IOException e) {
      // Fixed copy-paste error: this is the deserialize path, message previously said "serializing"
      throw new RuntimeException("Caught exception while deserializing HyperLogLogPlus", e);
    }
  }

  @Override
  public HyperLogLogPlus deserialize(ByteBuffer byteBuffer) {
    // Copy the buffer's remaining bytes, then delegate to the byte[] overload
    byte[] bytes = new byte[byteBuffer.remaining()];
    byteBuffer.get(bytes);
    return deserialize(bytes);
  }
};

public static final ObjectSerDe<DistinctTable> DISTINCT_TABLE_SER_DE = new ObjectSerDe<DistinctTable>() {

@Override
Expand Down Expand Up @@ -1377,6 +1410,7 @@ public LongsSketch deserialize(ByteBuffer byteBuffer) {
DATA_SKETCH_INT_TUPLE_SER_DE,
FREQUENT_STRINGS_SKETCH_SER_DE,
FREQUENT_LONGS_SKETCH_SER_DE,
HYPER_LOG_LOG_PLUS_SER_DE,
};
//@formatter:on

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.core.operator.query;

import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
Expand All @@ -36,7 +37,9 @@
import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.DistinctCountHLLAggregationFunction;
import org.apache.pinot.core.query.aggregation.function.DistinctCountHLLPlusAggregationFunction;
import org.apache.pinot.core.query.aggregation.function.DistinctCountRawHLLAggregationFunction;
import org.apache.pinot.core.query.aggregation.function.DistinctCountRawHLLPlusAggregationFunction;
import org.apache.pinot.core.query.aggregation.function.DistinctCountSmartHLLAggregationFunction;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
Expand All @@ -50,8 +53,8 @@
* Aggregation operator that utilizes dictionary or column metadata for serving aggregation queries to avoid scanning.
* The scanless operator is selected in the plan maker, if the query is of aggregation type min, max, minmaxrange,
* distinctcount, distinctcounthll, distinctcountrawhll, segmentpartitioneddistinctcount, distinctcountsmarthll,
* and the column has a dictionary, or has column metadata with min and max value defined. It also supports count(*) if
* the query has no filter.
* distinctcounthllplus, distinctcountrawhllplus, and the column has a dictionary, or has column metadata with min and
* max value defined. It also supports count(*) if the query has no filter.
* We don't use this operator if the segment has star tree,
* as the dictionary will have aggregated values for the metrics, and dimensions will have star node value.
*
Expand Down Expand Up @@ -118,6 +121,17 @@ protected AggregationResultsBlock getNextBlock() {
result = getDistinctCountHLLResult(Objects.requireNonNull(dataSource.getDictionary()),
((DistinctCountRawHLLAggregationFunction) aggregationFunction).getDistinctCountHLLAggregationFunction());
break;
case DISTINCTCOUNTHLLPLUS:
case DISTINCTCOUNTHLLPLUSMV:
result = getDistinctCountHLLPlusResult(Objects.requireNonNull(dataSource.getDictionary()),
(DistinctCountHLLPlusAggregationFunction) aggregationFunction);
break;
case DISTINCTCOUNTRAWHLLPLUS:
case DISTINCTCOUNTRAWHLLPLUSMV:
result = getDistinctCountHLLPlusResult(Objects.requireNonNull(dataSource.getDictionary()),
((DistinctCountRawHLLPlusAggregationFunction) aggregationFunction)
.getDistinctCountHLLPlusAggregationFunction());
break;
case SEGMENTPARTITIONEDDISTINCTCOUNT:
result = (long) Objects.requireNonNull(dataSource.getDictionary()).length();
break;
Expand Down Expand Up @@ -215,6 +229,15 @@ private static HyperLogLog getDistinctValueHLL(Dictionary dictionary, int log2m)
return hll;
}

/**
 * Builds a {@link HyperLogLogPlus} sketch (with precision {@code p} and sparse precision {@code sp})
 * over every value in the given dictionary.
 */
private static HyperLogLogPlus getDistinctValueHLLPlus(Dictionary dictionary, int p, int sp) {
  HyperLogLogPlus sketch = new HyperLogLogPlus(p, sp);
  for (int dictId = 0, numValues = dictionary.length(); dictId < numValues; dictId++) {
    sketch.offer(dictionary.get(dictId));
  }
  return sketch;
}

private static HyperLogLog getDistinctCountHLLResult(Dictionary dictionary,
DistinctCountHLLAggregationFunction function) {
if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
Expand All @@ -234,6 +257,25 @@ private static HyperLogLog getDistinctCountHLLResult(Dictionary dictionary,
}
}

/**
 * Computes the DISTINCTCOUNTHLLPLUS result from dictionary values: BYTES columns are treated as
 * pre-serialized {@link HyperLogLogPlus} sketches and merged; any other type is offered value-by-value
 * into a new sketch using the function's configured {@code p}/{@code sp}.
 */
private static HyperLogLogPlus getDistinctCountHLLPlusResult(Dictionary dictionary,
    DistinctCountHLLPlusAggregationFunction function) {
  if (dictionary.getValueType() != FieldSpec.DataType.BYTES) {
    // Raw values: build a fresh sketch from every dictionary entry
    return getDistinctValueHLLPlus(dictionary, function.getP(), function.getSp());
  }
  // BYTES values hold serialized HyperLogLogPlus sketches: deserialize each one and merge
  try {
    HyperLogLogPlus merged = ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(dictionary.getBytesValue(0));
    for (int i = 1, numSketches = dictionary.length(); i < numSketches; i++) {
      merged.addAll(ObjectSerDeUtils.HYPER_LOG_LOG_PLUS_SER_DE.deserialize(dictionary.getBytesValue(i)));
    }
    return merged;
  } catch (Exception e) {
    throw new RuntimeException("Caught exception while merging HyperLogLogPluses", e);
  }
}

private static Object getDistinctCountSmartHLLResult(Dictionary dictionary,
DistinctCountSmartHLLAggregationFunction function) {
if (dictionary.length() > function.getThreshold()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public class AggregationPlanNode implements PlanNode {
private static final EnumSet<AggregationFunctionType> DICTIONARY_BASED_FUNCTIONS =
EnumSet.of(MIN, MINMV, MAX, MAXMV, MINMAXRANGE, MINMAXRANGEMV, DISTINCTCOUNT, DISTINCTCOUNTMV, DISTINCTCOUNTHLL,
DISTINCTCOUNTHLLMV, DISTINCTCOUNTRAWHLL, DISTINCTCOUNTRAWHLLMV, SEGMENTPARTITIONEDDISTINCTCOUNT,
DISTINCTCOUNTSMARTHLL, DISTINCTSUM, DISTINCTAVG, DISTINCTSUMMV, DISTINCTAVGMV);
DISTINCTCOUNTSMARTHLL, DISTINCTSUM, DISTINCTAVG, DISTINCTSUMMV, DISTINCTAVGMV, DISTINCTCOUNTHLLPLUS,
DISTINCTCOUNTHLLPLUSMV, DISTINCTCOUNTRAWHLLPLUS, DISTINCTCOUNTRAWHLLPLUSMV);

// DISTINCTCOUNT excluded because consuming segment metadata contains unknown cardinality when there is no dictionary
private static final EnumSet<AggregationFunctionType> METADATA_BASED_FUNCTIONS =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,14 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
return new DistinctCountHLLMVAggregationFunction(arguments);
case DISTINCTCOUNTRAWHLLMV:
return new DistinctCountRawHLLMVAggregationFunction(arguments);
case DISTINCTCOUNTHLLPLUS:
return new DistinctCountHLLPlusAggregationFunction(arguments);
case DISTINCTCOUNTRAWHLLPLUS:
return new DistinctCountRawHLLPlusAggregationFunction(arguments);
case DISTINCTCOUNTHLLPLUSMV:
return new DistinctCountHLLPlusMVAggregationFunction(arguments);
case DISTINCTCOUNTRAWHLLPLUSMV:
return new DistinctCountRawHLLPlusMVAggregationFunction(arguments);
case DISTINCTSUMMV:
return new DistinctSumMVAggregationFunction(arguments);
case DISTINCTAVGMV:
Expand Down
Loading

0 comments on commit 3501b86

Please sign in to comment.