diff --git a/velox/connectors/hive/HiveConnectorUtil.cpp b/velox/connectors/hive/HiveConnectorUtil.cpp index 4fe174ed7949..464d7560691a 100644 --- a/velox/connectors/hive/HiveConnectorUtil.cpp +++ b/velox/connectors/hive/HiveConnectorUtil.cpp @@ -377,7 +377,8 @@ std::shared_ptr makeScanSpec( } std::unique_ptr parseSerdeParameters( - const std::unordered_map& serdeParameters) { + const std::unordered_map& serdeParameters, + const std::unordered_map& tableParameters) { auto fieldIt = serdeParameters.find(dwio::common::SerDeOptions::kFieldDelim); if (fieldIt == serdeParameters.end()) { fieldIt = serdeParameters.find("serialization.format"); @@ -393,9 +394,13 @@ std::unique_ptr parseSerdeParameters( auto mapKeyIt = serdeParameters.find(dwio::common::SerDeOptions::kMapKeyDelim); + auto nullStringIt = tableParameters.find( + dwio::common::TableParameter::kSerializationNullFormat); + if (fieldIt == serdeParameters.end() && collectionIt == serdeParameters.end() && - mapKeyIt == serdeParameters.end()) { + mapKeyIt == serdeParameters.end() && + nullStringIt == tableParameters.end()) { return nullptr; } @@ -413,6 +418,7 @@ std::unique_ptr parseSerdeParameters( } auto serDeOptions = std::make_unique( fieldDelim, collectionDelim, mapKeyDelim); + serDeOptions->nullString = nullStringIt->second; return serDeOptions; } @@ -420,15 +426,15 @@ void configureReaderOptions( dwio::common::ReaderOptions& readerOptions, const std::shared_ptr& hiveConfig, const Config* sessionProperties, - const RowTypePtr& fileSchema, - std::shared_ptr hiveSplit) { + const std::shared_ptr& hiveTableHandle, + const std::shared_ptr& hiveSplit) { readerOptions.setMaxCoalesceBytes(hiveConfig->maxCoalescedBytes()); readerOptions.setMaxCoalesceDistance(hiveConfig->maxCoalescedDistanceBytes()); readerOptions.setFileColumnNamesReadAsLowerCase( hiveConfig->isFileColumnNamesReadAsLowerCase(sessionProperties)); readerOptions.setUseColumnNamesForColumnMapping( hiveConfig->isOrcUseColumnNames(sessionProperties)); - readerOptions.setFileSchema(fileSchema); + readerOptions.setFileSchema(hiveTableHandle->dataColumns()); readerOptions.setFooterEstimatedSize(hiveConfig->footerEstimatedSize()); readerOptions.setFilePreloadThreshold(hiveConfig->filePreloadThreshold()); @@ -439,7 +445,8 @@ void configureReaderOptions( dwio::common::toString(readerOptions.getFileFormat()), dwio::common::toString(hiveSplit->fileFormat)); } else { - auto serDeOptions = parseSerdeParameters(hiveSplit->serdeParameters); + auto serDeOptions = parseSerdeParameters( + hiveSplit->serdeParameters, hiveTableHandle->tableParameters()); if (serDeOptions) { readerOptions.setSerDeOptions(*serDeOptions); } diff --git a/velox/connectors/hive/HiveConnectorUtil.h b/velox/connectors/hive/HiveConnectorUtil.h index 51335f09e76a..67426bef78ca 100644 --- a/velox/connectors/hive/HiveConnectorUtil.h +++ b/velox/connectors/hive/HiveConnectorUtil.h @@ -26,6 +26,7 @@ namespace facebook::velox::connector::hive { class HiveColumnHandle; +class HiveTableHandle; class HiveConfig; struct HiveConnectorSplit; @@ -57,8 +58,8 @@ void configureReaderOptions( dwio::common::ReaderOptions& readerOptions, const std::shared_ptr& config, const Config* sessionProperties, - const RowTypePtr& fileSchema, - std::shared_ptr hiveSplit); + const std::shared_ptr& hiveTableHandle, + const std::shared_ptr& hiveSplit); void configureRowReaderOptions( dwio::common::RowReaderOptions& rowReaderOptions, diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index b6cce9860087..92376e566d38 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -82,7 +82,7 @@ void SplitReader::configureReaderOptions() { baseReaderOpts_, hiveConfig_, connectorQueryCtx_->sessionProperties(), - hiveTableHandle_->dataColumns(), + hiveTableHandle_, hiveSplit_); } diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index 690af80e727b..c154f8f4a6e0 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -94,6 +94,8 @@ class SerDeOptions { struct TableParameter { static constexpr const char* kSkipHeaderLineCount = "skip.header.line.count"; + static constexpr const char* kSerializationNullFormat = + "serialization.null.format"; }; /**