Skip to content

Commit

Permalink
add opaque type support to PrestoSerializer (facebookincubator#11256)
Browse files Browse the repository at this point in the history
Summary:

The idea of opaque types in the Presto serializer is that we serialize the underlying opaque value to a StringView. This requires serde callbacks to be registered via

  OpaqueType::registerSerialization();

then inside `PrestoSerializer.cpp` we use these to write and read values of type opaque.

Differential Revision: D64362525
  • Loading branch information
Guilherme Kunigami authored and facebook-github-bot committed Oct 29, 2024
1 parent b00268f commit bf67077
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 4 deletions.
121 changes: 117 additions & 4 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ std::string_view typeToEncodingName(const TypePtr& type) {
return kRow;
case TypeKind::UNKNOWN:
return kByteArray;
case TypeKind::OPAQUE:
return kVariableWidth;

default:
VELOX_FAIL("Unknown type kind: {}", static_cast<int>(type->kind()));
}
Expand Down Expand Up @@ -625,6 +628,65 @@ void read<StringView>(
}
}

/// Deserializes a column of OPAQUE values from 'source' into 'result',
/// writing rows starting at 'resultOffset'.
///
/// Opaque values are serialized by first converting them to strings and then
/// writing them using the same layout as strings (row count, per-row end
/// offsets, nulls, total byte size, raw bytes). This function performs the
/// reverse operation, turning each row's bytes back into an opaque value with
/// the deserialize function obtained from the OpaqueType.
///
/// @param source Stream positioned at the start of the serialized column.
/// @param type Type of the column; must be an OpaqueType.
/// @param resultOffset Index of the first row of 'result' to fill.
/// @param incomingNulls Optional bitmap of rows that are null and have no
/// entry in the stream; may be nullptr.
/// @param numIncomingNulls Number of rows covered by 'incomingNulls'.
/// @param pool Memory pool used for the temporary copy of the string bytes.
/// @param opts Serde options (not used by this specialization).
/// @param result Vector to fill; resized to hold the new rows.
template <>
void read<OpaqueType>(
    ByteInputStream* source,
    const TypePtr& type,
    vector_size_t resultOffset,
    const uint64_t* incomingNulls,
    int32_t numIncomingNulls,
    memory::MemoryPool* pool,
    const SerdeOpts& opts,
    VectorPtr& result) {
  const int32_t size = source->read<int32_t>();
  const int32_t numNewValues = sizeWithIncomingNulls(size, numIncomingNulls);

  result->resize(resultOffset + numNewValues);

  const auto opaqueType = std::dynamic_pointer_cast<const OpaqueType>(type);
  VELOX_CHECK_NOT_NULL(opaqueType.get(), "Expected an OPAQUE type");
  auto deserialization = opaqueType->getDeserializeFunc();

  auto* flatResult = result->as<FlatVector<std::shared_ptr<void>>>();
  VELOX_CHECK_NOT_NULL(flatResult, "Should cast to FlatVector properly");
  // Size the values buffer for every new row, including the rows that are
  // only present through 'incomingNulls'.
  BufferPtr values = flatResult->mutableValues(resultOffset + numNewValues);
  auto* rawValues = values->asMutable<std::shared_ptr<void>>();

  // End offset of each new row within the string bytes. Rows flagged in
  // 'incomingNulls' have no entry in the stream, so they repeat the previous
  // offset and therefore span zero bytes.
  std::vector<int32_t> offsets;
  offsets.reserve(numNewValues);
  int32_t lastOffset = 0;
  for (int32_t i = 0; i < numNewValues; ++i) {
    if (!(incomingNulls && bits::isBitNull(incomingNulls, i))) {
      lastOffset = source->read<int32_t>();
    }
    offsets.push_back(lastOffset);
  }
  readNulls(
      source, size, resultOffset, incomingNulls, numIncomingNulls, *flatResult);

  const int32_t dataSize = source->read<int32_t>();
  if (dataSize == 0) {
    // NOTE(review): rows whose serialized form is the empty string are left
    // untouched here, same as before this change — confirm this is intended.
    return;
  }

  // Hold the temporary bytes in an RAII guard so that they are returned to
  // the pool even if the deserialize function throws.
  const int64_t rawStringSizeBytes = static_cast<int64_t>(dataSize);
  auto freeRawString = [pool, rawStringSizeBytes](char* buffer) {
    pool->free(buffer, rawStringSizeBytes);
  };
  std::unique_ptr<char, decltype(freeRawString)> rawString(
      static_cast<char*>(pool->allocate(rawStringSizeBytes)), freeRawString);
  source->readBytes(rawString.get(), dataSize);

  int32_t previousOffset = 0;
  for (int32_t i = 0; i < numNewValues; ++i) {
    const int32_t offset = offsets[i];
    const vector_size_t row = resultOffset + i;
    if (flatResult->isNullAt(row)) {
      // Null rows carry no payload; do not hand an empty string to the
      // deserialize function, which may reject it.
      rawValues[row] = nullptr;
    } else {
      auto serializedValue =
          StringView(rawString.get() + previousOffset, offset - previousOffset);
      rawValues[row] = deserialization(serializedValue);
    }
    previousOffset = offset;
  }
}

