diff --git a/src/AggregateFunctions/Streaming/AggregateFunctionAny.cpp b/src/AggregateFunctions/Streaming/AggregateFunctionAny.cpp
new file mode 100644
index 00000000000..dceddff462b
--- /dev/null
+++ b/src/AggregateFunctions/Streaming/AggregateFunctionAny.cpp
@@ -0,0 +1,41 @@
+#include
+#include
+#include
+
+#include
+
+namespace DB
+{
+namespace Streaming
+{
+AggregateFunctionPtr createAggregateFunctionAnyLast(
+    const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
+{
+    return AggregateFunctionPtr(createAggregateFunctionSingleValue(
+        name, argument_types, parameters, settings));
+}
+/// Same shape as the creator above: forwards every argument to createAggregateFunctionSingleValue (NOTE(review): any distinguishing template arguments are not visible in this view - confirm).
+AggregateFunctionPtr
+createAggregateFunctionAny(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
+{
+    return AggregateFunctionPtr(createAggregateFunctionSingleValue(
+        name, argument_types, parameters, settings));
+}
+/// Registers six `*_retract` names in `factory`, each bound to one of the two creators above and sharing one `properties` value.
+void registerAggregateFunctionsAnyRetract(AggregateFunctionFactory & factory)
+{
+    /// FIXME: there is no real retraction here; rows whose delta is -1 are simply ignored.
+    AggregateFunctionProperties properties = {.returns_default_when_only_null = false, .is_order_dependent = true};
+    factory.registerFunction("any_last_retract", {createAggregateFunctionAnyLast, properties});
+    factory.registerFunction("any_retract", {createAggregateFunctionAny, properties});
+    /// The remaining names reuse the same two creators and are registered with AggregateFunctionFactory::CaseInsensitive.
+    factory.registerFunction("first_value_retract", {createAggregateFunctionAny, properties}, AggregateFunctionFactory::CaseInsensitive);
+
+    factory.registerFunction("last_value_retract", {createAggregateFunctionAnyLast, properties}, AggregateFunctionFactory::CaseInsensitive);
+
+    factory.registerFunction("earliest_retract", {createAggregateFunctionAny, properties}, AggregateFunctionFactory::CaseInsensitive);
+
+    factory.registerFunction("latest_retract", {createAggregateFunctionAnyLast, properties}, AggregateFunctionFactory::CaseInsensitive);
+}
+} /// namespace Streaming
+} /// namespace DB
diff --git 
a/src/AggregateFunctions/Streaming/AggregateFunctionArgMinMax.h b/src/AggregateFunctions/Streaming/AggregateFunctionArgMinMax.h index f468e4cc9d7..61dc32b0171 100644 --- a/src/AggregateFunctions/Streaming/AggregateFunctionArgMinMax.h +++ b/src/AggregateFunctions/Streaming/AggregateFunctionArgMinMax.h @@ -18,7 +18,7 @@ extern const int ILLEGAL_TYPE_OF_ARGUMENT; namespace Streaming { /// Returns the first arg value found for the minimum/maximum value. Example: arg_max(arg, value). -/// There are similar problems and trade off as stated in AggregateFunctionsCountedValue in AggregateFunctionMinMax.h +/// There are similar problems and trade off as stated in AggregateFunctionsCountedValue in AggregateFunctionMinMaxAny.h /// In changelog mode, we need not only keep track the unique min/max value, but also need keep track the unique argument /// for each unique value. The following is one example how we keep tracking this in data structure /// For max sequence (value, arg, delta) and `retract_max = 3`: diff --git a/src/AggregateFunctions/Streaming/AggregateFunctionMax.cpp b/src/AggregateFunctions/Streaming/AggregateFunctionMax.cpp index 3c55686060b..523d2b9126d 100644 --- a/src/AggregateFunctions/Streaming/AggregateFunctionMax.cpp +++ b/src/AggregateFunctions/Streaming/AggregateFunctionMax.cpp @@ -1,4 +1,4 @@ -#include "HelpersMinMax.h" +#include "HelpersMinMaxAny.h" #include diff --git a/src/AggregateFunctions/Streaming/AggregateFunctionMin.cpp b/src/AggregateFunctions/Streaming/AggregateFunctionMin.cpp index 4568f1a4d72..c743fbb7dd1 100644 --- a/src/AggregateFunctions/Streaming/AggregateFunctionMin.cpp +++ b/src/AggregateFunctions/Streaming/AggregateFunctionMin.cpp @@ -1,4 +1,4 @@ -#include "HelpersMinMax.h" +#include "HelpersMinMaxAny.h" #include diff --git a/src/AggregateFunctions/Streaming/AggregateFunctionMinMax.h b/src/AggregateFunctions/Streaming/AggregateFunctionMinMax.h deleted file mode 100644 index b3c332c882a..00000000000 --- 
a/src/AggregateFunctions/Streaming/AggregateFunctionMinMax.h +++ /dev/null @@ -1,953 +0,0 @@ -#pragma once - -#include "CountedValueMap.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "config.h" - -#if USE_EMBEDDED_COMPILER -# include -# include -#endif - -namespace DB -{ -struct Settings; - -namespace ErrorCodes -{ -extern const int ILLEGAL_TYPE_OF_ARGUMENT; -extern const int NOT_IMPLEMENTED; -} - -namespace Streaming -{ -/** Aggregate functions that store one of passed values. - * For example: min, max, any, any_last. - */ - -/// For numeric values. -template -struct CountedValuesDataFixed -{ -private: - using Self = CountedValuesDataFixed; - using ColVecType = ColumnVectorOrDecimal; - - CountedValueMap values; - -public: - CountedValuesDataFixed() : CountedValuesDataFixed(1) { } - explicit CountedValuesDataFixed(int64_t size) : values(size) { } - - static constexpr bool is_nullable = false; - - bool has() const { return !values.empty(); } - - void insertResultInto(IColumn & to) const - { - if (has()) - assert_cast(to).getData().push_back(values.firstValue()); - else - assert_cast(to).insertDefault(); - } - - void write(WriteBuffer & buf, const ISerialization & /*serialization*/) const - { - /// Write size first - writeVarInt(values.size(), buf); - - /// Then write value / count - for (auto [val, count] : values) - { - writeBinary(val, buf); - writeVarUInt(count, buf); - } - } - - void read(ReadBuffer & buf, const ISerialization & /*serialization*/, Arena *) - { - Int64 size = 0; - readVarInt(size, buf); - - assert(size >= 0); - - values.setCapacity(std::max(size, values.capacity())); - - for (Int64 i = 0; i < size; ++i) - { - T value; - UInt32 count; - - readBinary(value, buf); - readVarUInt(count, buf); - - [[maybe_unused]] auto inserted = values.insert(std::move(value), count); - assert(inserted); - } - } - - bool add(const IColumn & column, size_t row_num, Arena 
*) - { - return values.insert(assert_cast(column).getData()[row_num]); - } - - bool negate(const IColumn & column, size_t row_num, Arena *) - { - return values.erase(assert_cast(column).getData()[row_num]); - } - - bool merge(const Self & rhs, Arena *) - { - values.merge(rhs.values); - return true; - } - - static bool allocatesMemoryInArena() { return false; } - -#if 0 -# if USE_EMBEDDED_COMPILER - - static constexpr bool is_compilable = true; - - static llvm::Value * getValuePtrFromAggregateDataPtr(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) - { - llvm::IRBuilder<> & b = static_cast &>(builder); - - static constexpr size_t value_offset_from_structure = offsetof(CountedValuesDataFixed, value); - - auto * type = toNativeType(builder); - auto * value_ptr_with_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_ptr, value_offset_from_structure); - auto * value_ptr = b.CreatePointerCast(value_ptr_with_offset, type->getPointerTo()); - - return value_ptr; - } - - static llvm::Value * getValueFromAggregateDataPtr(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) - { - llvm::IRBuilder<> & b = static_cast &>(builder); - - auto * type = toNativeType(builder); - auto * value_ptr = getValuePtrFromAggregateDataPtr(builder, aggregate_data_ptr); - - return b.CreateLoad(type, value_ptr); - } - - static void compileChange(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) - { - llvm::IRBuilder<> & b = static_cast &>(builder); - - auto * has_value_ptr = b.CreatePointerCast(aggregate_data_ptr, b.getInt1Ty()->getPointerTo()); - b.CreateStore(b.getInt1(true), has_value_ptr); - - auto * value_ptr = getValuePtrFromAggregateDataPtr(b, aggregate_data_ptr); - b.CreateStore(value_to_check, value_ptr); - } - - static void - compileChangeMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) - { - auto * value_src = 
getValueFromAggregateDataPtr(builder, aggregate_data_src_ptr); - - compileChange(builder, aggregate_data_dst_ptr, value_src); - } - - static void compileChangeFirstTime(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) - { - llvm::IRBuilder<> & b = static_cast &>(builder); - - auto * has_value_ptr = b.CreatePointerCast(aggregate_data_ptr, b.getInt1Ty()->getPointerTo()); - auto * has_value_value = b.CreateLoad(b.getInt1Ty(), has_value_ptr); - - auto * head = b.GetInsertBlock(); - - auto * join_block = llvm::BasicBlock::Create(head->getContext(), "join_block", head->getParent()); - auto * if_should_change = llvm::BasicBlock::Create(head->getContext(), "if_should_change", head->getParent()); - auto * if_should_not_change = llvm::BasicBlock::Create(head->getContext(), "if_should_not_change", head->getParent()); - - b.CreateCondBr(has_value_value, if_should_not_change, if_should_change); - - b.SetInsertPoint(if_should_not_change); - b.CreateBr(join_block); - - b.SetInsertPoint(if_should_change); - compileChange(builder, aggregate_data_ptr, value_to_check); - b.CreateBr(join_block); - - b.SetInsertPoint(join_block); - } - - static void - compileChangeFirstTimeMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) - { - llvm::IRBuilder<> & b = static_cast &>(builder); - - auto * has_value_dst_ptr = b.CreatePointerCast(aggregate_data_dst_ptr, b.getInt1Ty()->getPointerTo()); - auto * has_value_dst = b.CreateLoad(b.getInt1Ty(), has_value_dst_ptr); - - auto * has_value_src_ptr = b.CreatePointerCast(aggregate_data_src_ptr, b.getInt1Ty()->getPointerTo()); - auto * has_value_src = b.CreateLoad(b.getInt1Ty(), has_value_src_ptr); - - auto * head = b.GetInsertBlock(); - - auto * join_block = llvm::BasicBlock::Create(head->getContext(), "join_block", head->getParent()); - auto * if_should_change = llvm::BasicBlock::Create(head->getContext(), "if_should_change", head->getParent()); 
- auto * if_should_not_change = llvm::BasicBlock::Create(head->getContext(), "if_should_not_change", head->getParent()); - - b.CreateCondBr(b.CreateAnd(b.CreateNot(has_value_dst), has_value_src), if_should_change, if_should_not_change); - - b.SetInsertPoint(if_should_change); - compileChangeMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); - b.CreateBr(join_block); - - b.SetInsertPoint(if_should_not_change); - b.CreateBr(join_block); - - b.SetInsertPoint(join_block); - } - - static void compileChangeEveryTime(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) - { - compileChange(builder, aggregate_data_ptr, value_to_check); - } - - static void - compileChangeEveryTimeMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) - { - llvm::IRBuilder<> & b = static_cast &>(builder); - - auto * has_value_src_ptr = b.CreatePointerCast(aggregate_data_src_ptr, b.getInt1Ty()->getPointerTo()); - auto * has_value_src = b.CreateLoad(b.getInt1Ty(), has_value_src_ptr); - - auto * head = b.GetInsertBlock(); - - auto * join_block = llvm::BasicBlock::Create(head->getContext(), "join_block", head->getParent()); - auto * if_should_change = llvm::BasicBlock::Create(head->getContext(), "if_should_change", head->getParent()); - auto * if_should_not_change = llvm::BasicBlock::Create(head->getContext(), "if_should_not_change", head->getParent()); - - b.CreateCondBr(has_value_src, if_should_change, if_should_not_change); - - b.SetInsertPoint(if_should_change); - compileChangeMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); - b.CreateBr(join_block); - - b.SetInsertPoint(if_should_not_change); - b.CreateBr(join_block); - - b.SetInsertPoint(join_block); - } - - template - static void compileChangeComparison(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) - { - llvm::IRBuilder<> & b = static_cast &>(builder); - - auto * 
has_value_ptr = b.CreatePointerCast(aggregate_data_ptr, b.getInt1Ty()->getPointerTo()); - auto * has_value_value = b.CreateLoad(b.getInt1Ty(), has_value_ptr); - - auto * value = getValueFromAggregateDataPtr(b, aggregate_data_ptr); - - auto * head = b.GetInsertBlock(); - - auto * join_block = llvm::BasicBlock::Create(head->getContext(), "join_block", head->getParent()); - auto * if_should_change = llvm::BasicBlock::Create(head->getContext(), "if_should_change", head->getParent()); - auto * if_should_not_change = llvm::BasicBlock::Create(head->getContext(), "if_should_not_change", head->getParent()); - - auto is_signed = std::numeric_limits::is_signed; - - llvm::Value * should_change_after_comparison = nullptr; - - if constexpr (is_less) - { - if (value_to_check->getType()->isIntegerTy()) - should_change_after_comparison - = is_signed ? b.CreateICmpSLT(value_to_check, value) : b.CreateICmpULT(value_to_check, value); - else - should_change_after_comparison = b.CreateFCmpOLT(value_to_check, value); - } - else - { - if (value_to_check->getType()->isIntegerTy()) - should_change_after_comparison - = is_signed ? 
b.CreateICmpSGT(value_to_check, value) : b.CreateICmpUGT(value_to_check, value); - else - should_change_after_comparison = b.CreateFCmpOGT(value_to_check, value); - } - - b.CreateCondBr(b.CreateOr(b.CreateNot(has_value_value), should_change_after_comparison), if_should_change, if_should_not_change); - - b.SetInsertPoint(if_should_change); - compileChange(builder, aggregate_data_ptr, value_to_check); - b.CreateBr(join_block); - - b.SetInsertPoint(if_should_not_change); - b.CreateBr(join_block); - - b.SetInsertPoint(join_block); - } - - template - static void - compileChangeComparisonMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) - { - llvm::IRBuilder<> & b = static_cast &>(builder); - - auto * has_value_dst_ptr = b.CreatePointerCast(aggregate_data_dst_ptr, b.getInt1Ty()->getPointerTo()); - auto * has_value_dst = b.CreateLoad(b.getInt1Ty(), has_value_dst_ptr); - - auto * value_dst = getValueFromAggregateDataPtr(b, aggregate_data_dst_ptr); - - auto * has_value_src_ptr = b.CreatePointerCast(aggregate_data_src_ptr, b.getInt1Ty()->getPointerTo()); - auto * has_value_src = b.CreateLoad(b.getInt1Ty(), has_value_src_ptr); - - auto * value_src = getValueFromAggregateDataPtr(b, aggregate_data_src_ptr); - - auto * head = b.GetInsertBlock(); - - auto * join_block = llvm::BasicBlock::Create(head->getContext(), "join_block", head->getParent()); - auto * if_should_change = llvm::BasicBlock::Create(head->getContext(), "if_should_change", head->getParent()); - auto * if_should_not_change = llvm::BasicBlock::Create(head->getContext(), "if_should_not_change", head->getParent()); - - auto is_signed = std::numeric_limits::is_signed; - - llvm::Value * should_change_after_comparison = nullptr; - - if constexpr (is_less) - { - if (value_src->getType()->isIntegerTy()) - should_change_after_comparison = is_signed ? 
b.CreateICmpSLT(value_src, value_dst) : b.CreateICmpULT(value_src, value_dst); - else - should_change_after_comparison = b.CreateFCmpOLT(value_src, value_dst); - } - else - { - if (value_src->getType()->isIntegerTy()) - should_change_after_comparison = is_signed ? b.CreateICmpSGT(value_src, value_dst) : b.CreateICmpUGT(value_src, value_dst); - else - should_change_after_comparison = b.CreateFCmpOGT(value_src, value_dst); - } - - b.CreateCondBr( - b.CreateAnd(has_value_src, b.CreateOr(b.CreateNot(has_value_dst), should_change_after_comparison)), - if_should_change, - if_should_not_change); - - b.SetInsertPoint(if_should_change); - compileChangeMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); - b.CreateBr(join_block); - - b.SetInsertPoint(if_should_not_change); - b.CreateBr(join_block); - - b.SetInsertPoint(join_block); - } - - static void compileChangeIfLess(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) - { - static constexpr bool is_less = true; - compileChangeComparison(builder, aggregate_data_ptr, value_to_check); - } - - static void - compileChangeIfLessMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) - { - static constexpr bool is_less = true; - compileChangeComparisonMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); - } - - static void compileChangeIfGreater(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) - { - static constexpr bool is_less = false; - compileChangeComparison(builder, aggregate_data_ptr, value_to_check); - } - - static void - compileChangeIfGreaterMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) - { - static constexpr bool is_less = false; - compileChangeComparisonMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); - } - - static llvm::Value * compileGetResult(llvm::IRBuilderBase & 
builder, llvm::Value * aggregate_data_ptr) - { - return getValueFromAggregateDataPtr(builder, aggregate_data_ptr); - } - -# endif -#endif -}; - -template -struct CountedValuesDataString -{ -private: - using Self = CountedValuesDataString; - using ColVecType = ColumnString; - - CountedValueMap values; - -public: - CountedValuesDataString() : CountedValuesDataString(1) { } - explicit CountedValuesDataString(int64_t size) : values(size) { } - - static constexpr bool is_nullable = false; - - bool has() const { return !values.empty(); } - - void insertResultInto(IColumn & to) const - { - if (has()) - { - const auto & v = values.firstValue(); - assert_cast(to).insertData(v.data(), v.size()); - } - else - assert_cast(to).insertDefault(); - } - - void write(WriteBuffer & buf, const ISerialization & /*serialization*/) const - { - /// Write size first - writeVarInt(values.size(), buf); - - /// Then write value / count - for (const auto & [val, count] : values) - { - writeBinary(val, buf); - writeVarUInt(count, buf); - } - } - - void read(ReadBuffer & buf, const ISerialization & /*serialization*/, Arena *) - { - Int64 size = 0; - readVarInt(size, buf); - - values.setCapacity(std::max(size, values.capacity())); - - for (Int64 i = 0; i < size; ++i) - { - String value; - UInt32 count; - - readBinary(value, buf); - readVarUInt(count, buf); - - [[maybe_unused]] auto inserted = values.insert(std::move(value), count); - assert(inserted); - } - } - - bool add(const IColumn & column, size_t row_num, Arena *) - { - return values.insert(assert_cast(column).getDataAt(row_num).toString()); - } - - bool negate(const IColumn & column, size_t row_num, Arena *) - { - return values.erase(assert_cast(column).getDataAt(row_num).toView()); - } - - bool merge(const Self & rhs, Arena *) - { - values.merge(rhs.values); - return true; - } - - static bool allocatesMemoryInArena() { return false; } - -#if USE_EMBEDDED_COMPILER - - static constexpr bool is_compilable = false; - -#endif -}; - -template 
-struct CountedValuesDataGeneric -{ -private: - using Self = CountedValuesDataGeneric; - - CountedValueMap values; - -public: - CountedValuesDataGeneric() : CountedValuesDataGeneric(1) { } - explicit CountedValuesDataGeneric(int64_t size) : values(size) { } - - static constexpr bool is_nullable = false; - - bool has() const { return !values.empty(); } - - void insertResultInto(IColumn & to) const - { - if (has()) - to.insert(values.firstValue()); - else - to.insertDefault(); - } - - void write(WriteBuffer & buf, const ISerialization & serialization) const - { - /// Write size first - writeVarInt(values.size(), buf); - - /// Then write value / count - for (const auto & [val, count] : values) - { - serialization.serializeBinary(val, buf, {}); - writeVarUInt(count, buf); - } - } - - void read(ReadBuffer & buf, const ISerialization & serialization, Arena *) - { - Int64 size = 0; - readVarInt(size, buf); - - values.setCapacity(std::max(size, values.capacity())); - - for (Int64 i = 0; i < size; ++i) - { - Field value; - UInt32 count; - - serialization.deserializeBinary(value, buf, {}); - readVarUInt(count, buf); - - [[maybe_unused]] auto inserted = values.insert(std::move(value), count); - assert(inserted); - } - } - - bool add(const IColumn & column, size_t row_num, Arena *) { return values.insert(column[row_num]); } - - bool negate(const IColumn & column, size_t row_num, Arena *) { return values.erase(column[row_num]); } - - bool merge(const Self & rhs, Arena *) - { - values.merge(rhs.values); - return true; - } - - static bool allocatesMemoryInArena() { return false; } - -#if USE_EMBEDDED_COMPILER - - static constexpr bool is_compilable = false; - -#endif -}; - -/** What is the difference between the aggregate functions min, max, any, any_last - * (the condition that the stored value is replaced by a new one, - * as well as, of course, the name). 
- */ - -template -struct AggregateFunctionMinData : Data -{ - using Self = AggregateFunctionMinData; - - using Data::Data; - - static const char * name() { return "min"; } - -#if 0 -# if USE_EMBEDDED_COMPILER - - static constexpr bool is_compilable = Data::is_compilable; - - static void compileChangeIfBetter(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) - { - Data::compileChangeIfLess(builder, aggregate_data_ptr, value_to_check); - } - - static void - compileChangeIfBetterMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) - { - Data::compileChangeIfLessMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); - } - -# endif -#endif -}; - -template -struct AggregateFunctionMaxData : Data -{ - using Self = AggregateFunctionMaxData; - - using Data::Data; - - static const char * name() { return "max"; } - -#if 0 -# if USE_EMBEDDED_COMPILER - - static constexpr bool is_compilable = Data::is_compilable; - - static void compileChangeIfBetter(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) - { - Data::compileChangeIfGreater(builder, aggregate_data_ptr, value_to_check); - } - - static void - compileChangeIfBetterMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) - { - Data::compileChangeIfGreaterMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); - } - -# endif -#endif -}; - -template -struct AggregateFunctionAnyData : Data -{ - using Self = AggregateFunctionAnyData; - - bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeFirstTime(column, row_num, arena); } - bool changeIfBetter(const Self & to, Arena * arena) { return this->changeFirstTime(to, arena); } - - static const char * name() { return "any"; } - -#if 0 -# if USE_EMBEDDED_COMPILER - - static constexpr bool is_compilable = 
Data::is_compilable; - - static void compileChangeIfBetter(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) - { - Data::compileChangeFirstTime(builder, aggregate_data_ptr, value_to_check); - } - - static void - compileChangeIfBetterMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) - { - Data::compileChangeFirstTimeMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); - } - -# endif -#endif -}; - -template -struct AggregateFunctionAnyLastData : Data -{ - using Self = AggregateFunctionAnyLastData; - - bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeEveryTime(column, row_num, arena); } - bool changeIfBetter(const Self & to, Arena * arena) { return this->changeEveryTime(to, arena); } - - static const char * name() { return "any_last"; } - -#if 0 -# if USE_EMBEDDED_COMPILER - - static constexpr bool is_compilable = Data::is_compilable; - - static void compileChangeIfBetter(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) - { - Data::compileChangeEveryTime(builder, aggregate_data_ptr, value_to_check); - } - - static void - compileChangeIfBetterMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) - { - Data::compileChangeEveryTimeMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); - } - -# endif -#endif -}; - -template -struct AggregateFunctionSingleValueOrNullData : Data -{ - static constexpr bool is_nullable = true; - - using Self = AggregateFunctionSingleValueOrNullData; - - bool first_value = true; - bool is_null = false; - - bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) - { - if (first_value) - { - first_value = false; - this->change(column, row_num, arena); - return true; - } - else if (!this->isEqualTo(column, row_num)) - { - is_null = true; - } - return 
false; - } - - bool changeIfBetter(const Self & to, Arena * arena) - { - if (first_value) - { - first_value = false; - this->change(to, arena); - return true; - } - else if (!this->isEqualTo(to)) - { - is_null = true; - } - return false; - } - - void insertResultInto(IColumn & to) const - { - if (is_null || first_value) - { - to.insertDefault(); - } - else - { - ColumnNullable & col = typeid_cast(to); - col.getNullMapColumn().insertDefault(); - this->Data::insertResultInto(col.getNestedColumn()); - } - } - - static const char * name() { return "single_value_or_null"; } - -#if USE_EMBEDDED_COMPILER - - static constexpr bool is_compilable = false; - -#endif -}; - -/** Implement 'heavy hitters' algorithm. - * Selects most frequent value if its frequency is more than 50% in each thread of execution. - * Otherwise, selects some arbitrary value. - * http://www.cs.umd.edu/~samir/498/karp.pdf - */ -template -struct AggregateFunctionAnyHeavyData : Data -{ - UInt64 counter = 0; - - using Self = AggregateFunctionAnyHeavyData; - - bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) - { - if (this->isEqualTo(column, row_num)) - { - ++counter; - } - else - { - if (counter == 0) - { - this->change(column, row_num, arena); - ++counter; - return true; - } - else - --counter; - } - return false; - } - - bool changeIfBetter(const Self & to, Arena * arena) - { - if (this->isEqualTo(to)) - { - counter += to.counter; - } - else - { - if ((!this->has() && to.has()) || counter < to.counter) - { - this->change(to, arena); - return true; - } - else - counter -= to.counter; - } - return false; - } - - void write(WriteBuffer & buf, const ISerialization & serialization) const - { - Data::write(buf, serialization); - writeBinary(counter, buf); - } - - void read(ReadBuffer & buf, const ISerialization & serialization, Arena * arena) - { - Data::read(buf, serialization, arena); - readBinary(counter, buf); - } - - static const char * name() { return "any_heavy"; } - -#if 
USE_EMBEDDED_COMPILER - - static constexpr bool is_compilable = false; - -#endif -}; - -/// In changelog mode, when we only keep around `retract_max` elements in CountedArgValueMap, there are situations -/// that we won't get the correct results. Examples retract_max = 3 -/// For max sequence (value, delta): (10, 1), (9, 1), (8, 1), (7, 1), (6, 1), (10, -1), (9, -1), (8, -1), (5, 1) -/// (7, 1) and (6, 1) will be dropped on the floor since we reach the capacity N = 3 when they appear -/// Then, 10, 9, 8 get retracted, the map is basically empty at this point of time -/// And then (5, 1) gets inserted and it is the only element in the map, so it is the max element at -/// this specific time but it is wrong since we dropped 7 and 6. -/// On the other hand, if we keep all unique values and their counts in the map (retract_max == 0), we can always get the correct result -/// so setting up a correct retract_max is a trade-off between an absolute accurate result and resource usage -template -class AggregateFunctionsCountedValue final : public IAggregateFunctionDataHelper> -{ -private: - SerializationPtr serialization; - UInt64 max_size; - -public: - explicit AggregateFunctionsCountedValue(const DataTypePtr & type, const Settings * settings) - : IAggregateFunctionDataHelper>({type}, {}) - , serialization(type->getDefaultSerialization()) - , max_size(settings->retract_max.value) - { - if (StringRef(Data::name()) == StringRef("min") || StringRef(Data::name()) == StringRef("max")) - { - if (!type->isComparable()) - throw Exception( - "Illegal type " + type->getName() + " of argument of aggregate function " + getName() - + " because the values of that data type are not comparable", - ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); - } - } - - void create(AggregateDataPtr place) const override { new (place) Data(max_size); } - - String getName() const override { return Data::name(); } - - DataTypePtr getReturnType() const override - { - auto result_type = this->argument_types.at(0); - 
if constexpr (Data::is_nullable) - return makeNullable(result_type); - return result_type; - } - - void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override - { - this->data(place).add(*columns[0], row_num, arena); - } - - void negate(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const final - { - this->data(place).negate(*columns[0], row_num, arena); - } - - void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override - { - this->data(place).merge(this->data(rhs), arena); - } - - void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional /* version */) const override - { - this->data(place).write(buf, *serialization); - } - - void deserialize(AggregateDataPtr place, ReadBuffer & buf, std::optional /* version */, Arena * arena) const override - { - this->data(place).read(buf, *serialization, arena); - } - - bool allocatesMemoryInArena() const override { return Data::allocatesMemoryInArena(); } - - void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override - { - this->data(place).insertResultInto(to); - } - -#if 0 -# if USE_EMBEDDED_COMPILER - - bool isCompilable() const override - { - if constexpr (!Data::is_compilable) - return false; - - return canBeNativeType(*this->argument_types[0]); - } - - void compileCreate(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override - { - llvm::IRBuilder<> & b = static_cast &>(builder); - - b.CreateMemSet( - aggregate_data_ptr, llvm::ConstantInt::get(b.getInt8Ty(), 0), this->sizeOfData(), llvm::assumeAligned(this->alignOfData())); - } - - void compileAdd( - llvm::IRBuilderBase & builder, - llvm::Value * aggregate_data_ptr, - const DataTypes &, - const std::vector & argument_values) const override - { - if constexpr (Data::is_compilable) - { - Data::compileChangeIfBetter(builder, 
aggregate_data_ptr, argument_values[0]); - } - else - { - throw Exception(getName() + " is not JIT-compilable", ErrorCodes::NOT_IMPLEMENTED); - } - } - - void - compileMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) const override - { - if constexpr (Data::is_compilable) - { - Data::compileChangeIfBetterMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); - } - else - { - throw Exception(getName() + " is not JIT-compilable", ErrorCodes::NOT_IMPLEMENTED); - } - } - - llvm::Value * compileGetResult(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override - { - if constexpr (Data::is_compilable) - { - return Data::compileGetResult(builder, aggregate_data_ptr); - } - else - { - throw Exception(getName() + " is not JIT-compilable", ErrorCodes::NOT_IMPLEMENTED); - } - } - -# endif -#endif -}; -} -} diff --git a/src/AggregateFunctions/Streaming/AggregateFunctionMinMaxAny.h b/src/AggregateFunctions/Streaming/AggregateFunctionMinMaxAny.h new file mode 100644 index 00000000000..9427ba8da02 --- /dev/null +++ b/src/AggregateFunctions/Streaming/AggregateFunctionMinMaxAny.h @@ -0,0 +1,2003 @@ +#pragma once + +#include "CountedValueMap.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "config.h" +#include +#include + + +#if USE_EMBEDDED_COMPILER +# include +# include +#endif + +namespace DB +{ +struct Settings; + +namespace ErrorCodes +{ +extern const int ILLEGAL_TYPE_OF_ARGUMENT; +extern const int NOT_IMPLEMENTED; +extern const int TOO_LARGE_STRING_SIZE; +extern const int LOGICAL_ERROR; +} + +namespace Streaming +{ +/** Aggregate functions that store one of passed values. + * For example: min, max, any, any_last. + */ + +/// For numeric values. 
+template +struct CountedValuesDataFixed +{ +private: + using Self = CountedValuesDataFixed; + using ColVecType = ColumnVectorOrDecimal; + + CountedValueMap values; + +public: + using ValueType = T; + + CountedValuesDataFixed() : CountedValuesDataFixed(1) { } + explicit CountedValuesDataFixed(int64_t size) : values(size) { } + + static constexpr bool is_nullable = false; + + bool has() const { return !values.empty(); } + + void insertResultInto(IColumn & to) const + { + if (has()) + assert_cast(to).getData().push_back(values.firstValue()); + else + assert_cast(to).insertDefault(); + } + + void write(WriteBuffer & buf, const ISerialization & /*serialization*/) const + { + /// Write size first + writeVarInt(values.size(), buf); + + /// Then write value / count + for (auto [val, count] : values) + { + writeBinary(val, buf); + writeVarUInt(count, buf); + } + } + + void read(ReadBuffer & buf, const ISerialization & /*serialization*/, Arena *) + { + Int64 size = 0; + readVarInt(size, buf); + + assert(size >= 0); + + values.setCapacity(std::max(size, values.capacity())); + + for (Int64 i = 0; i < size; ++i) + { + T value; + UInt32 count; + + readBinary(value, buf); + readVarUInt(count, buf); + + [[maybe_unused]] auto inserted = values.insert(std::move(value), count); + assert(inserted); + } + } + + bool add(const IColumn & column, size_t row_num, Arena *) + { + return values.insert(assert_cast(column).getData()[row_num]); + } + + bool insert(const IColumn & column, size_t row_num, Arena * arena, UInt32 count) + { + return values.insert(assert_cast(column).getData()[row_num], count); + } + + bool negate(const IColumn & column, size_t row_num, Arena *) + { + return values.erase(assert_cast(column).getData()[row_num]); + } + + bool merge(const Self & rhs, Arena *) + { + values.merge(rhs.values); + return true; + } + + static bool allocatesMemoryInArena() { return false; } + +#if 0 +# if USE_EMBEDDED_COMPILER + + static constexpr bool is_compilable = true; + + static 
llvm::Value * getValuePtrFromAggregateDataPtr(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) + { + llvm::IRBuilder<> & b = static_cast &>(builder); + + static constexpr size_t value_offset_from_structure = offsetof(CountedValuesDataFixed, value); + + auto * type = toNativeType(builder); + auto * value_ptr_with_offset = b.CreateConstInBoundsGEP1_64(nullptr, aggregate_data_ptr, value_offset_from_structure); + auto * value_ptr = b.CreatePointerCast(value_ptr_with_offset, type->getPointerTo()); + + return value_ptr; + } + + static llvm::Value * getValueFromAggregateDataPtr(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) + { + llvm::IRBuilder<> & b = static_cast &>(builder); + + auto * type = toNativeType(builder); + auto * value_ptr = getValuePtrFromAggregateDataPtr(builder, aggregate_data_ptr); + + return b.CreateLoad(type, value_ptr); + } + + static void compileChange(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) + { + llvm::IRBuilder<> & b = static_cast &>(builder); + + auto * has_value_ptr = b.CreatePointerCast(aggregate_data_ptr, b.getInt1Ty()->getPointerTo()); + b.CreateStore(b.getInt1(true), has_value_ptr); + + auto * value_ptr = getValuePtrFromAggregateDataPtr(b, aggregate_data_ptr); + b.CreateStore(value_to_check, value_ptr); + } + + static void + compileChangeMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) + { + auto * value_src = getValueFromAggregateDataPtr(builder, aggregate_data_src_ptr); + + compileChange(builder, aggregate_data_dst_ptr, value_src); + } + + static void compileChangeFirstTime(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) + { + llvm::IRBuilder<> & b = static_cast &>(builder); + + auto * has_value_ptr = b.CreatePointerCast(aggregate_data_ptr, b.getInt1Ty()->getPointerTo()); + auto * has_value_value = b.CreateLoad(b.getInt1Ty(), has_value_ptr); + 
+ auto * head = b.GetInsertBlock(); + + auto * join_block = llvm::BasicBlock::Create(head->getContext(), "join_block", head->getParent()); + auto * if_should_change = llvm::BasicBlock::Create(head->getContext(), "if_should_change", head->getParent()); + auto * if_should_not_change = llvm::BasicBlock::Create(head->getContext(), "if_should_not_change", head->getParent()); + + b.CreateCondBr(has_value_value, if_should_not_change, if_should_change); + + b.SetInsertPoint(if_should_not_change); + b.CreateBr(join_block); + + b.SetInsertPoint(if_should_change); + compileChange(builder, aggregate_data_ptr, value_to_check); + b.CreateBr(join_block); + + b.SetInsertPoint(join_block); + } + + static void + compileChangeFirstTimeMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) + { + llvm::IRBuilder<> & b = static_cast &>(builder); + + auto * has_value_dst_ptr = b.CreatePointerCast(aggregate_data_dst_ptr, b.getInt1Ty()->getPointerTo()); + auto * has_value_dst = b.CreateLoad(b.getInt1Ty(), has_value_dst_ptr); + + auto * has_value_src_ptr = b.CreatePointerCast(aggregate_data_src_ptr, b.getInt1Ty()->getPointerTo()); + auto * has_value_src = b.CreateLoad(b.getInt1Ty(), has_value_src_ptr); + + auto * head = b.GetInsertBlock(); + + auto * join_block = llvm::BasicBlock::Create(head->getContext(), "join_block", head->getParent()); + auto * if_should_change = llvm::BasicBlock::Create(head->getContext(), "if_should_change", head->getParent()); + auto * if_should_not_change = llvm::BasicBlock::Create(head->getContext(), "if_should_not_change", head->getParent()); + + b.CreateCondBr(b.CreateAnd(b.CreateNot(has_value_dst), has_value_src), if_should_change, if_should_not_change); + + b.SetInsertPoint(if_should_change); + compileChangeMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); + b.CreateBr(join_block); + + b.SetInsertPoint(if_should_not_change); + b.CreateBr(join_block); + + b.SetInsertPoint(join_block); + } + 
+ static void compileChangeEveryTime(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) + { + compileChange(builder, aggregate_data_ptr, value_to_check); + } + + static void + compileChangeEveryTimeMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) + { + llvm::IRBuilder<> & b = static_cast &>(builder); + + auto * has_value_src_ptr = b.CreatePointerCast(aggregate_data_src_ptr, b.getInt1Ty()->getPointerTo()); + auto * has_value_src = b.CreateLoad(b.getInt1Ty(), has_value_src_ptr); + + auto * head = b.GetInsertBlock(); + + auto * join_block = llvm::BasicBlock::Create(head->getContext(), "join_block", head->getParent()); + auto * if_should_change = llvm::BasicBlock::Create(head->getContext(), "if_should_change", head->getParent()); + auto * if_should_not_change = llvm::BasicBlock::Create(head->getContext(), "if_should_not_change", head->getParent()); + + b.CreateCondBr(has_value_src, if_should_change, if_should_not_change); + + b.SetInsertPoint(if_should_change); + compileChangeMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); + b.CreateBr(join_block); + + b.SetInsertPoint(if_should_not_change); + b.CreateBr(join_block); + + b.SetInsertPoint(join_block); + } + + template + static void compileChangeComparison(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) + { + llvm::IRBuilder<> & b = static_cast &>(builder); + + auto * has_value_ptr = b.CreatePointerCast(aggregate_data_ptr, b.getInt1Ty()->getPointerTo()); + auto * has_value_value = b.CreateLoad(b.getInt1Ty(), has_value_ptr); + + auto * value = getValueFromAggregateDataPtr(b, aggregate_data_ptr); + + auto * head = b.GetInsertBlock(); + + auto * join_block = llvm::BasicBlock::Create(head->getContext(), "join_block", head->getParent()); + auto * if_should_change = llvm::BasicBlock::Create(head->getContext(), "if_should_change", head->getParent()); + auto * 
if_should_not_change = llvm::BasicBlock::Create(head->getContext(), "if_should_not_change", head->getParent()); + + auto is_signed = std::numeric_limits::is_signed; + + llvm::Value * should_change_after_comparison = nullptr; + + if constexpr (is_less) + { + if (value_to_check->getType()->isIntegerTy()) + should_change_after_comparison + = is_signed ? b.CreateICmpSLT(value_to_check, value) : b.CreateICmpULT(value_to_check, value); + else + should_change_after_comparison = b.CreateFCmpOLT(value_to_check, value); + } + else + { + if (value_to_check->getType()->isIntegerTy()) + should_change_after_comparison + = is_signed ? b.CreateICmpSGT(value_to_check, value) : b.CreateICmpUGT(value_to_check, value); + else + should_change_after_comparison = b.CreateFCmpOGT(value_to_check, value); + } + + b.CreateCondBr(b.CreateOr(b.CreateNot(has_value_value), should_change_after_comparison), if_should_change, if_should_not_change); + + b.SetInsertPoint(if_should_change); + compileChange(builder, aggregate_data_ptr, value_to_check); + b.CreateBr(join_block); + + b.SetInsertPoint(if_should_not_change); + b.CreateBr(join_block); + + b.SetInsertPoint(join_block); + } + + template + static void + compileChangeComparisonMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) + { + llvm::IRBuilder<> & b = static_cast &>(builder); + + auto * has_value_dst_ptr = b.CreatePointerCast(aggregate_data_dst_ptr, b.getInt1Ty()->getPointerTo()); + auto * has_value_dst = b.CreateLoad(b.getInt1Ty(), has_value_dst_ptr); + + auto * value_dst = getValueFromAggregateDataPtr(b, aggregate_data_dst_ptr); + + auto * has_value_src_ptr = b.CreatePointerCast(aggregate_data_src_ptr, b.getInt1Ty()->getPointerTo()); + auto * has_value_src = b.CreateLoad(b.getInt1Ty(), has_value_src_ptr); + + auto * value_src = getValueFromAggregateDataPtr(b, aggregate_data_src_ptr); + + auto * head = b.GetInsertBlock(); + + auto * join_block = 
llvm::BasicBlock::Create(head->getContext(), "join_block", head->getParent()); + auto * if_should_change = llvm::BasicBlock::Create(head->getContext(), "if_should_change", head->getParent()); + auto * if_should_not_change = llvm::BasicBlock::Create(head->getContext(), "if_should_not_change", head->getParent()); + + auto is_signed = std::numeric_limits::is_signed; + + llvm::Value * should_change_after_comparison = nullptr; + + if constexpr (is_less) + { + if (value_src->getType()->isIntegerTy()) + should_change_after_comparison = is_signed ? b.CreateICmpSLT(value_src, value_dst) : b.CreateICmpULT(value_src, value_dst); + else + should_change_after_comparison = b.CreateFCmpOLT(value_src, value_dst); + } + else + { + if (value_src->getType()->isIntegerTy()) + should_change_after_comparison = is_signed ? b.CreateICmpSGT(value_src, value_dst) : b.CreateICmpUGT(value_src, value_dst); + else + should_change_after_comparison = b.CreateFCmpOGT(value_src, value_dst); + } + + b.CreateCondBr( + b.CreateAnd(has_value_src, b.CreateOr(b.CreateNot(has_value_dst), should_change_after_comparison)), + if_should_change, + if_should_not_change); + + b.SetInsertPoint(if_should_change); + compileChangeMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); + b.CreateBr(join_block); + + b.SetInsertPoint(if_should_not_change); + b.CreateBr(join_block); + + b.SetInsertPoint(join_block); + } + + static void compileChangeIfLess(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) + { + static constexpr bool is_less = true; + compileChangeComparison(builder, aggregate_data_ptr, value_to_check); + } + + static void + compileChangeIfLessMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) + { + static constexpr bool is_less = true; + compileChangeComparisonMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); + } + + static void compileChangeIfGreater(llvm::IRBuilderBase & 
builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) + { + static constexpr bool is_less = false; + compileChangeComparison(builder, aggregate_data_ptr, value_to_check); + } + + static void + compileChangeIfGreaterMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) + { + static constexpr bool is_less = false; + compileChangeComparisonMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); + } + + static llvm::Value * compileGetResult(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) + { + return getValueFromAggregateDataPtr(builder, aggregate_data_ptr); + } + +# endif +#endif +}; + +template +struct CountedValuesDataString +{ +private: + using Self = CountedValuesDataString; + using ColVecType = ColumnString; + + CountedValueMap values; + +public: + CountedValuesDataString() : CountedValuesDataString(1) { } + explicit CountedValuesDataString(int64_t size) : values(size) { } + + static constexpr bool is_nullable = false; + + bool has() const { return !values.empty(); } + + void insertResultInto(IColumn & to) const + { + if (has()) + { + const auto & v = values.firstValue(); + assert_cast(to).insertData(v.data(), v.size()); + } + else + assert_cast(to).insertDefault(); + } + + void write(WriteBuffer & buf, const ISerialization & /*serialization*/) const + { + /// Write size first + writeVarInt(values.size(), buf); + + /// Then write value / count + for (const auto & [val, count] : values) + { + writeBinary(val, buf); + writeVarUInt(count, buf); + } + } + + void read(ReadBuffer & buf, const ISerialization & /*serialization*/, Arena *) + { + Int64 size = 0; + readVarInt(size, buf); + + values.setCapacity(std::max(size, values.capacity())); + + for (Int64 i = 0; i < size; ++i) + { + String value; + UInt32 count; + + readBinary(value, buf); + readVarUInt(count, buf); + + [[maybe_unused]] auto inserted = values.insert(std::move(value), count); + assert(inserted); + } + 
} + + bool add(const IColumn & column, size_t row_num, Arena *) + { + return values.insert(assert_cast(column).getDataAt(row_num).toString()); + } + + bool insert(const IColumn & column, size_t row_num, Arena * arena, UInt32 count) + { + return values.insert(assert_cast(column).getDataAt(row_num).toString(), count); + } + + bool negate(const IColumn & column, size_t row_num, Arena *) + { + return values.erase(assert_cast(column).getDataAt(row_num).toView()); + } + + bool merge(const Self & rhs, Arena *) + { + values.merge(rhs.values); + return true; + } + + static bool allocatesMemoryInArena() { return false; } + +#if USE_EMBEDDED_COMPILER + + static constexpr bool is_compilable = false; + +#endif +}; + +template +struct CountedValuesDataGeneric +{ +private: + using Self = CountedValuesDataGeneric; + + CountedValueMap values; + +public: + CountedValuesDataGeneric() : CountedValuesDataGeneric(1) { } + explicit CountedValuesDataGeneric(int64_t size) : values(size) { } + + static constexpr bool is_nullable = false; + + bool has() const { return !values.empty(); } + + void insertResultInto(IColumn & to) const + { + if (has()) + to.insert(values.firstValue()); + else + to.insertDefault(); + } + + void write(WriteBuffer & buf, const ISerialization & serialization) const + { + /// Write size first + writeVarInt(values.size(), buf); + + /// Then write value / count + for (const auto & [val, count] : values) + { + serialization.serializeBinary(val, buf, {}); + writeVarUInt(count, buf); + } + } + + void read(ReadBuffer & buf, const ISerialization & serialization, Arena *) + { + Int64 size = 0; + readVarInt(size, buf); + + values.setCapacity(std::max(size, values.capacity())); + + for (Int64 i = 0; i < size; ++i) + { + Field value; + UInt32 count; + + serialization.deserializeBinary(value, buf, {}); + readVarUInt(count, buf); + + [[maybe_unused]] auto inserted = values.insert(std::move(value), count); + assert(inserted); + } + } + + bool add(const IColumn & column, size_t 
row_num, Arena *) { return values.insert(column[row_num]); } + + bool negate(const IColumn & column, size_t row_num, Arena *) { return values.erase(column[row_num]); } + + bool insert(const IColumn & column, size_t row_num, Arena * arena, UInt32 count) + { + return values.insert(column[row_num], count); + } + + bool merge(const Self & rhs, Arena *) + { + values.merge(rhs.values); + return true; + } + + static bool allocatesMemoryInArena() { return false; } + +#if USE_EMBEDDED_COMPILER + + static constexpr bool is_compilable = false; + +#endif +}; + +/** What is the difference between the aggregate functions min, max, any, any_last + * (the condition that the stored value is replaced by a new one, + * as well as, of course, the name). + */ + +template +struct AggregateFunctionMinData : Data +{ + using Self = AggregateFunctionMinData; + + using Data::Data; + + static const char * name() { return "min"; } + +#if 0 +# if USE_EMBEDDED_COMPILER + + static constexpr bool is_compilable = Data::is_compilable; + + static void compileChangeIfBetter(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) + { + Data::compileChangeIfLess(builder, aggregate_data_ptr, value_to_check); + } + + static void + compileChangeIfBetterMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) + { + Data::compileChangeIfLessMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); + } + +# endif +#endif +}; + +template +struct AggregateFunctionMaxData : Data +{ + using Self = AggregateFunctionMaxData; + + using Data::Data; + + static const char * name() { return "max"; } + +#if 0 +# if USE_EMBEDDED_COMPILER + + static constexpr bool is_compilable = Data::is_compilable; + + static void compileChangeIfBetter(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) + { + Data::compileChangeIfGreater(builder, aggregate_data_ptr, value_to_check); + } + + 
static void + compileChangeIfBetterMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) + { + Data::compileChangeIfGreaterMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); + } + +# endif +#endif +}; + +/// For numeric values. +template +struct SingleValueDataFixed +{ + using Self = SingleValueDataFixed; + using ColVecType = ColumnVectorOrDecimal; + bool has_value = false; /// We need to remember if at least one value has been passed. This is necessary for AggregateFunctionIf. + T value = T{}; + + static constexpr bool result_is_nullable = false; + static constexpr bool should_skip_null_arguments = true; + static constexpr bool is_any = false; + + bool has() const + { + return has_value; + } + + void insertResultInto(IColumn & to) const + { + if (has()) + assert_cast(to).getData().push_back(value); + else + assert_cast(to).insertDefault(); + } + + void write(WriteBuffer & buf, const ISerialization & /*serialization*/) const + { + writeBinary(has(), buf); + if (has()) + writeBinary(value, buf); + } + + void read(ReadBuffer & buf, const ISerialization & /*serialization*/, Arena *) + { + readBinary(has_value, buf); + if (has()) + readBinary(value, buf); + } + + + void change(const IColumn & column, size_t row_num, Arena *) + { + has_value = true; + value = assert_cast(column).getData()[row_num]; + } + + /// Assuming to.has() + void change(const Self & to, Arena *) + { + has_value = true; + value = to.value; + } + + bool changeFirstTime(const IColumn & column, size_t row_num, Arena * arena) + { + if (!has()) + { + change(column, row_num, arena); + return true; + } + else + return false; + } + + bool changeFirstTime(const Self & to, Arena * arena) + { + if (!has() && to.has()) + { + change(to, arena); + return true; + } + else + return false; + } + + bool changeEveryTime(const IColumn & column, size_t row_num, Arena * arena) + { + change(column, row_num, arena); + return true; + } + + bool 
changeEveryTime(const Self & to, Arena * arena) + { + if (to.has()) + { + change(to, arena); + return true; + } + else + return false; + } + + bool changeIfLess(const IColumn & column, size_t row_num, Arena * arena) + { + if (!has() || assert_cast(column).getData()[row_num] < value) + { + change(column, row_num, arena); + return true; + } + else + return false; + } + + bool changeIfLess(const Self & to, Arena * arena) + { + if (to.has() && (!has() || to.value < value)) + { + change(to, arena); + return true; + } + else + return false; + } + + void changeIfLess(T from) + { + if (!has() || from < value) + { + has_value = true; + value = from; + } + } + + bool changeIfGreater(const IColumn & column, size_t row_num, Arena * arena) + { + if (!has() || assert_cast(column).getData()[row_num] > value) + { + change(column, row_num, arena); + return true; + } + else + return false; + } + + bool changeIfGreater(const Self & to, Arena * arena) + { + if (to.has() && (!has() || to.value > value)) + { + change(to, arena); + return true; + } + else + return false; + } + + void changeIfGreater(T & from) + { + if (!has() || from > value) + { + has_value = true; + value = from; + } + } + + bool isEqualTo(const Self & to) const + { + return has() && to.value == value; + } + + bool isEqualTo(const IColumn & column, size_t row_num) const + { + return has() && assert_cast(column).getData()[row_num] == value; + } + + static bool allocatesMemoryInArena() + { + return false; + } + +#if USE_EMBEDDED_COMPILER + + static constexpr bool is_compilable = true; + + static llvm::Value * getValuePtrFromAggregateDataPtr(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) + { + llvm::IRBuilder<> & b = static_cast &>(builder); + + static constexpr size_t value_offset_from_structure = offsetof(SingleValueDataFixed, value); + auto * value_ptr = b.CreateConstInBoundsGEP1_64(b.getInt8Ty(), aggregate_data_ptr, value_offset_from_structure); + + return value_ptr; + } + + static llvm::Value * 
getValueFromAggregateDataPtr(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) + { + llvm::IRBuilder<> & b = static_cast &>(builder); + + auto * type = toNativeType(builder); + auto * value_ptr = getValuePtrFromAggregateDataPtr(builder, aggregate_data_ptr); + + return b.CreateLoad(type, value_ptr); + } + + static void compileChange(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) + { + llvm::IRBuilder<> & b = static_cast &>(builder); + + auto * has_value_ptr = aggregate_data_ptr; + b.CreateStore(b.getInt1(true), has_value_ptr); + + auto * value_ptr = getValuePtrFromAggregateDataPtr(b, aggregate_data_ptr); + b.CreateStore(value_to_check, value_ptr); + } + + static void compileChangeMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) + { + auto * value_src = getValueFromAggregateDataPtr(builder, aggregate_data_src_ptr); + + compileChange(builder, aggregate_data_dst_ptr, value_src); + } + + static void compileChangeFirstTime(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) + { + llvm::IRBuilder<> & b = static_cast &>(builder); + + auto * has_value_ptr = aggregate_data_ptr; + auto * has_value_value = b.CreateLoad(b.getInt1Ty(), has_value_ptr); + + auto * head = b.GetInsertBlock(); + + auto * join_block = llvm::BasicBlock::Create(head->getContext(), "join_block", head->getParent()); + auto * if_should_change = llvm::BasicBlock::Create(head->getContext(), "if_should_change", head->getParent()); + auto * if_should_not_change = llvm::BasicBlock::Create(head->getContext(), "if_should_not_change", head->getParent()); + + b.CreateCondBr(has_value_value, if_should_not_change, if_should_change); + + b.SetInsertPoint(if_should_not_change); + b.CreateBr(join_block); + + b.SetInsertPoint(if_should_change); + compileChange(builder, aggregate_data_ptr, value_to_check); + b.CreateBr(join_block); + + 
b.SetInsertPoint(join_block); + } + + static void compileChangeFirstTimeMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) + { + llvm::IRBuilder<> & b = static_cast &>(builder); + + auto * has_value_dst_ptr = aggregate_data_dst_ptr; + auto * has_value_dst = b.CreateLoad(b.getInt1Ty(), has_value_dst_ptr); + + auto * has_value_src_ptr = aggregate_data_src_ptr; + auto * has_value_src = b.CreateLoad(b.getInt1Ty(), has_value_src_ptr); + + auto * head = b.GetInsertBlock(); + + auto * join_block = llvm::BasicBlock::Create(head->getContext(), "join_block", head->getParent()); + auto * if_should_change = llvm::BasicBlock::Create(head->getContext(), "if_should_change", head->getParent()); + auto * if_should_not_change = llvm::BasicBlock::Create(head->getContext(), "if_should_not_change", head->getParent()); + + b.CreateCondBr(b.CreateAnd(b.CreateNot(has_value_dst), has_value_src), if_should_change, if_should_not_change); + + b.SetInsertPoint(if_should_change); + compileChangeMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); + b.CreateBr(join_block); + + b.SetInsertPoint(if_should_not_change); + b.CreateBr(join_block); + + b.SetInsertPoint(join_block); + } + + static void compileChangeEveryTime(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) + { + compileChange(builder, aggregate_data_ptr, value_to_check); + } + + static void compileChangeEveryTimeMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) + { + llvm::IRBuilder<> & b = static_cast &>(builder); + + auto * has_value_src_ptr = aggregate_data_src_ptr; + auto * has_value_src = b.CreateLoad(b.getInt1Ty(), has_value_src_ptr); + + auto * head = b.GetInsertBlock(); + + auto * join_block = llvm::BasicBlock::Create(head->getContext(), "join_block", head->getParent()); + auto * if_should_change = llvm::BasicBlock::Create(head->getContext(), 
"if_should_change", head->getParent()); + auto * if_should_not_change = llvm::BasicBlock::Create(head->getContext(), "if_should_not_change", head->getParent()); + + b.CreateCondBr(has_value_src, if_should_change, if_should_not_change); + + b.SetInsertPoint(if_should_change); + compileChangeMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); + b.CreateBr(join_block); + + b.SetInsertPoint(if_should_not_change); + b.CreateBr(join_block); + + b.SetInsertPoint(join_block); + } + + template + static void compileChangeComparison(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) + { + llvm::IRBuilder<> & b = static_cast &>(builder); + + auto * has_value_ptr = aggregate_data_ptr; + auto * has_value_value = b.CreateLoad(b.getInt1Ty(), has_value_ptr); + + auto * value = getValueFromAggregateDataPtr(b, aggregate_data_ptr); + + auto * head = b.GetInsertBlock(); + + auto * join_block = llvm::BasicBlock::Create(head->getContext(), "join_block", head->getParent()); + auto * if_should_change = llvm::BasicBlock::Create(head->getContext(), "if_should_change", head->getParent()); + auto * if_should_not_change = llvm::BasicBlock::Create(head->getContext(), "if_should_not_change", head->getParent()); + + auto is_signed = std::numeric_limits::is_signed; + + llvm::Value * should_change_after_comparison = nullptr; + + if constexpr (is_less) + { + if (value_to_check->getType()->isIntegerTy()) + should_change_after_comparison = is_signed ? b.CreateICmpSLT(value_to_check, value) : b.CreateICmpULT(value_to_check, value); + else + should_change_after_comparison = b.CreateFCmpOLT(value_to_check, value); + } + else + { + if (value_to_check->getType()->isIntegerTy()) + should_change_after_comparison = is_signed ? 
b.CreateICmpSGT(value_to_check, value) : b.CreateICmpUGT(value_to_check, value); + else + should_change_after_comparison = b.CreateFCmpOGT(value_to_check, value); + } + + b.CreateCondBr(b.CreateOr(b.CreateNot(has_value_value), should_change_after_comparison), if_should_change, if_should_not_change); + + b.SetInsertPoint(if_should_change); + compileChange(builder, aggregate_data_ptr, value_to_check); + b.CreateBr(join_block); + + b.SetInsertPoint(if_should_not_change); + b.CreateBr(join_block); + + b.SetInsertPoint(join_block); + } + + template + static void compileChangeComparisonMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) + { + llvm::IRBuilder<> & b = static_cast &>(builder); + + auto * has_value_dst_ptr = aggregate_data_dst_ptr; + auto * has_value_dst = b.CreateLoad(b.getInt1Ty(), has_value_dst_ptr); + + auto * value_dst = getValueFromAggregateDataPtr(b, aggregate_data_dst_ptr); + + auto * has_value_src_ptr = aggregate_data_src_ptr; + auto * has_value_src = b.CreateLoad(b.getInt1Ty(), has_value_src_ptr); + + auto * value_src = getValueFromAggregateDataPtr(b, aggregate_data_src_ptr); + + auto * head = b.GetInsertBlock(); + + auto * join_block = llvm::BasicBlock::Create(head->getContext(), "join_block", head->getParent()); + auto * if_should_change = llvm::BasicBlock::Create(head->getContext(), "if_should_change", head->getParent()); + auto * if_should_not_change = llvm::BasicBlock::Create(head->getContext(), "if_should_not_change", head->getParent()); + + auto is_signed = std::numeric_limits::is_signed; + + llvm::Value * should_change_after_comparison = nullptr; + + if constexpr (is_less) + { + if (value_src->getType()->isIntegerTy()) + should_change_after_comparison = is_signed ? 
b.CreateICmpSLT(value_src, value_dst) : b.CreateICmpULT(value_src, value_dst); + else + should_change_after_comparison = b.CreateFCmpOLT(value_src, value_dst); + } + else + { + if (value_src->getType()->isIntegerTy()) + should_change_after_comparison = is_signed ? b.CreateICmpSGT(value_src, value_dst) : b.CreateICmpUGT(value_src, value_dst); + else + should_change_after_comparison = b.CreateFCmpOGT(value_src, value_dst); + } + + b.CreateCondBr(b.CreateAnd(has_value_src, b.CreateOr(b.CreateNot(has_value_dst), should_change_after_comparison)), if_should_change, if_should_not_change); + + b.SetInsertPoint(if_should_change); + compileChangeMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); + b.CreateBr(join_block); + + b.SetInsertPoint(if_should_not_change); + b.CreateBr(join_block); + + b.SetInsertPoint(join_block); + } + + static void compileChangeIfLess(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) + { + static constexpr bool is_less = true; + compileChangeComparison(builder, aggregate_data_ptr, value_to_check); + } + + static void compileChangeIfLessMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) + { + static constexpr bool is_less = true; + compileChangeComparisonMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); + } + + static void compileChangeIfGreater(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) + { + static constexpr bool is_less = false; + compileChangeComparison(builder, aggregate_data_ptr, value_to_check); + } + + static void compileChangeIfGreaterMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) + { + static constexpr bool is_less = false; + compileChangeComparisonMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); + } + + static llvm::Value * compileGetResult(llvm::IRBuilderBase & builder, 
llvm::Value * aggregate_data_ptr) + { + return getValueFromAggregateDataPtr(builder, aggregate_data_ptr); + } + +#endif +}; + +struct Compatibility +{ + /// Old versions used to store terminating null-character in SingleValueDataString. + /// Then -WithTerminatingZero methods were removed from IColumn interface, + /// because these methods are quite dangerous and easy to misuse. It introduced incompatibility. + /// See https://github.com/ClickHouse/ClickHouse/pull/41431 and https://github.com/ClickHouse/ClickHouse/issues/42916 + /// Here we keep these functions for compatibility. + /// It's safe because there's no way unsanitized user input (without \0 at the end) can reach these functions. + + static StringRef getDataAtWithTerminatingZero(const ColumnString & column, size_t n) + { + auto res = column.getDataAt(n); + /// ColumnString always reserves extra byte for null-character after string. + /// But getDataAt returns StringRef without the null-character. Let's add it. + chassert(res.data[res.size] == '\0'); + ++res.size; + return res; + } + + static void insertDataWithTerminatingZero(ColumnString & column, const char * pos, size_t length) + { + /// String already has terminating null-character. + /// But insertData will add another one unconditionally. Trim existing null-character to avoid duplication. + chassert(0 < length); + chassert(pos[length - 1] == '\0'); + column.insertData(pos, length - 1); + } +}; + +/** For strings. Short strings are stored in the object itself, and long strings are allocated separately. + * NOTE It could also be suitable for arrays of numbers. + */ +struct SingleValueDataString +{ +private: + using Self = SingleValueDataString; + + /// 0 size indicates that there is no value. 
Empty string must has terminating '\0' and, therefore, size of empty string is 1 + UInt32 size = 0; + UInt32 capacity = 0; /// power of two or zero + char * large_data; + +public: + static constexpr UInt32 AUTOMATIC_STORAGE_SIZE = 64; + static constexpr UInt32 MAX_SMALL_STRING_SIZE = AUTOMATIC_STORAGE_SIZE - sizeof(size) - sizeof(capacity) - sizeof(large_data); + static constexpr UInt32 MAX_STRING_SIZE = std::numeric_limits::max(); + +private: + char small_data[MAX_SMALL_STRING_SIZE]; /// Including the terminating zero. + +public: + static constexpr bool result_is_nullable = false; + static constexpr bool should_skip_null_arguments = true; + static constexpr bool is_any = false; + + bool has() const + { + return size; + } + +private: + char * getDataMutable() + { + return size <= MAX_SMALL_STRING_SIZE ? small_data : large_data; + } + + const char * getData() const + { + const char * data_ptr = size <= MAX_SMALL_STRING_SIZE ? small_data : large_data; + /// It must always be terminated with null-character + chassert(0 < size); + chassert(data_ptr[size - 1] == '\0'); + return data_ptr; + } + + StringRef getStringRef() const + { + return StringRef(getData(), size); + } + +public: + void insertResultInto(IColumn & to) const + { + if (has()) + Compatibility::insertDataWithTerminatingZero(assert_cast(to), getData(), size); + else + assert_cast(to).insertDefault(); + } + + void write(WriteBuffer & buf, const ISerialization & /*serialization*/) const + { + if (unlikely(MAX_STRING_SIZE < size)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "String size is too big ({}), it's a bug", size); + + /// For serialization we use signed Int32 (for historical reasons), -1 means "no value" + Int32 size_to_write = size ? 
size : -1; + writeBinary(size_to_write, buf); + if (has()) + buf.write(getData(), size); + } + + void allocateLargeDataIfNeeded(UInt32 size_to_reserve, Arena * arena) + { + if (capacity < size_to_reserve) + { + if (unlikely(MAX_STRING_SIZE < size_to_reserve)) + throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "String size is too big ({}), maximum: {}", + size_to_reserve, MAX_STRING_SIZE); + + size_t rounded_capacity = roundUpToPowerOfTwoOrZero(size_to_reserve); + chassert(rounded_capacity <= MAX_STRING_SIZE + 1); /// rounded_capacity <= 2^31 + capacity = static_cast(rounded_capacity); + + /// Don't free large_data here. + large_data = arena->alloc(capacity); + } + } + + void read(ReadBuffer & buf, const ISerialization & /*serialization*/, Arena * arena) + { + /// For serialization we use signed Int32 (for historical reasons), -1 means "no value" + Int32 rhs_size_signed; + readBinary(rhs_size_signed, buf); + + if (rhs_size_signed < 0) + { + /// Don't free large_data here. + size = 0; + return; + } + + UInt32 rhs_size = rhs_size_signed; + if (rhs_size <= MAX_SMALL_STRING_SIZE) + { + /// Don't free large_data here. + size = rhs_size; + buf.readStrict(small_data, size); + } + else + { + /// Reserve one byte more for null-character + allocateLargeDataIfNeeded(rhs_size + 1, arena); + size = rhs_size; + buf.readStrict(large_data, size); + } + + /// Check if the string we read is null-terminated (getDataMutable does not have the assertion) + if (0 < size && getDataMutable()[size - 1] == '\0') + return; + + /// It's not null-terminated, but it must be (for historical reasons). There are two variants: + /// - The value was serialized by one of the incompatible versions of ClickHouse. We had some range of versions + /// that used to serialize SingleValueDataString without terminating '\0'. Let's just append it. + /// - An attacker sent crafted data. Sanitize it and append '\0'. + /// In all other cases the string must be already null-terminated. 
+ + /// NOTE We cannot add '\0' unconditionally, because it will be duplicated. + /// NOTE It's possible that a string that actually ends with '\0' was written by one of the incompatible versions. + /// Unfortunately, we cannot distinguish it from normal string written by normal version. + /// So such strings will be trimmed. + + if (size == MAX_SMALL_STRING_SIZE) + { + /// Special case: We have to move value to large_data + allocateLargeDataIfNeeded(size + 1, arena); + memcpy(large_data, small_data, size); + } + + /// We have enough space to append + ++size; + getDataMutable()[size - 1] = '\0'; + } + + /// Assuming to.has() + void changeImpl(StringRef value, Arena * arena) + { + if (unlikely(MAX_STRING_SIZE < value.size)) + throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "String size is too big ({}), maximum: {}", + value.size, MAX_STRING_SIZE); + + UInt32 value_size = static_cast(value.size); + + if (value_size <= MAX_SMALL_STRING_SIZE) + { + /// Don't free large_data here. + size = value_size; + + if (size > 0) + memcpy(small_data, value.data, size); + } + else + { + allocateLargeDataIfNeeded(value_size, arena); + size = value_size; + memcpy(large_data, value.data, size); + } + } + + void change(const IColumn & column, size_t row_num, Arena * arena) + { + changeImpl(Compatibility::getDataAtWithTerminatingZero(assert_cast(column), row_num), arena); + } + + void change(const Self & to, Arena * arena) + { + changeImpl(to.getStringRef(), arena); + } + + bool changeFirstTime(const IColumn & column, size_t row_num, Arena * arena) + { + if (!has()) + { + change(column, row_num, arena); + return true; + } + else + return false; + } + + bool changeFirstTime(const Self & to, Arena * arena) + { + if (!has() && to.has()) + { + change(to, arena); + return true; + } + else + return false; + } + + bool changeEveryTime(const IColumn & column, size_t row_num, Arena * arena) + { + change(column, row_num, arena); + return true; + } + + bool changeEveryTime(const Self & to, 
Arena * arena) + { + if (to.has()) + { + change(to, arena); + return true; + } + else + return false; + } + + bool changeIfLess(const IColumn & column, size_t row_num, Arena * arena) + { + if (!has() || Compatibility::getDataAtWithTerminatingZero(assert_cast(column), row_num) < getStringRef()) + { + change(column, row_num, arena); + return true; + } + else + return false; + } + + bool changeIfLess(const Self & to, Arena * arena) + { + if (to.has() && (!has() || to.getStringRef() < getStringRef())) + { + change(to, arena); + return true; + } + else + return false; + } + + bool changeIfGreater(const IColumn & column, size_t row_num, Arena * arena) + { + if (!has() || Compatibility::getDataAtWithTerminatingZero(assert_cast(column), row_num) > getStringRef()) + { + change(column, row_num, arena); + return true; + } + else + return false; + } + + bool changeIfGreater(const Self & to, Arena * arena) + { + if (to.has() && (!has() || to.getStringRef() > getStringRef())) + { + change(to, arena); + return true; + } + else + return false; + } + + bool isEqualTo(const Self & to) const + { + return has() && to.getStringRef() == getStringRef(); + } + + bool isEqualTo(const IColumn & column, size_t row_num) const + { + return has() && Compatibility::getDataAtWithTerminatingZero(assert_cast(column), row_num) == getStringRef(); + } + + static bool allocatesMemoryInArena() + { + return true; + } + +#if USE_EMBEDDED_COMPILER + + static constexpr bool is_compilable = false; + +#endif + +}; + +static_assert( + sizeof(SingleValueDataString) == SingleValueDataString::AUTOMATIC_STORAGE_SIZE, + "Incorrect size of SingleValueDataString struct"); + + +/// For any other value types. 
+struct SingleValueDataGeneric +{ +private: + using Self = SingleValueDataGeneric; + Field value; + +public: + static constexpr bool result_is_nullable = false; + static constexpr bool should_skip_null_arguments = true; + static constexpr bool is_any = false; + + bool has() const { return !value.isNull(); } + + void insertResultInto(IColumn & to) const + { + if (has()) + to.insert(value); + else + to.insertDefault(); + } + + void write(WriteBuffer & buf, const ISerialization & serialization) const + { + if (!value.isNull()) + { + writeBinary(true, buf); + serialization.serializeBinary(value, buf, {}); + } + else + writeBinary(false, buf); + } + + void read(ReadBuffer & buf, const ISerialization & serialization, Arena *) + { + bool is_not_null; + readBinary(is_not_null, buf); + + if (is_not_null) + serialization.deserializeBinary(value, buf, {}); + } + + void change(const IColumn & column, size_t row_num, Arena *) { column.get(row_num, value); } + + void change(const Self & to, Arena *) { value = to.value; } + + bool changeFirstTime(const IColumn & column, size_t row_num, Arena * arena) + { + if (!has()) + { + change(column, row_num, arena); + return true; + } + else + return false; + } + + bool changeFirstTime(const Self & to, Arena * arena) + { + if (!has() && to.has()) + { + change(to, arena); + return true; + } + else + return false; + } + + bool changeEveryTime(const IColumn & column, size_t row_num, Arena * arena) + { + change(column, row_num, arena); + return true; + } + + bool changeEveryTime(const Self & to, Arena * arena) + { + if (to.has()) + { + change(to, arena); + return true; + } + else + return false; + } + + bool changeIfLess(const IColumn & column, size_t row_num, Arena * arena) + { + if (!has()) + { + change(column, row_num, arena); + return true; + } + else + { + Field new_value; + column.get(row_num, new_value); + if (new_value < value) + { + value = new_value; + return true; + } + else + return false; + } + } + + bool changeIfLess(const Self & 
to, Arena * arena) + { + if (!to.has()) + return false; + if (!has() || to.value < value) + { + change(to, arena); + return true; + } + else + return false; + } + + bool changeIfGreater(const IColumn & column, size_t row_num, Arena * arena) + { + if (!has()) + { + change(column, row_num, arena); + return true; + } + else + { + Field new_value; + column.get(row_num, new_value); + if (new_value > value) + { + value = new_value; + return true; + } + else + return false; + } + } + + bool changeIfGreater(const Self & to, Arena * arena) + { + if (!to.has()) + return false; + if (!has() || to.value > value) + { + change(to, arena); + return true; + } + else + return false; + } + + bool isEqualTo(const IColumn & column, size_t row_num) const { return has() && value == column[row_num]; } + + bool isEqualTo(const Self & to) const { return has() && to.value == value; } + + static bool allocatesMemoryInArena() + { + return false; + } + +#if USE_EMBEDDED_COMPILER + + static constexpr bool is_compilable = false; + +#endif + +}; + +template +struct AggregateFunctionSingleValueOrNullData : Data +{ + static constexpr bool result_is_nullable = true; + + using Self = AggregateFunctionSingleValueOrNullData; + + bool first_value = true; + bool is_null = false; + + bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) + { + if (first_value) + { + first_value = false; + this->change(column, row_num, arena); + return true; + } + else if (!this->isEqualTo(column, row_num)) + { + is_null = true; + } + return false; + } + + bool changeIfBetter(const Self & to, Arena * arena) + { + if (first_value) + { + first_value = false; + this->change(to, arena); + return true; + } + else if (!this->isEqualTo(to)) + { + is_null = true; + } + return false; + } + + void insertResultInto(IColumn & to) const + { + if (is_null || first_value) + { + to.insertDefault(); + } + else + { + ColumnNullable & col = typeid_cast(to); + col.getNullMapColumn().insertDefault(); + 
this->Data::insertResultInto(col.getNestedColumn()); + } + } + + static const char * name() { return "single_value_or_null"; } + +#if USE_EMBEDDED_COMPILER + + static constexpr bool is_compilable = false; + +#endif +}; + + +template +struct AggregateFunctionAnyData : Data +{ + using Self = AggregateFunctionAnyData; + static constexpr bool is_any = true; + + bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeFirstTime(column, row_num, arena); } + bool changeIfBetter(const Self & to, Arena * arena) { return this->changeFirstTime(to, arena); } + void addManyDefaults(const IColumn & column, size_t /*length*/, Arena * arena) { this->changeFirstTime(column, 0, arena); } + + static const char * name() { return "any"; } + +#if USE_EMBEDDED_COMPILER + + static constexpr bool is_compilable = Data::is_compilable; + + static void compileChangeIfBetter(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) + { + Data::compileChangeFirstTime(builder, aggregate_data_ptr, value_to_check); + } + + static void compileChangeIfBetterMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) + { + Data::compileChangeFirstTimeMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); + } + +#endif +}; + +template +struct AggregateFunctionAnyLastData : Data +{ + using Self = AggregateFunctionAnyLastData; + + bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) { return this->changeEveryTime(column, row_num, arena); } + bool changeIfBetter(const Self & to, Arena * arena) { return this->changeEveryTime(to, arena); } + void addManyDefaults(const IColumn & column, size_t /*length*/, Arena * arena) { this->changeEveryTime(column, 0, arena); } + + static const char * name() { return "anyLast"; } + +#if USE_EMBEDDED_COMPILER + + static constexpr bool is_compilable = Data::is_compilable; + + static void 
compileChangeIfBetter(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, llvm::Value * value_to_check) + { + Data::compileChangeEveryTime(builder, aggregate_data_ptr, value_to_check); + } + + static void compileChangeIfBetterMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) + { + Data::compileChangeEveryTimeMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); + } + +#endif +}; +/** Implement 'heavy hitters' algorithm. + * Selects most frequent value if its frequency is more than 50% in each thread of execution. + * Otherwise, selects some arbitrary value. + * http://www.cs.umd.edu/~samir/498/karp.pdf + */ +template +struct AggregateFunctionAnyHeavyData : Data +{ + UInt64 counter = 0; + + using Self = AggregateFunctionAnyHeavyData; + + bool changeIfBetter(const IColumn & column, size_t row_num, Arena * arena) + { + if (this->isEqualTo(column, row_num)) + { + ++counter; + } + else + { + if (counter == 0) + { + this->change(column, row_num, arena); + ++counter; + return true; + } + else + --counter; + } + return false; + } + + bool changeIfBetter(const Self & to, Arena * arena) + { + if (this->isEqualTo(to)) + { + counter += to.counter; + } + else + { + if ((!this->has() && to.has()) || counter < to.counter) + { + this->change(to, arena); + return true; + } + else + counter -= to.counter; + } + return false; + } + + void write(WriteBuffer & buf, const ISerialization & serialization) const + { + Data::write(buf, serialization); + writeBinary(counter, buf); + } + + void read(ReadBuffer & buf, const ISerialization & serialization, Arena * arena) + { + Data::read(buf, serialization, arena); + readBinary(counter, buf); + } + + static const char * name() { return "any_heavy"; } + +#if USE_EMBEDDED_COMPILER + + static constexpr bool is_compilable = false; + +#endif +}; + +/// In changelog mode, when we only keep around `retract_max` elements in CountedArgValueMap, there are situations 
+/// that we won't get the correct results. Examples retract_max = 3 +/// For max sequence (value, delta): (10, 1), (9, 1), (8, 1), (7, 1), (6, 1), (10, -1), (9, -1), (8, -1), (5, 1) +/// (7, 1) and (6, 1) will be dropped on the floor since we reach the capacity N = 3 when they appear +/// Then, 10, 9, 8 get retracted, the map is basically empty at this point of time +/// And then (5, 1) gets inserted and it is the only element in the map, so it is the max element at +/// this specific time but it is wrong since we dropped 7 and 6. +/// On the other hand, if we keep all unique values and their counts in the map (retract_max == 0), we can always get the correct result +/// so setting up a correct retract_max is a trade-off between an absolute accurate result and resource usage +template +class AggregateFunctionsCountedValue final : public IAggregateFunctionDataHelper> +{ +private: + SerializationPtr serialization; + UInt64 max_size; + +public: + explicit AggregateFunctionsCountedValue(const DataTypePtr & type, const Settings * settings) + : IAggregateFunctionDataHelper>({type}, {}) + , serialization(type->getDefaultSerialization()) + , max_size(settings->retract_max.value) + { + if (StringRef(Data::name()) == StringRef("min") || StringRef(Data::name()) == StringRef("max")) + { + if (!type->isComparable()) + throw Exception( + "Illegal type " + type->getName() + " of argument of aggregate function " + getName() + + " because the values of that data type are not comparable", + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + } + } + + void create(AggregateDataPtr place) const override { new (place) Data(max_size); } + + String getName() const override { return Data::name(); } + + DataTypePtr getReturnType() const override + { + auto result_type = this->argument_types.at(0); + if constexpr (Data::is_nullable) + return makeNullable(result_type); + return result_type; + } + + void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) 
const override + { + this->data(place).add(*columns[0], row_num, arena); + } + + void negate(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const final + { + this->data(place).negate(*columns[0], row_num, arena); + } + + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override + { + this->data(place).merge(this->data(rhs), arena); + } + + void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional /* version */) const override + { + this->data(place).write(buf, *serialization); + } + + void deserialize(AggregateDataPtr place, ReadBuffer & buf, std::optional /* version */, Arena * arena) const override + { + this->data(place).read(buf, *serialization, arena); + } + + bool allocatesMemoryInArena() const override { return Data::allocatesMemoryInArena(); } + + void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override + { + this->data(place).insertResultInto(to); + } + +#if 0 +# if USE_EMBEDDED_COMPILER + + bool isCompilable() const override + { + if constexpr (!Data::is_compilable) + return false; + + return canBeNativeType(*this->argument_types[0]); + } + + void compileCreate(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override + { + llvm::IRBuilder<> & b = static_cast &>(builder); + + b.CreateMemSet( + aggregate_data_ptr, llvm::ConstantInt::get(b.getInt8Ty(), 0), this->sizeOfData(), llvm::assumeAligned(this->alignOfData())); + } + + void compileAdd( + llvm::IRBuilderBase & builder, + llvm::Value * aggregate_data_ptr, + const DataTypes &, + const std::vector & argument_values) const override + { + if constexpr (Data::is_compilable) + { + Data::compileChangeIfBetter(builder, aggregate_data_ptr, argument_values[0]); + } + else + { + throw Exception(getName() + " is not JIT-compilable", ErrorCodes::NOT_IMPLEMENTED); + } + } + + void + compileMerge(llvm::IRBuilderBase & builder, llvm::Value * 
aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) const override + { + if constexpr (Data::is_compilable) + { + Data::compileChangeIfBetterMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); + } + else + { + throw Exception(getName() + " is not JIT-compilable", ErrorCodes::NOT_IMPLEMENTED); + } + } + + llvm::Value * compileGetResult(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override + { + if constexpr (Data::is_compilable) + { + return Data::compileGetResult(builder, aggregate_data_ptr); + } + else + { + throw Exception(getName() + " is not JIT-compilable", ErrorCodes::NOT_IMPLEMENTED); + } + } + +# endif +#endif +}; + +template +class AggregateFunctionsSingleValue final : public IAggregateFunctionDataHelper> +{ + static constexpr bool is_any = Data::is_any; + +private: + SerializationPtr serialization; + +public: + explicit AggregateFunctionsSingleValue(const DataTypePtr & type) + : IAggregateFunctionDataHelper>({type}, {}) + , serialization(type->getDefaultSerialization()) + { + } + + String getName() const override { return Data::name(); } + + DataTypePtr getReturnType() const override + { + auto result_type = this->argument_types.at(0); + if constexpr (Data::result_is_nullable) + return makeNullable(result_type); + return result_type; + } + + void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override + { + this->data(place).changeIfBetter(*columns[0], row_num, arena); + } + + void negate(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const final + { + /// do nothing if _tp_delta is -1 + } + void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override + { + this->data(place).changeIfBetter(this->data(rhs), arena); + } + + void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional /* version */) const override + { + 
this->data(place).write(buf, *serialization); + } + + void deserialize(AggregateDataPtr place, ReadBuffer & buf, std::optional /* version */, Arena * arena) const override + { + this->data(place).read(buf, *serialization, arena); + } + + bool allocatesMemoryInArena() const override + { + return Data::allocatesMemoryInArena(); + } + + void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override + { + this->data(place).insertResultInto(to); + } + +#if USE_EMBEDDED_COMPILER + + bool isCompilable() const override + { + if constexpr (!Data::is_compilable) + return false; + + return canBeNativeType(*this->argument_types[0]); + } + + + void compileCreate(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override + { + llvm::IRBuilder<> & b = static_cast &>(builder); + + b.CreateMemSet(aggregate_data_ptr, llvm::ConstantInt::get(b.getInt8Ty(), 0), this->sizeOfData(), llvm::assumeAligned(this->alignOfData())); + } + + void compileAdd(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr, const DataTypes &, const std::vector & argument_values) const override + { + if constexpr (Data::is_compilable) + { + Data::compileChangeIfBetter(builder, aggregate_data_ptr, argument_values[0]); + } + else + { + throw Exception(getName() + " is not JIT-compilable", ErrorCodes::NOT_IMPLEMENTED); + } + } + + void compileMerge(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_dst_ptr, llvm::Value * aggregate_data_src_ptr) const override + { + if constexpr (Data::is_compilable) + { + Data::compileChangeIfBetterMerge(builder, aggregate_data_dst_ptr, aggregate_data_src_ptr); + } + else + { + throw Exception(getName() + " is not JIT-compilable", ErrorCodes::NOT_IMPLEMENTED); + } + } + + llvm::Value * compileGetResult(llvm::IRBuilderBase & builder, llvm::Value * aggregate_data_ptr) const override + { + if constexpr (Data::is_compilable) + { + return Data::compileGetResult(builder, aggregate_data_ptr); + } + else + { + throw 
Exception(getName() + " is not JIT-compilable", ErrorCodes::NOT_IMPLEMENTED); + } + } + +#endif +}; +} +} diff --git a/src/AggregateFunctions/Streaming/HelpersMinMax.h b/src/AggregateFunctions/Streaming/HelpersMinMaxAny.h similarity index 55% rename from src/AggregateFunctions/Streaming/HelpersMinMax.h rename to src/AggregateFunctions/Streaming/HelpersMinMaxAny.h index 172a9951e90..7f7a148a6dd 100644 --- a/src/AggregateFunctions/Streaming/HelpersMinMax.h +++ b/src/AggregateFunctions/Streaming/HelpersMinMaxAny.h @@ -1,6 +1,6 @@ #pragma once -#include "AggregateFunctionMinMax.h" +#include "AggregateFunctionMinMaxAny.h" #include #include @@ -53,5 +53,39 @@ static IAggregateFunction * createAggregateFunctionCountedValue( return new AggregateFunctionTemplate>>(argument_type, settings); } + +template