diff --git a/.circleci/dist_compile.yml b/.circleci/dist_compile.yml index 3278a67f6565..94a71eab4f7e 100644 --- a/.circleci/dist_compile.yml +++ b/.circleci/dist_compile.yml @@ -138,85 +138,8 @@ executors: check: docker: - image : ghcr.io/facebookincubator/velox-dev:check-avx - macos-intel: - macos: - xcode: "14.3.0" - resource_class: macos.x86.medium.gen2 - macos-m1: - macos: - xcode: "14.2.0" - resource_class: macos.m1.large.gen1 jobs: - macos-build: - parameters: - os: - type: executor - executor: << parameters.os >> - environment: - ICU_SOURCE: BUNDLED - simdjson_SOURCE: BUNDLED - steps: - - checkout - - update-submodules - - restore_cache: - name: "Restore Dependency Cache" - # The version number in the key can be incremented - # to manually avoid the case where bad dependencies - # are cached, and has no other meaning. - # If you update it, be sure to update save_cache too. - key: velox-circleci-macos-{{ arch }}-deps-v1-{{ checksum ".circleci/config.yml" }}-{{ checksum "scripts/setup-macos.sh" }} - - run: - name: "Install dependencies" - command: | - set -xu - mkdir -p ~/deps ~/deps-src - curl -L https://github.com/Homebrew/brew/tarball/master | tar xz --strip 1 -C ~/deps - PATH=~/deps/bin:${PATH} DEPENDENCY_DIR=~/deps-src INSTALL_PREFIX=~/deps PROMPT_ALWAYS_RESPOND=n ./scripts/setup-macos.sh - rm -rf ~/deps/.git ~/deps/Library/Taps/ # Reduce cache size by 70%. - no_output_timeout: 20m - - save_cache: - name: "Save Dependency Cache" - # The version number in the key can be incremented - # to manually avoid the case where bad dependencies - # are cached, and has no other meaning. - # If you update it, be sure to update restore_cache too. - key: velox-circleci-macos-{{ arch }}-deps-v1-{{ checksum ".circleci/config.yml" }}-{{ checksum "scripts/setup-macos.sh" }} - paths: - - ~/deps - - run: - name: "Calculate merge-base date for CCache" - command: git show -s --format=%cd --date="format:%Y%m%d" $(git merge-base origin/main HEAD) | tee merge-base-date - - restore_cache: - name: "Restore CCache cache" - keys: - - velox-ccache-debug-{{ arch }}-{{ checksum "merge-base-date" }} - - run: - name: "Build on MacOS" - command: | - export PATH=~/deps/bin:~/deps/opt/bison/bin:~/deps/opt/flex/bin:${PATH} - mkdir -p .ccache - export CCACHE_DIR=$(pwd)/.ccache - ccache -sz -M 5Gi - cmake \ - -B _build/debug \ - -GNinja \ - -DTREAT_WARNINGS_AS_ERRORS=1 \ - -DENABLE_ALL_WARNINGS=1 \ - -DVELOX_ENABLE_PARQUET=ON \ - -DCMAKE_BUILD_TYPE=Debug \ - -DCMAKE_PREFIX_PATH=~/deps \ - -DCMAKE_CXX_COMPILER_LAUNCHER=ccache \ - -DFLEX_INCLUDE_DIR=~/deps/opt/flex/include - ninja -C _build/debug - ccache -s - no_output_timeout: 1h - - save_cache: - name: "Save CCache cache" - key: velox-ccache-debug-{{ arch }}-{{ checksum "merge-base-date" }} - paths: - - .ccache/ - linux-build: executor: build environment: @@ -681,10 +604,6 @@ workflows: - linux-build-options - linux-adapters - linux-presto-fuzzer-run - - macos-build: - matrix: - parameters: - os: [macos-intel] - format-check - header-check - doc-gen-job: @@ -692,14 +611,6 @@ workflows: branches: only: - main - - macos-build: - matrix: - parameters: - os: [ macos-m1 ] - filters: - branches: - only: - - main shorter-fuzzer: unless: << pipeline.parameters.run-longer-expression-fuzzer >> @@ -708,10 +619,6 @@ workflows: - linux-pr-fuzzer-run - linux-build-options - linux-adapters - - macos-build: - matrix: - parameters: - os: [ macos-intel ] - format-check - header-check - doc-gen-job: @@ -719,11 +626,3 @@ workflows: branches: only: - main - - macos-build: - matrix: - parameters: - os: [ macos-m1 ] - filters: - branches: - only: - - main diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml new file mode 100644 index 000000000000..7c6c87661cd7 --- /dev/null +++ b/.github/workflows/macos.yml @@ -0,0 +1,81 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +name: macOS Build + +on: + push: + pull_request: + +permissions: + contents: read + +concurrency: + group: ${{ github.workflow }}-${{ github.repository }}-${{ github.head_ref || github.sha }} + cancel-in-progress: true + +jobs: + macos-build: + name: "${{ matrix.os }}" + strategy: + fail-fast: false + matrix: + # macos-13 = x86_64 Mac + # macos-14 = arm64 Mac + os: [macos-13, macos-14] + runs-on: ${{ matrix.os }} + env: + CCACHE_DIR: '${{ github.workspace }}/.ccache' + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + submodules: recursive + - name: Install Dependencies + run: | + brew install \ + bison boost ccache double-conversion flex fmt gflags glog \ + icu4c libevent libsodium lz4 lzo ninja openssl range-v3 simdjson \ + snappy thrift xz xsimd zstd + + echo "NJOBS=`sysctl -n hw.ncpu`" >> $GITHUB_ENV + + - name: Cache ccache + uses: actions/cache@v4 + with: + path: '${{ env.CCACHE_DIR }}' + key: ccache-macos-${{ matrix.os }}-${{ hashFiles('velox/*') }} + restore-keys: ccache-macos-${{ matrix.os }} + + - name: Configure Build + env: + folly_SOURCE: BUNDLED + run: | + ccache -sz -M 5Gi + cmake \ + -B _build/debug \ + -GNinja \ + -DTREAT_WARNINGS_AS_ERRORS=1 \ + -DENABLE_ALL_WARNINGS=1 \ + -DVELOX_ENABLE_PARQUET=ON \ + -DCMAKE_BUILD_TYPE=Debug + + - name: Build + run: | + cmake --build _build/debug -j $NJOBS + ccache -s + - name: Run Tests + if: false + run: ctest -j $NJOBS --test-dir _build/debug --output-on-failure + diff --git a/CMakeLists.txt b/CMakeLists.txt index b9c88d4add33..1c7dc7d568d3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -418,7 +418,7 @@ if(${VELOX_ENABLE_DUCKDB}) endif() set_source(fmt) -resolve_dependency(fmt) +resolve_dependency(fmt 9.0.0) if(NOT ${VELOX_BUILD_MINIMAL}) find_package(ZLIB REQUIRED) diff --git a/scripts/setup-adapters.sh b/scripts/setup-adapters.sh index 632dbd33b700..675ff4c0291d 100755 --- a/scripts/setup-adapters.sh +++ b/scripts/setup-adapters.sh @@ -84,6 +84,9 @@ function install_gcs-sdk-cpp { } function install_azure-storage-sdk-cpp { + # Disable VCPKG to install additional static dependencies under the VCPKG installed path + # instead of using system pre-installed dependencies. + export AZURE_SDK_DISABLE_AUTO_VCPKG=ON vcpkg_commit_id=7a6f366cefd27210f6a8309aed10c31104436509 github_checkout azure/azure-sdk-for-cpp azure-storage-files-datalake_12.8.0 sed -i "s/set(VCPKG_COMMIT_STRING .*)/set(VCPKG_COMMIT_STRING $vcpkg_commit_id)/" cmake-modules/AzureVcpkg.cmake diff --git a/scripts/setup-ubuntu.sh b/scripts/setup-ubuntu.sh index 14ac9f144b91..69760cf85ec0 100755 --- a/scripts/setup-ubuntu.sh +++ b/scripts/setup-ubuntu.sh @@ -24,11 +24,12 @@ CPU_TARGET="${CPU_TARGET:-avx}" COMPILER_FLAGS=$(get_cxx_flags "$CPU_TARGET") export COMPILER_FLAGS FB_OS_VERSION=v2023.12.04.00 +FMT_VERSION=10.1.1 NPROC=$(getconf _NPROCESSORS_ONLN) DEPENDENCY_DIR=${DEPENDENCY_DIR:-$(pwd)} export CMAKE_BUILD_TYPE=Release -# Install all velox and folly dependencies. +# Install all velox and folly dependencies. # The is an issue on 22.04 where a version conflict prevents glog install, # installing libunwind first fixes this. apt update && apt install sudo @@ -46,7 +47,6 @@ sudo --preserve-env apt update && sudo --preserve-env apt install -y libunwind-d libboost-all-dev \ libicu-dev \ libdouble-conversion-dev \ - libfmt-dev \ libgoogle-glog-dev \ libbz2-dev \ libgflags-dev \ @@ -87,6 +87,11 @@ function prompt { ) 2> /dev/null } +function install_fmt { + github_checkout fmtlib/fmt "${FMT_VERSION}" + cmake_install -DFMT_TEST=OFF +} + function install_folly { github_checkout facebook/folly "${FB_OS_VERSION}" cmake_install -DBUILD_TESTS=OFF -DFOLLY_HAVE_INT128_T=ON @@ -120,6 +125,7 @@ function install_conda { } function install_velox_deps { + run_and_time install_fmt run_and_time install_folly run_and_time install_fizz run_and_time install_wangle diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index 4fe174ed7949..464d7560691a 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -377,7 +377,8 @@ std::shared_ptr makeScanSpec( } std::unique_ptr parseSerdeParameters( - const std::unordered_map& serdeParameters) { + const std::unordered_map& serdeParameters, + const std::unordered_map& tableParameters) { auto fieldIt = serdeParameters.find(dwio::common::SerDeOptions::kFieldDelim); if (fieldIt == serdeParameters.end()) { fieldIt = serdeParameters.find("serialization.format"); @@ -393,9 +394,13 @@ std::unique_ptr parseSerdeParameters( auto mapKeyIt = serdeParameters.find(dwio::common::SerDeOptions::kMapKeyDelim); + auto nullStringIt = tableParameters.find( + dwio::common::TableParameter::kSerializationNullFormat); + if (fieldIt == serdeParameters.end() && collectionIt == serdeParameters.end() && - mapKeyIt == serdeParameters.end()) { + mapKeyIt == serdeParameters.end() && + nullStringIt == tableParameters.end()) { return nullptr; } @@ -413,6 +418,7 @@ std::unique_ptr parseSerdeParameters( } auto serDeOptions = std::make_unique( fieldDelim, collectionDelim, mapKeyDelim); + serDeOptions->nullString = nullStringIt->second; return serDeOptions; } @@ -420,15 +426,15 @@ void configureReaderOptions( dwio::common::ReaderOptions& readerOptions, const std::shared_ptr& hiveConfig, const Config* sessionProperties, - const RowTypePtr& fileSchema, - std::shared_ptr hiveSplit) { + const std::shared_ptr& hiveTableHandle, + const std::shared_ptr& hiveSplit) { readerOptions.setMaxCoalesceBytes(hiveConfig->maxCoalescedBytes()); readerOptions.setMaxCoalesceDistance(hiveConfig->maxCoalescedDistanceBytes()); readerOptions.setFileColumnNamesReadAsLowerCase( hiveConfig->isFileColumnNamesReadAsLowerCase(sessionProperties)); readerOptions.setUseColumnNamesForColumnMapping( hiveConfig->isOrcUseColumnNames(sessionProperties)); - readerOptions.setFileSchema(fileSchema); + readerOptions.setFileSchema(hiveTableHandle->dataColumns()); readerOptions.setFooterEstimatedSize(hiveConfig->footerEstimatedSize()); readerOptions.setFilePreloadThreshold(hiveConfig->filePreloadThreshold()); @@ -439,7 +445,8 @@ void configureReaderOptions( dwio::common::toString(readerOptions.getFileFormat()), dwio::common::toString(hiveSplit->fileFormat)); } else { - auto serDeOptions = parseSerdeParameters(hiveSplit->serdeParameters); + auto serDeOptions = parseSerdeParameters( + hiveSplit->serdeParameters, hiveTableHandle->tableParameters()); if (serDeOptions) { readerOptions.setSerDeOptions(*serDeOptions); } diff --git a/velox/connectors/hive/HiveConnectorUtil.h b/velox/connectors/hive/HiveConnectorUtil.h index 51335f09e76a..67426bef78ca 100644 --- a/velox/connectors/hive/HiveConnectorUtil.h +++ b/velox/connectors/hive/HiveConnectorUtil.h @@ -26,6 +26,7 @@ namespace facebook::velox::connector::hive { class HiveColumnHandle; +class HiveTableHandle; class HiveConfig; struct HiveConnectorSplit; @@ -57,8 +58,8 @@ void configureReaderOptions( dwio::common::ReaderOptions& readerOptions, const std::shared_ptr& config, const Config* sessionProperties, - const RowTypePtr& fileSchema, - std::shared_ptr hiveSplit); + const std::shared_ptr& hiveTableHandle, + const std::shared_ptr& hiveSplit); void configureRowReaderOptions( dwio::common::RowReaderOptions& rowReaderOptions, diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index b6cce9860087..92376e566d38 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -82,7 +82,7 @@ void SplitReader::configureReaderOptions() { baseReaderOpts_, hiveConfig_, connectorQueryCtx_->sessionProperties(), - hiveTableHandle_->dataColumns(), + hiveTableHandle_, hiveSplit_); } diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp index de86aa7f4386..4382fabb84a6 100644 --- a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp @@ -13,20 +13,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #include "velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h" -#include "velox/common/file/File.h" -#include "velox/connectors/hive/HiveConfig.h" -#include "velox/connectors/hive/storage_adapters/abfs/AbfsReadFile.h" -#include "velox/connectors/hive/storage_adapters/abfs/AbfsUtil.h" -#include "velox/core/Config.h" #include #include #include #include +#include "velox/common/file/File.h" +#include "velox/connectors/hive/HiveConfig.h" +#include "velox/connectors/hive/storage_adapters/abfs/AbfsReadFile.h" +#include "velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h" +#include "velox/core/Config.h" + namespace facebook::velox::filesystems::abfs { using namespace Azure::Storage::Blobs; + class AbfsConfig { public: AbfsConfig(const Config* config) : config_(config) {} @@ -49,14 +52,12 @@ class AbfsReadFile::Impl { constexpr static uint64_t kReadConcurrency = 8; public: - explicit Impl(const std::string& path, const std::string& connectStr) - : path_(path), connectStr_(connectStr) { - auto abfsAccount = AbfsAccount(path_); - fileSystem_ = abfsAccount.fileSystem(); + explicit Impl(const std::string& path, const std::string& connectStr) { + auto abfsAccount = AbfsAccount(path); fileName_ = abfsAccount.filePath(); fileClient_ = std::make_unique(BlobClient::CreateFromConnectionString( - connectStr_, fileSystem_, fileName_)); + connectStr, abfsAccount.fileSystem(), fileName_)); } void initialize() { @@ -153,9 +154,6 @@ class AbfsReadFile::Impl { reinterpret_cast(position), length); } - const std::string path_; - const std::string connectStr_; - std::string fileSystem_; std::string fileName_; std::unique_ptr fileClient_; @@ -250,4 +248,13 @@ std::unique_ptr AbfsFileSystem::openFileForRead( abfsfile->initialize(); return abfsfile; } + +std::unique_ptr AbfsFileSystem::openFileForWrite( + std::string_view path, + const FileOptions& /*unused*/) { + auto abfsfile = std::make_unique( + std::string(path), impl_->connectionString(std::string(path))); + abfsfile->initialize(); + return abfsfile; +} } // namespace facebook::velox::filesystems::abfs diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h index f67789243545..4b8ec74d5954 100644 --- a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h @@ -44,9 +44,7 @@ class AbfsFileSystem : public FileSystem { std::unique_ptr openFileForWrite( std::string_view path, - const FileOptions& options = {}) override { - VELOX_UNSUPPORTED("write for abfs not implemented"); - } + const FileOptions& options = {}) override; void rename( std::string_view path, diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.cpp b/velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.cpp new file mode 100644 index 000000000000..c231954258f5 --- /dev/null +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.cpp @@ -0,0 +1,169 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h" + +#include + +namespace facebook::velox::filesystems::abfs { +class BlobStorageFileClient final : public IBlobStorageFileClient { + public: + BlobStorageFileClient(std::unique_ptr client) + : client_(std::move(client)) {} + + void create() override { + client_->Create(); + } + + PathProperties getProperties() override { + return client_->GetProperties().Value; + } + + void append(const uint8_t* buffer, size_t size, uint64_t offset) override { + auto bodyStream = Azure::Core::IO::MemoryBodyStream(buffer, size); + client_->Append(bodyStream, offset); + } + + void flush(uint64_t position) override { + client_->Flush(position); + } + + void close() override { + // do nothing. + } + + private: + const std::unique_ptr client_; +}; + +class AbfsWriteFile::Impl { + public: + explicit Impl(const std::string& path, const std::string& connectStr) + : path_(path), connectStr_(connectStr) { + // Make it a no-op if invoked twice. + if (position_ != -1) { + return; + } + position_ = 0; + } + + void initialize() { + if (!blobStorageFileClient_) { + auto abfsAccount = AbfsAccount(path_); + blobStorageFileClient_ = std::make_unique( + std::make_unique( + DataLakeFileClient::CreateFromConnectionString( + connectStr_, + abfsAccount.fileSystem(), + abfsAccount.filePath()))); + } + + VELOX_CHECK(!checkIfFileExists(), "File already exists"); + blobStorageFileClient_->create(); + } + + void testingSetFileClient( + const std::shared_ptr& blobStorageManager) { + blobStorageFileClient_ = blobStorageManager; + } + + void close() { + if (!closed_) { + flush(); + blobStorageFileClient_->close(); + closed_ = true; + } + } + + void flush() { + if (!closed_) { + blobStorageFileClient_->flush(position_); + } + } + + void append(std::string_view data) { + VELOX_CHECK(!closed_, "File is not open"); + if (data.size() == 0) { + return; + } + append(data.data(), data.size()); + } + + uint64_t size() const { + return blobStorageFileClient_->getProperties().FileSize; + } + + void append(const char* buffer, size_t size) { + blobStorageFileClient_->append( + reinterpret_cast(buffer), size, position_); + position_ += size; + } + + private: + bool checkIfFileExists() { + try { + blobStorageFileClient_->getProperties(); + return true; + } catch (Azure::Storage::StorageException& e) { + if (e.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) { + return false; + } else { + throwStorageExceptionWithOperationDetails("GetProperties", path_, e); + } + } + } + + const std::string path_; + const std::string connectStr_; + std::string fileSystem_; + std::string fileName_; + std::shared_ptr blobStorageFileClient_; + + uint64_t position_ = -1; + bool closed_ = false; +}; + +AbfsWriteFile::AbfsWriteFile( + const std::string& path, + const std::string& connectStr) { + impl_ = std::make_shared(path, connectStr); +} + +void AbfsWriteFile::initialize() { + impl_->initialize(); +} + +void AbfsWriteFile::close() { + impl_->close(); +} + +void AbfsWriteFile::flush() { + impl_->flush(); +} + +void AbfsWriteFile::append(std::string_view data) { + impl_->append(data); +} + +uint64_t AbfsWriteFile::size() const { + return impl_->size(); +} + +void AbfsWriteFile::testingSetFileClient( + const std::shared_ptr& fileClient) { + impl_->testingSetFileClient(fileClient); +} +} // namespace facebook::velox::filesystems::abfs diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h b/velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h new file mode 100644 index 000000000000..72549720344f --- /dev/null +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h @@ -0,0 +1,88 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/common/file/File.h" +#include "velox/connectors/hive/storage_adapters/abfs/AbfsUtil.h" + +namespace Azure::Storage::Files::DataLake::Models { +class PathProperties; +} + +namespace facebook::velox::filesystems::abfs { +using namespace Azure::Storage::Files::DataLake; +using namespace Azure::Storage::Files::DataLake::Models; + +/* + * We are using the DFS (Data Lake Storage) endpoint for Azure Blob File write + * operations because the DFS endpoint is designed to be compatible with file + * operation semantics, such as `Append` to a file and file `Flush` operations. + * The legacy Blob endpoint can only be used for blob level append and flush + * operations. When using the Blob endpoint, we would need to manually manage + * the creation, appending, and committing of file-related blocks. + * + * However, the Azurite Simulator does not yet support the DFS endpoint. + * (For more information, see https://github.com/Azure/Azurite/issues/553 and + * https://github.com/Azure/Azurite/issues/409). + * You can find a comparison between DFS and Blob endpoints here: + * https://github.com/Azure/Azurite/wiki/ADLS-Gen2-Implementation-Guidance + * + * To facilitate unit testing of file write scenarios, we define the + * IBlobStorageFileClient here, which can be mocked during testing. + */ +class IBlobStorageFileClient { + public: + virtual void create() = 0; + virtual PathProperties getProperties() = 0; + virtual void append(const uint8_t* buffer, size_t size, uint64_t offset) = 0; + virtual void flush(uint64_t position) = 0; + virtual void close() = 0; +}; + +/// Implementation of abfs write file. Nothing written to the file should be +/// read back until it is closed. +class AbfsWriteFile : public WriteFile { + public: + constexpr static uint64_t kNaturalWriteSize = 8 << 20; // 8M + /// The constructor. + /// @param path The file path to write. + /// @param connectStr the connection string used to auth the storage account. + AbfsWriteFile(const std::string& path, const std::string& connectStr); + + /// check any issue reading file. + void initialize(); + + /// Get the file size. + uint64_t size() const override; + + /// Flush the data. + void flush() override; + + /// Write the data by append mode. + void append(std::string_view data) override; + + /// Close the file. + void close() override; + + /// Used by tests to override the FileSystem client. + void testingSetFileClient( + const std::shared_ptr& fileClient); + + protected: + class Impl; + std::shared_ptr impl_; +}; +} // namespace facebook::velox::filesystems::abfs diff --git a/velox/connectors/hive/storage_adapters/abfs/CMakeLists.txt b/velox/connectors/hive/storage_adapters/abfs/CMakeLists.txt index 74e5a8d81c91..cd4cee572e5e 100644 --- a/velox/connectors/hive/storage_adapters/abfs/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/abfs/CMakeLists.txt @@ -17,7 +17,8 @@ add_library(velox_abfs RegisterAbfsFileSystem.cpp) if(VELOX_ENABLE_ABFS) - target_sources(velox_abfs PRIVATE AbfsFileSystem.cpp AbfsUtils.cpp) + target_sources(velox_abfs PRIVATE AbfsFileSystem.cpp AbfsWriteFile.cpp + AbfsUtils.cpp) target_link_libraries( velox_abfs PUBLIC velox_file @@ -25,6 +26,7 @@ if(VELOX_ENABLE_ABFS) velox_hive_config velox_dwio_common_exception Azure::azure-storage-blobs + Azure::azure-storage-files-datalake Folly::folly glog::glog fmt::fmt) diff --git a/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp b/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp index 76dc3e834261..44fa5a3c8508 100644 --- a/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp @@ -14,29 +14,32 @@ * limitations under the License. */ -#include "velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h" -#include "gtest/gtest.h" +#include +#include +#include +#include + #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/File.h" #include "velox/common/file/FileSystems.h" #include "velox/connectors/hive/FileHandle.h" #include "velox/connectors/hive/HiveConfig.h" +#include "velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h" #include "velox/connectors/hive/storage_adapters/abfs/AbfsReadFile.h" +#include "velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h" #include "velox/connectors/hive/storage_adapters/abfs/tests/AzuriteServer.h" +#include "velox/connectors/hive/storage_adapters/abfs/tests/MockBlobStorageFileClient.h" #include "velox/exec/tests/utils/PortUtil.h" #include "velox/exec/tests/utils/TempFilePath.h" -#include -#include - using namespace facebook::velox; - +using namespace facebook::velox::filesystems::abfs; using ::facebook::velox::common::Region; constexpr int kOneMB = 1 << 20; static const std::string filePath = "test_file.txt"; static const std::string fullFilePath = - facebook::velox::filesystems::test::AzuriteABFSEndpoint + filePath; + filesystems::test::AzuriteABFSEndpoint + filePath; class AbfsFileSystemTest : public testing::Test { public: @@ -55,14 +58,11 @@ class AbfsFileSystemTest : public testing::Test { } public: - std::shared_ptr - azuriteServer; + std::shared_ptr azuriteServer; void SetUp() override { auto port = facebook::velox::exec::test::getFreePort(); - azuriteServer = - std::make_shared( - port); + azuriteServer = std::make_shared(port); azuriteServer->start(); auto tempFile = createFile(); azuriteServer->addFile(tempFile->path, filePath); @@ -72,13 +72,50 @@ class AbfsFileSystemTest : public testing::Test { azuriteServer->stop(); } + std::unique_ptr openFileForWrite( + std::string_view path, + std::shared_ptr client) { + auto abfsfile = std::make_unique( + std::string(path), azuriteServer->connectionStr()); + abfsfile->testingSetFileClient(client); + abfsfile->initialize(); + return abfsfile; + } + + static std::string generateRandomData(int size) { + static const char charset[] = + "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + std::string data(size, ' '); + + for (int i = 0; i < size; ++i) { + int index = rand() % (sizeof(charset) - 1); + data[i] = charset[index]; + } + + return data; + } + private: - static std::shared_ptr<::exec::test::TempFilePath> createFile() { + static std::shared_ptr<::exec::test::TempFilePath> createFile( + uint64_t size = -1) { auto tempFile = ::exec::test::TempFilePath::create(); - tempFile->append("aaaaa"); - tempFile->append("bbbbb"); - tempFile->append(std::string(kOneMB, 'c')); - tempFile->append("ddddd"); + if (size == -1) { + tempFile->append("aaaaa"); + tempFile->append("bbbbb"); + tempFile->append(std::string(kOneMB, 'c')); + tempFile->append("ddddd"); + } else { + const uint64_t totalSize = size * 1024 * 1024; + const uint64_t chunkSize = 5 * 1024 * 1024; + uint64_t remainingSize = totalSize; + while (remainingSize > 0) { + uint64_t dataSize = std::min(remainingSize, chunkSize); + std::string randomData = generateRandomData(dataSize); + tempFile->append(randomData); + remainingSize -= dataSize; + } + } return tempFile; } }; @@ -181,13 +218,44 @@ TEST_F(AbfsFileSystemTest, missingFile) { abfs->openFileForRead(abfsFile), error_code::kFileNotFound, "404"); } -TEST_F(AbfsFileSystemTest, openFileForWriteNotImplemented) { - auto hiveConfig = AbfsFileSystemTest::hiveConfig( - {{"fs.azure.account.key.test.dfs.core.windows.net", - azuriteServer->connectionStr()}}); - auto abfs = std::make_shared(hiveConfig); +TEST_F(AbfsFileSystemTest, OpenFileForWriteTest) { + const std::string abfsFile = + filesystems::test::AzuriteABFSEndpoint + "writetest.txt"; + auto mockClient = + std::make_shared( + filesystems::test::MockBlobStorageFileClient()); + auto abfsWriteFile = openFileForWrite(abfsFile, mockClient); + EXPECT_EQ(abfsWriteFile->size(), 0); + std::string dataContent = ""; + uint64_t totalSize = 0; + std::string randomData = + AbfsFileSystemTest::generateRandomData(1 * 1024 * 1024); + for (int i = 0; i < 8; ++i) { + abfsWriteFile->append(randomData); + dataContent += randomData; + } + totalSize = randomData.size() * 8; + abfsWriteFile->flush(); + EXPECT_EQ(abfsWriteFile->size(), totalSize); + + randomData = AbfsFileSystemTest::generateRandomData(9 * 1024 * 1024); + dataContent += randomData; + abfsWriteFile->append(randomData); + totalSize += randomData.size(); + randomData = AbfsFileSystemTest::generateRandomData(2 * 1024 * 1024); + dataContent += randomData; + totalSize += randomData.size(); + abfsWriteFile->append(randomData); + abfsWriteFile->flush(); + EXPECT_EQ(abfsWriteFile->size(), totalSize); + abfsWriteFile->flush(); + abfsWriteFile->close(); + VELOX_ASSERT_THROW(abfsWriteFile->append("abc"), "File is not open"); VELOX_ASSERT_THROW( - abfs->openFileForWrite(fullFilePath), "write for abfs not implemented"); + openFileForWrite(abfsFile, mockClient), "File already exists"); + std::string fileContent = mockClient->readContent(); + ASSERT_EQ(fileContent.size(), dataContent.size()); + ASSERT_EQ(fileContent, dataContent); } TEST_F(AbfsFileSystemTest, renameNotImplemented) { @@ -243,9 +311,7 @@ TEST_F(AbfsFileSystemTest, credNotFOund) { const std::string abfsFile = std::string("abfs://test@test1.dfs.core.windows.net/test"); auto hiveConfig = AbfsFileSystemTest::hiveConfig({}); - auto abfs = - std::make_shared( - hiveConfig); + auto abfs = std::make_shared(hiveConfig); VELOX_ASSERT_THROW( abfs->openFileForRead(abfsFile), "Failed to find storage credentials"); } diff --git a/velox/connectors/hive/storage_adapters/abfs/tests/AzuriteServer.h b/velox/connectors/hive/storage_adapters/abfs/tests/AzuriteServer.h index 165cb2767c11..4836183f3819 100644 --- a/velox/connectors/hive/storage_adapters/abfs/tests/AzuriteServer.h +++ b/velox/connectors/hive/storage_adapters/abfs/tests/AzuriteServer.h @@ -36,8 +36,8 @@ static const std::string AzuriteAccountKey{ "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="}; static const std::string AzuriteABFSEndpoint = fmt::format( "abfs://{}@{}.dfs.core.windows.net/", - AzuriteAccountName, - AzuriteContainerName); + AzuriteContainerName, + AzuriteAccountName); class AzuriteServer { public: diff --git a/velox/connectors/hive/storage_adapters/abfs/tests/CMakeLists.txt b/velox/connectors/hive/storage_adapters/abfs/tests/CMakeLists.txt index 297a7db4e1bc..2fb451171b22 100644 --- a/velox/connectors/hive/storage_adapters/abfs/tests/CMakeLists.txt +++ b/velox/connectors/hive/storage_adapters/abfs/tests/CMakeLists.txt @@ -13,7 +13,7 @@ # limitations under the License. add_executable(velox_abfs_test AbfsFileSystemTest.cpp AbfsUtilTest.cpp - AzuriteServer.cpp) + AzuriteServer.cpp MockBlobStorageFileClient.cpp) add_test(velox_abfs_test velox_abfs_test) target_link_libraries( velox_abfs_test @@ -26,4 +26,5 @@ target_link_libraries( velox_exec gtest gtest_main - Azure::azure-storage-blobs) + Azure::azure-storage-blobs + Azure::azure-storage-files-datalake) diff --git a/velox/connectors/hive/storage_adapters/abfs/tests/MockBlobStorageFileClient.cpp b/velox/connectors/hive/storage_adapters/abfs/tests/MockBlobStorageFileClient.cpp new file mode 100644 index 000000000000..5f0cf9fa1efd --- /dev/null +++ b/velox/connectors/hive/storage_adapters/abfs/tests/MockBlobStorageFileClient.cpp @@ -0,0 +1,60 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/storage_adapters/abfs/tests/MockBlobStorageFileClient.h" + +#include + +#include + +using namespace Azure::Storage::Files::DataLake; +namespace facebook::velox::filesystems::test { +void MockBlobStorageFileClient::create() { + fileStream_ = std::ofstream( + filePath_, + std::ios_base::out | std::ios_base::binary | std::ios_base::app); +} + +PathProperties MockBlobStorageFileClient::getProperties() { + if (!std::filesystem::exists(filePath_)) { + Azure::Storage::StorageException exp(filePath_ + "doesn't exists"); + exp.StatusCode = Azure::Core::Http::HttpStatusCode::NotFound; + throw exp; + } + std::ifstream file(filePath_, std::ios::binary | std::ios::ate); + uint64_t size = static_cast(file.tellg()); + PathProperties ret; + ret.FileSize = size; + return ret; +} + +void MockBlobStorageFileClient::append( + const uint8_t* buffer, + size_t size, + uint64_t offset) { + fileStream_.seekp(offset); + fileStream_.write(reinterpret_cast(buffer), size); +} + +void MockBlobStorageFileClient::flush(uint64_t position) { + fileStream_.flush(); +} + +void MockBlobStorageFileClient::close() { + fileStream_.flush(); + fileStream_.close(); +} +} // namespace facebook::velox::filesystems::test diff --git a/velox/connectors/hive/storage_adapters/abfs/tests/MockBlobStorageFileClient.h b/velox/connectors/hive/storage_adapters/abfs/tests/MockBlobStorageFileClient.h new file mode 100644 index 000000000000..046cb094c1b1 --- /dev/null +++ b/velox/connectors/hive/storage_adapters/abfs/tests/MockBlobStorageFileClient.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h" + +#include "velox/exec/tests/utils/TempFilePath.h" + +using namespace facebook::velox; +using namespace facebook::velox::filesystems::abfs; + +namespace facebook::velox::filesystems::test { +// A mocked blob storage file client backend with local file store. +class MockBlobStorageFileClient : public IBlobStorageFileClient { + public: + MockBlobStorageFileClient() { + auto tempFile = ::exec::test::TempFilePath::create(); + filePath_ = tempFile->path; + } + + void create() override; + PathProperties getProperties() override; + void append(const uint8_t* buffer, size_t size, uint64_t offset) override; + void flush(uint64_t position) override; + void close() override; + + // for testing purpose to verify the written content if correct. + std::string readContent() { + std::ifstream inputFile(filePath_); + std::string content; + inputFile.seekg(0, std::ios::end); + std::streamsize fileSize = inputFile.tellg(); + inputFile.seekg(0, std::ios::beg); + content.resize(fileSize); + inputFile.read(&content[0], fileSize); + inputFile.close(); + return content; + } + + private: + std::string filePath_; + std::ofstream fileStream_; +}; +} // namespace facebook::velox::filesystems::test diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 774c87f52517..39bf52cfdab9 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -578,7 +578,7 @@ class QueryConfig { /// calculate the spilling partition number for join spill or aggregation /// spill. uint8_t spillStartPartitionBit() const { - constexpr uint8_t kDefaultStartBit = 29; + constexpr uint8_t kDefaultStartBit = 48; return get(kSpillStartPartitionBit, kDefaultStartBit); } diff --git a/velox/docs/functions/spark/array.rst b/velox/docs/functions/spark/array.rst index 2183f4f301c2..f80e13923dc0 100644 --- a/velox/docs/functions/spark/array.rst +++ b/velox/docs/functions/spark/array.rst @@ -51,6 +51,18 @@ Array Functions SELECT array_min(ARRAY [4.0, float('nan')]); -- 4.0 SELECT array_min(ARRAY [NULL, float('nan')]); -- NaN +.. spark:function:: array_repeat(element, count) -> array(E) + + Returns an array containing ``element`` ``count`` times. If ``count`` is negative or zero, + returns empty array. If ``element`` is NULL, returns an array containing ``count`` NULLs. + If ``count`` is NULL, returns NULL as result. Throws an exception if ``count`` exceeds 10'000. :: + + SELECT array_repeat(100, 3); -- [100, 100, 100] + SELECT array_repeat(NULL, 3); -- [NULL, NULL, NULL] + SELECT array_repeat(100, NULL); -- NULL + SELECT array_repeat(100, 0); -- [] + SELECT array_repeat(100, -1); -- [] + .. spark:function:: array_sort(array(E)) -> array(E) Returns an array which has the sorted order of the input array(E). The elements of array(E) must diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index 690af80e727b..c154f8f4a6e0 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -94,6 +94,8 @@ class SerDeOptions { struct TableParameter { static constexpr const char* kSkipHeaderLineCount = "skip.header.line.count"; + static constexpr const char* kSerializationNullFormat = + "serialization.null.format"; }; /** diff --git a/velox/dwio/parquet/reader/CMakeLists.txt b/velox/dwio/parquet/reader/CMakeLists.txt index 3fb5250b7e64..fbb38dd64eef 100644 --- a/velox/dwio/parquet/reader/CMakeLists.txt +++ b/velox/dwio/parquet/reader/CMakeLists.txt @@ -23,7 +23,6 @@ add_library( ParquetData.cpp RepeatedColumnReader.cpp RleBpDecoder.cpp - Statistics.cpp StructColumnReader.cpp StringColumnReader.cpp) diff --git a/velox/dwio/parquet/reader/Metadata.cpp b/velox/dwio/parquet/reader/Metadata.cpp index c0fa6ab7ca02..771e68e8a595 100644 --- a/velox/dwio/parquet/reader/Metadata.cpp +++ b/velox/dwio/parquet/reader/Metadata.cpp @@ -15,11 +15,142 @@ */ #include "velox/dwio/parquet/reader/Metadata.h" - -#include "velox/dwio/parquet/reader/Statistics.h" +#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h" namespace facebook::velox::parquet { +template +inline const T load(const char* ptr) { + T ret; + std::memcpy(&ret, ptr, sizeof(ret)); + return ret; +} + +template +inline std::optional getMin(const thrift::Statistics& columnChunkStats) { + return columnChunkStats.__isset.min_value + ? load(columnChunkStats.min_value.data()) + : (columnChunkStats.__isset.min + ? std::optional(load(columnChunkStats.min.data())) + : std::nullopt); +} + +template +inline std::optional getMax(const thrift::Statistics& columnChunkStats) { + return columnChunkStats.__isset.max_value + ? std::optional(load(columnChunkStats.max_value.data())) + : (columnChunkStats.__isset.max + ? std::optional(load(columnChunkStats.max.data())) + : std::nullopt); +} + +template <> +inline std::optional getMin( + const thrift::Statistics& columnChunkStats) { + return columnChunkStats.__isset.min_value + ? std::optional(columnChunkStats.min_value) + : (columnChunkStats.__isset.min ? std::optional(columnChunkStats.min) + : std::nullopt); +} + +template <> +inline std::optional getMax( + const thrift::Statistics& columnChunkStats) { + return columnChunkStats.__isset.max_value + ? std::optional(columnChunkStats.max_value) + : (columnChunkStats.__isset.max ? std::optional(columnChunkStats.max) + : std::nullopt); +} + +std::unique_ptr buildColumnStatisticsFromThrift( + const thrift::Statistics& columnChunkStats, + const velox::Type& type, + uint64_t numRowsInRowGroup) { + std::optional nullCount = columnChunkStats.__isset.null_count + ? std::optional(columnChunkStats.null_count) + : std::nullopt; + std::optional valueCount = nullCount.has_value() + ? std::optional(numRowsInRowGroup - nullCount.value()) + : std::nullopt; + std::optional hasNull = columnChunkStats.__isset.null_count + ? std::optional(columnChunkStats.null_count > 0) + : std::nullopt; + + switch (type.kind()) { + case TypeKind::BOOLEAN: + return std::make_unique( + valueCount, hasNull, std::nullopt, std::nullopt, std::nullopt); + case TypeKind::TINYINT: + return std::make_unique( + valueCount, + hasNull, + std::nullopt, + std::nullopt, + getMin(columnChunkStats), + getMax(columnChunkStats), + std::nullopt); + case TypeKind::SMALLINT: + return std::make_unique( + valueCount, + hasNull, + std::nullopt, + std::nullopt, + getMin(columnChunkStats), + getMax(columnChunkStats), + std::nullopt); + case TypeKind::INTEGER: + return std::make_unique( + valueCount, + hasNull, + std::nullopt, + std::nullopt, + getMin(columnChunkStats), + getMax(columnChunkStats), + std::nullopt); + case TypeKind::BIGINT: + return std::make_unique( + valueCount, + hasNull, + std::nullopt, + std::nullopt, + getMin(columnChunkStats), + getMax(columnChunkStats), + std::nullopt); + case TypeKind::REAL: + return std::make_unique( + valueCount, + hasNull, + std::nullopt, + std::nullopt, + getMin(columnChunkStats), + getMax(columnChunkStats), + std::nullopt); + case TypeKind::DOUBLE: + return std::make_unique( + valueCount, + hasNull, + std::nullopt, + std::nullopt, + getMin(columnChunkStats), + getMax(columnChunkStats), + std::nullopt); + case TypeKind::VARCHAR: + case TypeKind::VARBINARY: + return std::make_unique( + valueCount, + hasNull, + std::nullopt, + std::nullopt, + getMin(columnChunkStats), + getMax(columnChunkStats), + std::nullopt); + + default: + return std::make_unique( + valueCount, hasNull, std::nullopt, std::nullopt); + } +} + common::CompressionKind thriftCodecToCompressionKind( thrift::CompressionCodec::type codec) { switch (codec) { diff --git a/velox/dwio/parquet/reader/ParquetColumnReader.cpp b/velox/dwio/parquet/reader/ParquetColumnReader.cpp index ea3169ae727a..c3816c0e960a 100644 --- a/velox/dwio/parquet/reader/ParquetColumnReader.cpp +++ b/velox/dwio/parquet/reader/ParquetColumnReader.cpp @@ -25,10 +25,8 @@ #include "velox/dwio/parquet/reader/FloatingPointColumnReader.h" #include "velox/dwio/parquet/reader/IntegerColumnReader.h" #include "velox/dwio/parquet/reader/RepeatedColumnReader.h" -#include "velox/dwio/parquet/reader/Statistics.h" #include "velox/dwio/parquet/reader/StringColumnReader.h" #include "velox/dwio/parquet/reader/StructColumnReader.h" -#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h" namespace facebook::velox::parquet { diff --git a/velox/dwio/parquet/reader/ParquetData.cpp b/velox/dwio/parquet/reader/ParquetData.cpp index 283190bbfb0a..a2688403ebcd 100644 --- a/velox/dwio/parquet/reader/ParquetData.cpp +++ b/velox/dwio/parquet/reader/ParquetData.cpp @@ -17,7 +17,6 @@ #include "velox/dwio/parquet/reader/ParquetData.h" #include "velox/dwio/common/BufferedInput.h" -#include "velox/dwio/parquet/reader/Statistics.h" namespace facebook::velox::parquet { diff --git a/velox/dwio/parquet/reader/Statistics.cpp b/velox/dwio/parquet/reader/Statistics.cpp deleted file mode 100644 index e7ee86a8b768..000000000000 --- a/velox/dwio/parquet/reader/Statistics.cpp +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "velox/dwio/parquet/reader/Statistics.h" - -#include "velox/dwio/common/Statistics.h" -#include "velox/type/Type.h" - -namespace facebook::velox::parquet { - -std::unique_ptr buildColumnStatisticsFromThrift( - const thrift::Statistics& columnChunkStats, - const velox::Type& type, - uint64_t numRowsInRowGroup) { - std::optional nullCount = columnChunkStats.__isset.null_count - ? std::optional(columnChunkStats.null_count) - : std::nullopt; - std::optional valueCount = nullCount.has_value() - ? std::optional(numRowsInRowGroup - nullCount.value()) - : std::nullopt; - std::optional hasNull = columnChunkStats.__isset.null_count - ? std::optional(columnChunkStats.null_count > 0) - : std::nullopt; - - switch (type.kind()) { - case TypeKind::BOOLEAN: - return std::make_unique( - valueCount, hasNull, std::nullopt, std::nullopt, std::nullopt); - case TypeKind::TINYINT: - return std::make_unique( - valueCount, - hasNull, - std::nullopt, - std::nullopt, - getMin(columnChunkStats), - getMax(columnChunkStats), - std::nullopt); - case TypeKind::SMALLINT: - return std::make_unique( - valueCount, - hasNull, - std::nullopt, - std::nullopt, - getMin(columnChunkStats), - getMax(columnChunkStats), - std::nullopt); - case TypeKind::INTEGER: - return std::make_unique( - valueCount, - hasNull, - std::nullopt, - std::nullopt, - getMin(columnChunkStats), - getMax(columnChunkStats), - std::nullopt); - case TypeKind::BIGINT: - return std::make_unique( - valueCount, - hasNull, - std::nullopt, - std::nullopt, - getMin(columnChunkStats), - getMax(columnChunkStats), - std::nullopt); - case TypeKind::REAL: - return std::make_unique( - valueCount, - hasNull, - std::nullopt, - std::nullopt, - getMin(columnChunkStats), - getMax(columnChunkStats), - std::nullopt); - case TypeKind::DOUBLE: - return std::make_unique( - valueCount, - hasNull, - std::nullopt, - std::nullopt, - getMin(columnChunkStats), - getMax(columnChunkStats), - std::nullopt); - case TypeKind::VARCHAR: - case TypeKind::VARBINARY: - return std::make_unique( - valueCount, - hasNull, - std::nullopt, - std::nullopt, - getMin(columnChunkStats), - getMax(columnChunkStats), - std::nullopt); - - default: - return std::make_unique( - valueCount, hasNull, std::nullopt, std::nullopt); - } -} - -} // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/reader/Statistics.h b/velox/dwio/parquet/reader/Statistics.h deleted file mode 100644 index 18f67d5b13b0..000000000000 --- a/velox/dwio/parquet/reader/Statistics.h +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h" - -#include -#include - -namespace facebook::velox { -class Type; -} - -namespace facebook::velox::dwio::common { -class ColumnStatistics; -} - -namespace facebook::velox::parquet { - -// TODO: provide function to merge multiple Statistics into one - -template -inline const T load(const char* ptr) { - T ret; - std::memcpy(&ret, ptr, sizeof(ret)); - return ret; -} - -template -inline std::optional getMin(const thrift::Statistics& columnChunkStats) { - return columnChunkStats.__isset.min_value - ? load(columnChunkStats.min_value.data()) - : (columnChunkStats.__isset.min - ? std::optional(load(columnChunkStats.min.data())) - : std::nullopt); -} - -template -inline std::optional getMax(const thrift::Statistics& columnChunkStats) { - return columnChunkStats.__isset.max_value - ? std::optional(load(columnChunkStats.max_value.data())) - : (columnChunkStats.__isset.max - ? std::optional(load(columnChunkStats.max.data())) - : std::nullopt); -} - -template <> -inline std::optional getMin( - const thrift::Statistics& columnChunkStats) { - return columnChunkStats.__isset.min_value - ? std::optional(columnChunkStats.min_value) - : (columnChunkStats.__isset.min ? std::optional(columnChunkStats.min) - : std::nullopt); -} - -template <> -inline std::optional getMax( - const thrift::Statistics& columnChunkStats) { - return columnChunkStats.__isset.max_value - ? std::optional(columnChunkStats.max_value) - : (columnChunkStats.__isset.max ? std::optional(columnChunkStats.max) - : std::nullopt); -} - -std::unique_ptr buildColumnStatisticsFromThrift( - const thrift::Statistics& columnChunkStats, - const velox::Type& type, - uint64_t numRowsInRowGroup); - -} // namespace facebook::velox::parquet diff --git a/velox/dwio/parquet/tests/CMakeLists.txt b/velox/dwio/parquet/tests/CMakeLists.txt index 32d0d67afca4..427a9085d828 100644 --- a/velox/dwio/parquet/tests/CMakeLists.txt +++ b/velox/dwio/parquet/tests/CMakeLists.txt @@ -43,3 +43,6 @@ target_link_libraries( velox_aggregates velox_tpch_gen ${TEST_LINK_LIBS}) + +file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/examples + DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) diff --git a/velox/exec/ExchangeSource.h b/velox/exec/ExchangeSource.h index 452c8ded9f42..8cee990d8c02 100644 --- a/velox/exec/ExchangeSource.h +++ b/velox/exec/ExchangeSource.h @@ -40,12 +40,6 @@ class ExchangeSource : public std::enable_shared_from_this { std::shared_ptr queue, memory::MemoryPool* pool); - /// Temporary API to indicate whether 'request(maxBytes, maxWaitSeconds)' API - /// is supported. - virtual bool supportsFlowControlV2() const { - VELOX_UNREACHABLE(); - } - /// Temporary API to indicate whether 'metrics()' API /// is supported. virtual bool supportsMetrics() const { @@ -65,14 +59,6 @@ class ExchangeSource : public std::enable_shared_from_this { return requestPending_; } - /// Requests the producer to generate up to 'maxBytes' more data. - /// Returns a future that completes when producer responds either with 'data' - /// or with a message indicating that all data has been already produced or - /// data will take more time to produce. - virtual ContinueFuture request(uint32_t /*maxBytes*/) { - VELOX_NYI(); - } - struct Response { /// Size of the response in bytes. Zero means response didn't contain any /// data. diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index e45f4e5cb8c6..a0fb9136a89a 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -224,7 +224,12 @@ void GroupingSet::addInputForActiveRows( TestValue::adjust( "facebook::velox::exec::GroupingSet::addInputForActiveRows", this); - table_->prepareForGroupProbe(*lookup_, input, activeRows_, ignoreNullKeys_); + table_->prepareForGroupProbe( + *lookup_, + input, + activeRows_, + ignoreNullKeys_, + BaseHashTable::kNoSpillInputStartPartitionBit); if (lookup_->rows.empty()) { // No rows to probe. Can happen when ignoreNullKeys_ is true and all rows // have null keys. diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index ce8a2f76ceb8..bfeb1cd6ff4e 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -823,7 +823,9 @@ bool HashBuild::finishHashBuild() { table_->prepareJoinTable( std::move(otherTables), allowParallelJoinBuild ? operatorCtx_->task()->queryCtx()->executor() - : nullptr); + : nullptr, + isInputFromSpill() ? spillConfig()->startPartitionBit + : BaseHashTable::kNoSpillInputStartPartitionBit); addRuntimeStats(); if (joinBridge_->setHashTable( std::move(table_), std::move(spillPartitions), joinHasNullKeys_)) { diff --git a/velox/exec/HashTable.cpp b/velox/exec/HashTable.cpp index ce89fe1d60dd..d09b3bc041f4 100644 --- a/velox/exec/HashTable.cpp +++ b/velox/exec/HashTable.cpp @@ -1607,7 +1607,8 @@ bool mayUseValueIds(const BaseHashTable& table) { template void HashTable::prepareJoinTable( std::vector> tables, - folly::Executor* executor) { + folly::Executor* executor, + int8_t spillInputStartPartitionBit) { buildExecutor_ = executor; otherTables_.reserve(tables.size()); for (auto& table : tables) { @@ -1650,6 +1651,7 @@ void HashTable::prepareJoinTable( } else { decideHashMode(0); } + checkHashBitsOverlap(spillInputStartPartitionBit); } template @@ -1982,7 +1984,9 @@ void BaseHashTable::prepareForGroupProbe( HashLookup& lookup, const RowVectorPtr& input, SelectivityVector& rows, - bool ignoreNullKeys) { + bool ignoreNullKeys, + int8_t spillInputStartPartitionBit) { + checkHashBitsOverlap(spillInputStartPartitionBit); auto& hashers = lookup.hashers; for (auto& hasher : hashers) { @@ -2015,7 +2019,8 @@ void BaseHashTable::prepareForGroupProbe( decideHashMode(input->size()); // Do not forward 'ignoreNullKeys' to avoid redundant evaluation of // deselectRowsWithNulls. - prepareForGroupProbe(lookup, input, rows, false); + prepareForGroupProbe( + lookup, input, rows, false, spillInputStartPartitionBit); return; } } diff --git a/velox/exec/HashTable.h b/velox/exec/HashTable.h index 5dc6e128934c..eec394caf599 100644 --- a/velox/exec/HashTable.h +++ b/velox/exec/HashTable.h @@ -121,6 +121,8 @@ class BaseHashTable { /// Specifies the hash mode of a table. enum class HashMode { kHash, kArray, kNormalizedKey }; + static constexpr int8_t kNoSpillInputStartPartitionBit = -1; + /// Returns the string of the given 'mode'. static std::string modeString(HashMode mode); @@ -181,7 +183,8 @@ class BaseHashTable { HashLookup& lookup, const RowVectorPtr& input, SelectivityVector& rows, - bool ignoreNullKeys); + bool ignoreNullKeys, + int8_t spillInputStartPartitionBit); /// Finds or creates a group for each key in 'lookup'. The keys are /// returned in 'lookup.hits'. @@ -248,7 +251,8 @@ class BaseHashTable { virtual void prepareJoinTable( std::vector> tables, - folly::Executor* executor = nullptr) = 0; + folly::Executor* executor = nullptr, + int8_t spillInputStartPartitionBit = kNoSpillInputStartPartitionBit) = 0; /// Returns the memory footprint in bytes for any data structures /// owned by 'this'. @@ -328,7 +332,12 @@ class BaseHashTable { /// Extracts a 7 bit tag from a hash number. The high bit is always set. static uint8_t hashTag(uint64_t hash) { - return static_cast(hash >> 32) | 0x80; + // This is likely all 0 for small key types (<= 32 bits). Not an issue + // because small types have a range that makes them normalized key cases. + // If there are multiple small type keys, they are mixed which makes them a + // 64 bit hash. Normalized keys are mixed before being used as hash + // numbers. + return static_cast(hash >> 38) | 0x80; } /// Loads a vector of tags for bulk comparison. Disables tsan errors @@ -365,6 +374,20 @@ class BaseHashTable { virtual void setHashMode(HashMode mode, int32_t numNew) = 0; + virtual int sizeBits() const = 0; + + // We don't want any overlap in the bit ranges used by bucket index and those + // used by spill partitioning; otherwise because we receive data from only one + // partition, the overlapped bits would be the same and only a fraction of the + // buckets would be used. This would cause the insertion taking very long + // time and block driver threads. + void checkHashBitsOverlap(int8_t spillInputStartPartitionBit) { + if (spillInputStartPartitionBit != kNoSpillInputStartPartitionBit && + hashMode() != HashMode::kArray) { + VELOX_CHECK_LE(sizeBits(), spillInputStartPartitionBit); + } + } + std::vector> hashers_; std::unique_ptr rows_; @@ -525,7 +548,9 @@ class HashTable : public BaseHashTable { // and VectorHashers and decides the hash mode and representation. void prepareJoinTable( std::vector> tables, - folly::Executor* executor = nullptr) override; + folly::Executor* executor = nullptr, + int8_t spillInputStartPartitionBit = + kNoSpillInputStartPartitionBit) override; uint64_t hashTableSizeIncrease(int32_t numNewDistinct) const override { if (numDistinct_ + numNewDistinct > rehashSize()) { @@ -587,10 +612,6 @@ class HashTable : public BaseHashTable { // occupy exactly two (64 bytes) cache lines. class Bucket { public: - Bucket() { - static_assert(sizeof(Bucket) == 128); - } - uint8_t tagAt(int32_t slotIndex) { return reinterpret_cast(&tags_)[slotIndex]; } @@ -622,6 +643,7 @@ class HashTable : public BaseHashTable { char padding_[16]; }; + static_assert(sizeof(Bucket) == 128); static constexpr uint64_t kBucketSize = sizeof(Bucket); // Returns the bucket at byte offset 'offset' from 'table_'. @@ -881,6 +903,10 @@ class HashTable : public BaseHashTable { } } + int sizeBits() const final { + return sizeBits_; + } + // The min table size in row to trigger parallel join table build. const uint32_t minTableSizeForParallelJoinBuild_; @@ -938,7 +964,7 @@ class HashTable : public BaseHashTable { // Executor for parallelizing hash join build. This may be the // executor for Drivers. If this executor is indefinitely taken by - // other work, the thread of prepareJoinTables() will sequentially + // other work, the thread of prepareJoinTable() will sequentially // execute the parallel build steps. folly::Executor* buildExecutor_{nullptr}; diff --git a/velox/exec/RowNumber.cpp b/velox/exec/RowNumber.cpp index f753cde0e38f..04f289c818ee 100644 --- a/velox/exec/RowNumber.cpp +++ b/velox/exec/RowNumber.cpp @@ -78,7 +78,12 @@ void RowNumber::addInput(RowVectorPtr input) { } SelectivityVector rows(numInput); - table_->prepareForGroupProbe(*lookup_, input, rows, false); + table_->prepareForGroupProbe( + *lookup_, + input, + rows, + false, + BaseHashTable::kNoSpillInputStartPartitionBit); table_->groupProbe(*lookup_); // Initialize new partitions with zeros. @@ -93,7 +98,8 @@ void RowNumber::addInput(RowVectorPtr input) { void RowNumber::addSpillInput() { const auto numInput = input_->size(); SelectivityVector rows(numInput); - table_->prepareForGroupProbe(*lookup_, input_, rows, false); + table_->prepareForGroupProbe( + *lookup_, input_, rows, false, spillConfig_->startPartitionBit); table_->groupProbe(*lookup_); // Initialize new partitions with zeros. @@ -157,7 +163,8 @@ void RowNumber::restoreNextSpillPartition() { const auto numInput = input->size(); SelectivityVector rows(numInput); - table_->prepareForGroupProbe(*lookup_, input, rows, false); + table_->prepareForGroupProbe( + *lookup_, input, rows, false, spillConfig_->startPartitionBit); table_->groupProbe(*lookup_); auto* counts = data->children().back()->as>(); diff --git a/velox/exec/TopNRowNumber.cpp b/velox/exec/TopNRowNumber.cpp index ceb9dc2131e3..5ad9184c0bd5 100644 --- a/velox/exec/TopNRowNumber.cpp +++ b/velox/exec/TopNRowNumber.cpp @@ -191,7 +191,12 @@ void TopNRowNumber::addInput(RowVectorPtr input) { ensureInputFits(input); SelectivityVector rows(numInput); - table_->prepareForGroupProbe(*lookup_, input, rows, false); + table_->prepareForGroupProbe( + *lookup_, + input, + rows, + false, + BaseHashTable::kNoSpillInputStartPartitionBit); table_->groupProbe(*lookup_); // Initialize new partitions. diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index 573c618989b4..08dc5b8f7604 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -5038,6 +5038,22 @@ TEST_F(HashJoinTest, spillFileSize) { } } +TEST_F(HashJoinTest, spillPartitionBitsOverlap) { + auto builder = + HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get()) + .numDrivers(numDrivers_) + .keyTypes({BIGINT(), BIGINT()}) + .probeVectors(2'000, 3) + .buildVectors(2'000, 3) + .referenceQuery( + "SELECT t_k0, t_k1, t_data, u_k0, u_k1, u_data FROM t, u WHERE t_k0 = u_k0 and t_k1 = u_k1") + .config(core::QueryConfig::kSpillStartPartitionBit, "8") + .config(core::QueryConfig::kJoinSpillPartitionBits, "1") + .checkSpillStats(false) + .maxSpillLevel(0); + VELOX_ASSERT_THROW(builder.run(), "vs. 8"); +} + // The test is to verify if the hash build reservation has been released on // task error. DEBUG_ONLY_TEST_F(HashJoinTest, buildReservationReleaseCheck) { @@ -5242,6 +5258,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringInputProcessing) { .spillDirectory(testData.spillEnabled ? tempDirectory->path : "") .referenceQuery( "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1") + .config(core::QueryConfig::kSpillStartPartitionBit, "29") .verifier([&](const std::shared_ptr& task, bool /*unused*/) { const auto statsPair = taskSpilledStats(*task); if (testData.expectedReclaimable) { @@ -5394,6 +5411,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringReserve) { .spillDirectory(tempDirectory->path) .referenceQuery( "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1") + .config(core::QueryConfig::kSpillStartPartitionBit, "29") .verifier([&](const std::shared_ptr& task, bool /*unused*/) { const auto statsPair = taskSpilledStats(*task); ASSERT_GT(statsPair.first.spilledBytes, 0); @@ -5788,6 +5806,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringWaitForProbe) { .spillDirectory(tempDirectory->path) .referenceQuery( "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1") + .config(core::QueryConfig::kSpillStartPartitionBit, "29") .verifier([&](const std::shared_ptr& task, bool /*unused*/) { const auto statsPair = taskSpilledStats(*task); ASSERT_GT(statsPair.first.spilledBytes, 0); @@ -6351,6 +6370,7 @@ TEST_F(HashJoinTest, exceededMaxSpillLevel) { .spillDirectory(tempDirectory->path) .referenceQuery( "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1") + .config(core::QueryConfig::kSpillStartPartitionBit, "29") .verifier([&](const std::shared_ptr& task, bool /*unused*/) { auto joinStats = task->taskStats() .pipelineStats.back() diff --git a/velox/exec/tests/utils/ArbitratorTestUtil.cpp b/velox/exec/tests/utils/ArbitratorTestUtil.cpp index aeb75584beab..62761c9f33d8 100644 --- a/velox/exec/tests/utils/ArbitratorTestUtil.cpp +++ b/velox/exec/tests/utils/ArbitratorTestUtil.cpp @@ -99,6 +99,7 @@ QueryTestResult runHashJoinTask( .spillDirectory(spillDirectory->path) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kJoinSpillEnabled, true) + .config(core::QueryConfig::kSpillStartPartitionBit, "29") .queryCtx(queryCtx) .maxDrivers(numDrivers) .copyResults(pool, result.task); diff --git a/velox/expression/SpecialFormRegistry.cpp b/velox/expression/SpecialFormRegistry.cpp index 616378ce4152..3957981905b3 100644 --- a/velox/expression/SpecialFormRegistry.cpp +++ b/velox/expression/SpecialFormRegistry.cpp @@ -29,8 +29,10 @@ SpecialFormRegistry& specialFormRegistryInternal() { void SpecialFormRegistry::registerFunctionCallToSpecialForm( const std::string& name, std::unique_ptr functionCallToSpecialForm) { - registry_.withWLock( - [&](auto& map) { map[name] = std::move(functionCallToSpecialForm); }); + const auto sanitizedName = sanitizeName(name); + registry_.withWLock([&](auto& map) { + map[sanitizedName] = std::move(functionCallToSpecialForm); + }); } void SpecialFormRegistry::unregisterAllFunctionCallToSpecialForm() { @@ -39,9 +41,10 @@ void SpecialFormRegistry::unregisterAllFunctionCallToSpecialForm() { FunctionCallToSpecialForm* FOLLY_NULLABLE SpecialFormRegistry::getSpecialForm(const std::string& name) const { + const auto sanitizedName = sanitizeName(name); FunctionCallToSpecialForm* specialForm = nullptr; registry_.withRLock([&](const auto& map) { - auto it = map.find(name); + auto it = map.find(sanitizedName); if (it != map.end()) { specialForm = it->second.get(); } diff --git a/velox/expression/tests/FunctionCallToSpecialFormTest.cpp b/velox/expression/tests/FunctionCallToSpecialFormTest.cpp index e3b72a513ec6..7c9bd3a59022 100644 --- a/velox/expression/tests/FunctionCallToSpecialFormTest.cpp +++ b/velox/expression/tests/FunctionCallToSpecialFormTest.cpp @@ -185,3 +185,48 @@ TEST_F(FunctionCallToSpecialFormTest, notASpecialForm) { config_); ASSERT_EQ(specialForm, nullptr); } + +class FunctionCallToSpecialFormSanitizeNameTest : public testing::Test, + public VectorTestBase { + protected: + static void SetUpTestCase() { + // This class does not pre-register the special forms. + memory::MemoryManager::testingSetInstance({}); + } +}; + +TEST_F(FunctionCallToSpecialFormSanitizeNameTest, sanitizeName) { + // Make sure no special forms are registered. + unregisterAllFunctionCallToSpecialForm(); + + ASSERT_FALSE(isFunctionCallToSpecialFormRegistered("and")); + ASSERT_FALSE(isFunctionCallToSpecialFormRegistered("AND")); + ASSERT_FALSE(isFunctionCallToSpecialFormRegistered("or")); + ASSERT_FALSE(isFunctionCallToSpecialFormRegistered("OR")); + + registerFunctionCallToSpecialForm( + "and", std::make_unique(true /* isAnd */)); + registerFunctionCallToSpecialForm( + "OR", std::make_unique(false /* isAnd */)); + + auto testLookup = [this](const std::string& name) { + auto type = resolveTypeForSpecialForm(name, {BOOLEAN(), BOOLEAN()}); + ASSERT_EQ(type, BOOLEAN()); + + auto specialForm = constructSpecialForm( + name, + BOOLEAN(), + {std::make_shared( + vectorMaker_.constantVector({true})), + std::make_shared( + vectorMaker_.constantVector({false}))}, + false, + core::QueryConfig{{}}); + ASSERT_EQ(typeid(*specialForm), typeid(const ConjunctExpr&)); + }; + + testLookup("and"); + testLookup("AND"); + testLookup("or"); + testLookup("OR"); +} diff --git a/velox/functions/lib/CMakeLists.txt b/velox/functions/lib/CMakeLists.txt index 85fa8d64c5ec..fe1bdb405ff9 100644 --- a/velox/functions/lib/CMakeLists.txt +++ b/velox/functions/lib/CMakeLists.txt @@ -28,6 +28,7 @@ add_library( KllSketch.cpp MapConcat.cpp Re2Functions.cpp + Repeat.cpp StringEncodingUtils.cpp SubscriptUtil.cpp CheckNestedNulls.cpp diff --git a/velox/functions/prestosql/Repeat.cpp b/velox/functions/lib/Repeat.cpp similarity index 77% rename from velox/functions/prestosql/Repeat.cpp rename to velox/functions/lib/Repeat.cpp index 39993b6b1f1e..885f13b6b43a 100644 --- a/velox/functions/prestosql/Repeat.cpp +++ b/velox/functions/lib/Repeat.cpp @@ -18,10 +18,14 @@ namespace facebook::velox::functions { namespace { - // See documentation at https://prestodb.io/docs/current/functions/array.html class RepeatFunction : public exec::VectorFunction { public: + // @param allowNegativeCount If true, negative 'count' is allowed + // and treated the same as zero (Spark's behavior). + explicit RepeatFunction(bool allowNegativeCount) + : allowNegativeCount_(allowNegativeCount) {} + static constexpr int32_t kMaxResultEntries = 10'000; bool isDefaultNullBehavior() const override { @@ -37,29 +41,36 @@ class RepeatFunction : public exec::VectorFunction { VectorPtr& result) const override { VectorPtr localResult; if (args[1]->isConstantEncoding()) { - localResult = applyConstant(rows, args, outputType, context); + localResult = applyConstantCount(rows, args, outputType, context); if (localResult == nullptr) { return; } } else { - localResult = applyFlat(rows, args, outputType, context); + localResult = applyNonConstantCount(rows, args, outputType, context); } context.moveOrCopyResult(localResult, rows, result); } private: - static void checkCount(const int32_t count) { - VELOX_USER_CHECK_GE( - count, - 0, - "Count argument of repeat function must be greater than or equal to 0"); + // Check count to make sure it is in valid range. + static int32_t checkCount(int32_t count, bool allowNegativeCount) { + if (count < 0) { + if (allowNegativeCount) { + return 0; + } + VELOX_USER_FAIL( + "({} vs. {}) Count argument of repeat function must be greater than or equal to 0", + count, + 0); + } VELOX_USER_CHECK_LE( count, kMaxResultEntries, "Count argument of repeat function must be less than or equal to 10000"); + return count; } - VectorPtr applyConstant( + VectorPtr applyConstantCount( const SelectivityVector& rows, std::vector& args, const TypePtr& outputType, @@ -73,14 +84,13 @@ class RepeatFunction : public exec::VectorFunction { return BaseVector::createNullConstant(outputType, numRows, pool); } - const auto count = constantCount->valueAt(0); + auto count = constantCount->valueAt(0); try { - checkCount(count); + count = checkCount(count, allowNegativeCount_); } catch (const VeloxUserError&) { context.setErrors(rows, std::current_exception()); return nullptr; } - const auto totalCount = count * numRows; // Allocate new vectors for indices, lengths and offsets. @@ -109,7 +119,7 @@ class RepeatFunction : public exec::VectorFunction { BaseVector::wrapInDictionary(nullptr, indices, totalCount, args[0])); } - VectorPtr applyFlat( + VectorPtr applyNonConstantCount( const SelectivityVector& rows, std::vector& args, const TypePtr& outputType, @@ -120,7 +130,7 @@ class RepeatFunction : public exec::VectorFunction { context.applyToSelectedNoThrow(rows, [&](auto row) { auto count = countDecoded->isNullAt(row) ? 0 : countDecoded->valueAt(row); - checkCount(count); + count = checkCount(count, allowNegativeCount_); totalCount += count; }); @@ -156,6 +166,9 @@ class RepeatFunction : public exec::VectorFunction { return; } auto count = countDecoded->valueAt(row); + if (count < 0) { + count = 0; + } rawSizes[row] = count; rawOffsets[row] = offset; std::fill(rawIndices + offset, rawIndices + offset + count, row); @@ -171,9 +184,12 @@ class RepeatFunction : public exec::VectorFunction { sizes, BaseVector::wrapInDictionary(nullptr, indices, totalCount, args[0])); } + + const bool allowNegativeCount_; }; +} // namespace -static std::vector> signatures() { +std::vector> repeatSignatures() { // T, integer -> array(T) return {exec::FunctionSignatureBuilder() .typeVariable("T") @@ -182,11 +198,19 @@ static std::vector> signatures() { .argumentType("integer") .build()}; } -} // namespace -VELOX_DECLARE_VECTOR_FUNCTION( - udf_repeat, - signatures(), - std::make_unique()); +std::shared_ptr makeRepeat( + const std::string& /* name */, + const std::vector& /* inputArgs */, + const core::QueryConfig& /*config*/) { + return std::make_unique(false); +} + +std::shared_ptr makeRepeatAllowNegativeCount( + const std::string& /* name */, + const std::vector& /* inputArgs */, + const core::QueryConfig& /*config*/) { + return std::make_unique(true); +} } // namespace facebook::velox::functions diff --git a/velox/functions/lib/Repeat.h b/velox/functions/lib/Repeat.h new file mode 100644 index 000000000000..2721ede1e9a8 --- /dev/null +++ b/velox/functions/lib/Repeat.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include "velox/expression/VectorFunction.h" + +namespace facebook::velox::functions { + +std::vector> repeatSignatures(); + +// Does not allow negative count. +std::shared_ptr makeRepeat( + const std::string& name, + const std::vector& inputArgs, + const core::QueryConfig& config); + +// Allows negative count (Spark's behavior). +std::shared_ptr makeRepeatAllowNegativeCount( + const std::string& name, + const std::vector& inputArgs, + const core::QueryConfig& config); +} // namespace facebook::velox::functions diff --git a/velox/functions/lib/tests/CMakeLists.txt b/velox/functions/lib/tests/CMakeLists.txt index 424aef6daf47..8427d4c34044 100644 --- a/velox/functions/lib/tests/CMakeLists.txt +++ b/velox/functions/lib/tests/CMakeLists.txt @@ -20,6 +20,7 @@ add_executable( KllSketchTest.cpp MapConcatTest.cpp Re2FunctionsTest.cpp + RepeatTest.cpp ZetaDistributionTest.cpp CheckNestedNullsTest.cpp) diff --git a/velox/functions/prestosql/tests/RepeatTest.cpp b/velox/functions/lib/tests/RepeatTest.cpp similarity index 75% rename from velox/functions/prestosql/tests/RepeatTest.cpp rename to velox/functions/lib/tests/RepeatTest.cpp index 95275c8c7e4f..0969f8e95c06 100644 --- a/velox/functions/prestosql/tests/RepeatTest.cpp +++ b/velox/functions/lib/tests/RepeatTest.cpp @@ -14,17 +14,27 @@ * limitations under the License. */ +#include "velox/functions/lib/Repeat.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/functions/prestosql/tests/utils/FunctionBaseTest.h" -using namespace facebook::velox; using namespace facebook::velox::test; -using namespace facebook::velox::functions::test; +namespace facebook::velox::functions { namespace { -class RepeatTest : public FunctionBaseTest { +class RepeatTest : public functions::test::FunctionBaseTest { protected: + static void SetUpTestCase() { + FunctionBaseTest::SetUpTestCase(); + exec::registerStatefulVectorFunction( + "repeat", functions::repeatSignatures(), functions::makeRepeat); + exec::registerStatefulVectorFunction( + "repeat_allow_negative_count", + functions::repeatSignatures(), + functions::makeRepeatAllowNegativeCount); + } + void testExpression( const std::string& expression, const std::vector& input, @@ -41,7 +51,6 @@ class RepeatTest : public FunctionBaseTest { evaluate(expression, makeRowVector(input)), expectedError); } }; -} // namespace TEST_F(RepeatTest, repeat) { const auto elementVector = makeNullableFlatVector( @@ -124,3 +133,34 @@ TEST_F(RepeatTest, repeatWithInvalidCount) { {elementVector}, "(10001 vs. 10000) Count argument of repeat function must be less than or equal to 10000"); } + +TEST_F(RepeatTest, repeatAllowNegativeCount) { + const auto elementVector = makeNullableFlatVector( + {0.0, -2.0, 3.333333, 4.0004, std::nullopt, 5.12345}); + auto expected = makeArrayVector({{}, {}, {}, {}, {}, {}}); + + // Test negative count. + auto countVector = + makeNullableFlatVector({-1, -2, -3, -5, -10, -100}); + testExpression( + "repeat_allow_negative_count(C0, C1)", + {elementVector, countVector}, + expected); + + // Test using a constant as the count argument. + testExpression( + "repeat_allow_negative_count(C0, '-5'::INTEGER)", + {elementVector}, + expected); + + // Test mixed case. + expected = makeArrayVector( + {{0.0}, {-2.0, -2.0}, {}, {}, {}, {5.12345, 5.12345, 5.12345}}); + countVector = makeNullableFlatVector({1, 2, -1, 0, -10, 3}); + testExpression( + "repeat_allow_negative_count(C0, C1)", + {elementVector, countVector}, + expected); +} +} // namespace +} // namespace facebook::velox::functions diff --git a/velox/functions/prestosql/CMakeLists.txt b/velox/functions/prestosql/CMakeLists.txt index d2a17ca1c32a..fcb4d97e3e22 100644 --- a/velox/functions/prestosql/CMakeLists.txt +++ b/velox/functions/prestosql/CMakeLists.txt @@ -44,7 +44,6 @@ add_library( MapZipWith.cpp Not.cpp Reduce.cpp - Repeat.cpp Reverse.cpp RowFunction.cpp Sequence.cpp diff --git a/velox/functions/prestosql/registration/ArrayFunctionsRegistration.cpp b/velox/functions/prestosql/registration/ArrayFunctionsRegistration.cpp index 46c7400ce8e3..7524a274ce7e 100644 --- a/velox/functions/prestosql/registration/ArrayFunctionsRegistration.cpp +++ b/velox/functions/prestosql/registration/ArrayFunctionsRegistration.cpp @@ -17,6 +17,7 @@ #include #include "velox/functions/Registerer.h" +#include "velox/functions/lib/Repeat.h" #include "velox/functions/prestosql/ArrayConstructor.h" #include "velox/functions/prestosql/ArrayFunctions.h" #include "velox/functions/prestosql/ArraySort.h" @@ -144,7 +145,8 @@ void registerArrayFunctions(const std::string& prefix) { }); VELOX_REGISTER_VECTOR_FUNCTION(udf_array_sum, prefix + "array_sum"); - VELOX_REGISTER_VECTOR_FUNCTION(udf_repeat, prefix + "repeat"); + exec::registerStatefulVectorFunction( + prefix + "repeat", repeatSignatures(), makeRepeat); VELOX_REGISTER_VECTOR_FUNCTION(udf_sequence, prefix + "sequence"); exec::registerStatefulVectorFunction( diff --git a/velox/functions/prestosql/tests/CMakeLists.txt b/velox/functions/prestosql/tests/CMakeLists.txt index e893aad1d98a..4ba742ff61c8 100644 --- a/velox/functions/prestosql/tests/CMakeLists.txt +++ b/velox/functions/prestosql/tests/CMakeLists.txt @@ -76,7 +76,6 @@ add_executable( RandTest.cpp ReduceTest.cpp RegexpReplaceTest.cpp - RepeatTest.cpp ReverseTest.cpp RoundTest.cpp ScalarFunctionRegTest.cpp diff --git a/velox/functions/sparksql/Register.cpp b/velox/functions/sparksql/Register.cpp index 5c67a54fa01d..1136eae75f06 100644 --- a/velox/functions/sparksql/Register.cpp +++ b/velox/functions/sparksql/Register.cpp @@ -21,6 +21,7 @@ #include "velox/functions/lib/IsNull.h" #include "velox/functions/lib/Re2Functions.h" #include "velox/functions/lib/RegistrationHelpers.h" +#include "velox/functions/lib/Repeat.h" #include "velox/functions/prestosql/DateTimeFunctions.h" #include "velox/functions/prestosql/JsonFunctions.h" #include "velox/functions/prestosql/StringFunctions.h" @@ -257,6 +258,11 @@ void registerFunctions(const std::string& prefix) { exec::registerStatefulVectorFunction( prefix + "sort_array", sortArraySignatures(), makeSortArray); + exec::registerStatefulVectorFunction( + prefix + "array_repeat", + repeatSignatures(), + makeRepeatAllowNegativeCount); + // Register date functions. registerFunction({prefix + "year"}); registerFunction({prefix + "year"}); diff --git a/velox/vector/arrow/Bridge.cpp b/velox/vector/arrow/Bridge.cpp index 601cbc08e847..6522ab9f18fc 100644 --- a/velox/vector/arrow/Bridge.cpp +++ b/velox/vector/arrow/Bridge.cpp @@ -248,6 +248,8 @@ const char* exportArrowFormatStr( return "u"; // utf-8 string case TypeKind::VARBINARY: return "z"; // binary + case TypeKind::UNKNOWN: + return "n"; // NullType case TypeKind::TIMESTAMP: return "ttn"; // time64 [nanoseconds] @@ -598,6 +600,7 @@ void exportFlat( case TypeKind::REAL: case TypeKind::DOUBLE: case TypeKind::TIMESTAMP: + case TypeKind::UNKNOWN: exportValues(vec, rows, out, pool, holder); break; case TypeKind::VARCHAR: @@ -940,6 +943,8 @@ TypePtr importFromArrowImpl( return REAL(); case 'g': return DOUBLE(); + case 'n': + return UNKNOWN(); // Map both utf-8 and large utf-8 string to varchar. case 'u': diff --git a/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp b/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp index 8def65ce8e8e..a880d93f1a97 100644 --- a/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp +++ b/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp @@ -195,6 +195,8 @@ TEST_F(ArrowBridgeSchemaExportTest, scalar) { testScalarType(DECIMAL(10, 4), "d:10,4"); testScalarType(DECIMAL(20, 15), "d:20,15"); + + testScalarType(UNKNOWN(), "n"); } TEST_F(ArrowBridgeSchemaExportTest, nested) { @@ -238,24 +240,14 @@ TEST_F(ArrowBridgeSchemaExportTest, constant) { testConstant(DOUBLE(), "g"); testConstant(VARCHAR(), "u"); testConstant(DATE(), "tdD"); + testConstant(UNKNOWN(), "n"); testConstant(ARRAY(INTEGER()), "+l"); + testConstant(ARRAY(UNKNOWN()), "+l"); testConstant(MAP(BOOLEAN(), REAL()), "+m"); + testConstant(MAP(UNKNOWN(), REAL()), "+m"); testConstant(ROW({TIMESTAMP(), DOUBLE()}), "+s"); -} - -TEST_F(ArrowBridgeSchemaExportTest, unsupported) { - // Try some combination of unsupported types to ensure there's no crash or - // memory leak in failure scenarios. - EXPECT_THROW(testScalarType(UNKNOWN(), ""), VeloxException); - - EXPECT_THROW(testScalarType(ARRAY(UNKNOWN()), ""), VeloxException); - EXPECT_THROW(testScalarType(MAP(UNKNOWN(), INTEGER()), ""), VeloxException); - EXPECT_THROW(testScalarType(MAP(BIGINT(), UNKNOWN()), ""), VeloxException); - - EXPECT_THROW(testScalarType(ROW({BIGINT(), UNKNOWN()}), ""), VeloxException); - EXPECT_THROW( - testScalarType(ROW({BIGINT(), REAL(), UNKNOWN()}), ""), VeloxException); + testConstant(ROW({UNKNOWN(), UNKNOWN()}), "+s"); } class ArrowBridgeSchemaImportTest : public ArrowBridgeSchemaExportTest { @@ -395,7 +387,6 @@ TEST_F(ArrowBridgeSchemaImportTest, complexTypes) { } TEST_F(ArrowBridgeSchemaImportTest, unsupported) { - EXPECT_THROW(testSchemaImport("n"), VeloxUserError); EXPECT_THROW(testSchemaImport("C"), VeloxUserError); EXPECT_THROW(testSchemaImport("S"), VeloxUserError); EXPECT_THROW(testSchemaImport("I"), VeloxUserError);