void readColumns(
ByteInputStream* source,
const std::vector<TypePtr>& types,
Expand Down Expand Up @@ -1031,6 +1093,7 @@ void readColumns(
{TypeKind::TIMESTAMP, &read<Timestamp>},
{TypeKind::VARCHAR, &read<StringView>},
{TypeKind::VARBINARY, &read<StringView>},
{TypeKind::OPAQUE, &read<OpaqueType>},
{TypeKind::ARRAY, &readArrayVector},
{TypeKind::MAP, &readMapVector},
{TypeKind::ROW, &readRowVector},
Expand Down Expand Up @@ -1682,6 +1745,7 @@ class VectorStream {

case TypeKind::VARCHAR:
case TypeKind::VARBINARY:
case TypeKind::OPAQUE:
writeInt32(out, nullCount_ + nonNullCount_);
lengths_.flush(out);
flushNulls(out);
Expand Down Expand Up @@ -1764,6 +1828,8 @@ class VectorStream {
lengths_.startWrite(sizeof(vector_size_t));
lengths_.appendOne<int32_t>(0);
break;
case TypeKind::OPAQUE:
[[fallthrough]];
case TypeKind::VARCHAR:
[[fallthrough]];
case TypeKind::VARBINARY:
Expand Down Expand Up @@ -1935,6 +2001,39 @@ void serializeFlatVector<TypeKind::BOOLEAN>(
}
}

/// Serializes the rows selected by 'ranges' from a flat OPAQUE vector into
/// 'stream'.
///
/// Each non-null value is converted to a string with the serialize function
/// obtained from the vector's OpaqueType and appended to the stream as a
/// StringView; null rows are appended as nulls.
///
/// @param vector Vector to serialize; must be a FlatVector of opaque values.
/// @param ranges Row ranges (begin, size) to serialize, in order.
/// @param stream Destination stream.
template <>
void serializeFlatVector<TypeKind::OPAQUE>(
    const VectorPtr& vector,
    const folly::Range<const IndexRange*>& ranges,
    VectorStream* stream) {
  using T = typename TypeTraits<TypeKind::OPAQUE>::NativeType;
  auto* flatVector = vector->as<FlatVector<T>>();
  VELOX_CHECK_NOT_NULL(flatVector, "Should cast to FlatVector properly");

  const auto opaqueType =
      std::dynamic_pointer_cast<const OpaqueType>(flatVector->type());
  VELOX_CHECK_NOT_NULL(opaqueType.get(), "Expected an OPAQUE type");
  auto serialization = opaqueType->getSerializeFunc();
  auto* rawValues = flatVector->rawValues();

  // Do not handle the case !flatVector->mayHaveNulls() in a special way like
  // we do for say TypeKind::VARCHAR, because we need to traverse each element
  // anyway to serialize them.
  for (const auto& range : ranges) {
    const int32_t end = range.begin + range.size;
    for (int32_t offset = range.begin; offset < end; ++offset) {
      if (flatVector->isNullAt(offset)) {
        stream->appendNull();
        continue;
      }
      stream->appendNonNull();
      // 'serialized' owns the bytes that 'view' points to, so it must stay
      // alive until appendOne() returns.
      const auto serialized = serialization(rawValues[offset]);
      const auto view = StringView(serialized);
      stream->appendOne(view);
    }
  }
}

void serializeColumn(
const VectorPtr& vector,
const folly::Range<const IndexRange*>& ranges,
Expand Down Expand Up @@ -2201,10 +2300,24 @@ void serializeConstantVectorImpl(
return;
}

const T value = constVector->valueAtFast(0);
for (int32_t i = 0; i < count; ++i) {
stream->appendNonNull();
stream->appendOne(value);
if constexpr (std::is_same_v<T, std::shared_ptr<void>>) {
auto opaqueType =
std::dynamic_pointer_cast<const OpaqueType>(constVector->type());
auto serialization = opaqueType->getSerializeFunc();
T valueOpaque = constVector->valueAtFast(0);

std::string serialized = serialization(valueOpaque);
const auto value = StringView(serialized);
for (int32_t i = 0; i < count; ++i) {
stream->appendNonNull();
stream->appendOne(value);
}
} else {
T value = constVector->valueAtFast(0);
for (int32_t i = 0; i < count; ++i) {
stream->appendNonNull();
stream->appendOne(value);
}
}
}

Expand Down
41 changes: 41 additions & 0 deletions velox/serializers/tests/PrestoSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/
#include "velox/serializers/PrestoSerializer.h"
#include <boost/random/uniform_int_distribution.hpp>
#include <folly/Random.h>
#include <gtest/gtest.h>
#include <vector>
Expand Down Expand Up @@ -1511,6 +1512,39 @@ INSTANTIATE_TEST_SUITE_P(
common::CompressionKind::CompressionKind_LZ4,
common::CompressionKind::CompressionKind_GZIP));

/// Minimal opaque payload used to exercise OPAQUE type serde in tests. A Foo
/// is identified by its 64-bit id and is serialized as the decimal text of
/// that id.
class Foo {
 public:
  explicit Foo(int64_t id) : id_(id) {}

  /// Returns the id this instance was created with.
  int64_t id() const {
    return id_;
  }

  /// Returns the instance for 'id', creating and caching it on first use.
  static std::shared_ptr<Foo> create(int64_t id) {
    // Return the same instance if the id is already in the map, to make
    // the operator== work with the instance before and after serde.
    const auto it = instances_.find(id);
    if (it != instances_.end()) {
      return it->second;
    }
    return instances_.emplace(id, std::make_shared<Foo>(id)).first->second;
  }

  /// Converts 'foo' to the decimal text of its id.
  static std::string serialize(const std::shared_ptr<Foo>& foo) {
    return std::to_string(foo->id_);
  }

  /// Parses the decimal id in 'serialized' and returns the cached instance
  /// for it. Throws what std::stoll throws if the text is not a valid id.
  static std::shared_ptr<Foo> deserialize(const std::string& serialized) {
    // Ids are 64-bit, so parse with stoll; stoi would reject ids that do not
    // fit in an int.
    return create(std::stoll(serialized));
  }

 private:
  int64_t id_;
  // Cache of all instances handed out by create(), keyed by id.
  static std::unordered_map<int64_t, std::shared_ptr<Foo>> instances_;
};

std::unordered_map<int64_t, std::shared_ptr<Foo>> Foo::instances_;

TEST_F(PrestoSerializerTest, deserializeSingleColumn) {
// Verify that deserializeSingleColumn API can handle all supported types.
static const size_t kPrestoPageHeaderBytes = 21;
Expand Down Expand Up @@ -1555,6 +1589,7 @@ TEST_F(PrestoSerializerTest, deserializeSingleColumn) {
DOUBLE(),
VARCHAR(),
TIMESTAMP(),
OPAQUE<Foo>(),
ROW({VARCHAR(), INTEGER()}),
ARRAY(INTEGER()),
ARRAY(INTEGER()),
Expand All @@ -1578,6 +1613,12 @@ TEST_F(PrestoSerializerTest, deserializeSingleColumn) {
LOG(ERROR) << "Seed: " << seed;
SCOPED_TRACE(fmt::format("seed: {}", seed));
VectorFuzzer fuzzer(opts, pool_.get(), seed);
fuzzer.registerOpaqueTypeGenerator<Foo>([](FuzzerGenerator& rng) {
int64_t id = boost::random::uniform_int_distribution<int64_t>(1, 10)(rng);
return Foo::create(id);
});
OpaqueType::registerSerialization<Foo>(
"Foo", Foo::serialize, Foo::deserialize);

for (const auto& type : typesToTest) {
SCOPED_TRACE(fmt::format("Type: {}", type->toString()));
Expand Down

0 comments on commit bf67077

Please sign in to comment.