Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move buffer growth policy to separate module #84

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions dwio/nimble/velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ add_library(nimble_velox_schema_builder SchemaBuilder.cpp)
target_link_libraries(nimble_velox_schema_builder nimble_velox_schema_reader
nimble_velox_schema nimble_common Folly::folly)

add_library(nimble_velox_stream_data StreamData.cpp)
target_link_libraries(nimble_velox_stream_data nimble_velox_schema_builder
nimble_common)

add_library(nimble_velox_field_reader FieldReader.cpp)
target_link_libraries(
nimble_velox_field_reader nimble_velox_schema_reader nimble_common
Expand All @@ -36,10 +40,13 @@ add_library(nimble_velox_layout_planner LayoutPlanner.cpp)
target_link_libraries(nimble_velox_layout_planner nimble_velox_schema_reader
velox_file)

add_library(nimble_velox_field_writer BufferGrowthPolicy.cpp
DeduplicationUtils.cpp FieldWriter.cpp)
target_link_libraries(nimble_velox_field_writer nimble_velox_schema
nimble_velox_schema_builder Folly::folly)
add_library(nimble_velox_buffer_growth_policy BufferGrowthPolicy.cpp)
target_link_libraries(nimble_velox_buffer_growth_policy nimble_common)

add_library(nimble_velox_field_writer DeduplicationUtils.cpp FieldWriter.cpp)
target_link_libraries(
nimble_velox_field_writer nimble_velox_schema nimble_velox_stream_data
nimble_velox_buffer_growth_policy nimble_velox_schema_builder Folly::folly)

# Nimble code expects an upper case suffix to the generated file.
list(PREPEND FLATBUFFERS_FLATC_SCHEMA_EXTRA_ARGS "--filename-suffix"
Expand Down
20 changes: 0 additions & 20 deletions dwio/nimble/velox/FieldWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1278,26 +1278,6 @@ std::unique_ptr<FieldWriter> createArrayWithOffsetsFieldWriter(

} // namespace

void NullsStreamData::ensureNullsCapacity(
bool mayHaveNulls,
velox::vector_size_t size) {
if (mayHaveNulls || hasNulls_) {
auto newSize = bufferedCount_ + size;
nonNulls_.reserve(newSize);
if (!hasNulls_) {
hasNulls_ = true;
std::fill(nonNulls_.data(), nonNulls_.data() + bufferedCount_, true);
nonNulls_.update_size(bufferedCount_);
}
if (!mayHaveNulls) {
std::fill(
nonNulls_.data() + bufferedCount_, nonNulls_.data() + newSize, true);
nonNulls_.update_size(newSize);
}
}
bufferedCount_ += size;
}

FieldWriterContext::LocalDecodedVector
FieldWriterContext::getLocalDecodedVector() {
return LocalDecodedVector{*this};
Expand Down
189 changes: 1 addition & 188 deletions dwio/nimble/velox/FieldWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,199 +21,12 @@
#include "dwio/nimble/velox/BufferGrowthPolicy.h"
#include "dwio/nimble/velox/OrderedRanges.h"
#include "dwio/nimble/velox/SchemaBuilder.h"
#include "dwio/nimble/velox/StreamData.h"
#include "velox/dwio/common/TypeWithId.h"
#include "velox/vector/DecodedVector.h"

namespace facebook::nimble {

// Stream data is a generic interface representing a stream of data, allowing
// generic access to the content to be used by writers
class StreamData {
public:
explicit StreamData(const StreamDescriptorBuilder& descriptor)
: descriptor_{descriptor} {}

StreamData(const StreamData&) = delete;
StreamData(StreamData&&) = delete;
StreamData& operator=(const StreamData&) = delete;
StreamData& operator=(StreamData&&) = delete;

virtual std::string_view data() const = 0;
virtual std::span<const bool> nonNulls() const = 0;
virtual bool hasNulls() const = 0;
virtual bool empty() const = 0;
virtual uint64_t memoryUsed() const = 0;

virtual void reset() = 0;
virtual void materialize() {}

const StreamDescriptorBuilder& descriptor() const {
return descriptor_;
}

virtual ~StreamData() = default;

private:
const StreamDescriptorBuilder& descriptor_;
};

// Content only data stream.
// Used when a stream doesn't contain nulls.
template <typename T>
class ContentStreamData final : public StreamData {
public:
ContentStreamData(
velox::memory::MemoryPool& memoryPool,
const StreamDescriptorBuilder& descriptor)
: StreamData(descriptor), data_{&memoryPool}, extraMemory_{0} {}

inline virtual std::string_view data() const override {
return {
reinterpret_cast<const char*>(data_.data()), data_.size() * sizeof(T)};
}

inline virtual std::span<const bool> nonNulls() const override {
return {};
}

inline virtual bool hasNulls() const override {
return false;
}

inline virtual bool empty() const override {
return data_.empty();
}

inline virtual uint64_t memoryUsed() const override {
return (data_.size() * sizeof(T)) + extraMemory_;
}

inline Vector<T>& mutableData() {
return data_;
}

inline uint64_t& extraMemory() {
return extraMemory_;
}

inline virtual void reset() override {
data_.clear();
extraMemory_ = 0;
}

private:
Vector<T> data_;
uint64_t extraMemory_;
};

// Nulls only data stream.
// Used in cases where boolean data (representing nulls) is needed.
// NOTE: ContentStreamData<bool> can also be used to represent these data
// streams, however, for these "null streams", we have special optimizations,
// where if all data is non-null, we omit the stream. This class specialization
// helps with reusing enabling this optimization.
class NullsStreamData : public StreamData {
public:
NullsStreamData(
velox::memory::MemoryPool& memoryPool,
const StreamDescriptorBuilder& descriptor)
: StreamData(descriptor),
nonNulls_{&memoryPool},
hasNulls_{false},
bufferedCount_{0} {}

inline virtual std::string_view data() const override {
return {};
}

inline virtual std::span<const bool> nonNulls() const override {
return nonNulls_;
}

inline virtual bool hasNulls() const override {
return hasNulls_;
}

inline virtual bool empty() const override {
return nonNulls_.empty() && bufferedCount_ == 0;
}

inline virtual uint64_t memoryUsed() const override {
return nonNulls_.size();
}

inline Vector<bool>& mutableNonNulls() {
return nonNulls_;
}

inline virtual void reset() override {
nonNulls_.clear();
hasNulls_ = false;
bufferedCount_ = 0;
}

void materialize() override {
if (nonNulls_.size() < bufferedCount_) {
const auto offset = nonNulls_.size();
nonNulls_.resize(bufferedCount_);
std::fill(
nonNulls_.data() + offset, nonNulls_.data() + bufferedCount_, true);
}
}

void ensureNullsCapacity(bool mayHaveNulls, velox::vector_size_t size);

protected:
Vector<bool> nonNulls_;
bool hasNulls_;
uint32_t bufferedCount_;
};

// Nullable content data stream.
// Used in all cases where data may contain nulls.
template <typename T>
class NullableContentStreamData final : public NullsStreamData {
public:
NullableContentStreamData(
velox::memory::MemoryPool& memoryPool,
const StreamDescriptorBuilder& descriptor)
: NullsStreamData(memoryPool, descriptor),
data_{&memoryPool},
extraMemory_{0} {}

inline virtual std::string_view data() const override {
return {
reinterpret_cast<const char*>(data_.data()), data_.size() * sizeof(T)};
}

inline virtual bool empty() const override {
return NullsStreamData::empty() && data_.empty();
}

inline virtual uint64_t memoryUsed() const override {
return (data_.size() * sizeof(T)) + extraMemory_ +
NullsStreamData::memoryUsed();
}

inline Vector<T>& mutableData() {
return data_;
}

inline uint64_t& extraMemory() {
return extraMemory_;
}

inline virtual void reset() override {
NullsStreamData::reset();
data_.clear();
extraMemory_ = 0;
}

private:
Vector<T> data_;
uint64_t extraMemory_;
};

struct InputBufferGrowthStats {
std::atomic<uint64_t> count{0};
// realloc bytes would be interesting, but requires a bit more
Expand Down
39 changes: 39 additions & 0 deletions dwio/nimble/velox/StreamData.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) Meta Platforms, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "dwio/nimble/velox/StreamData.h"

namespace facebook::nimble {

void NullsStreamData::ensureNullsCapacity(bool mayHaveNulls, uint32_t size) {
if (mayHaveNulls || hasNulls_) {
auto newSize = bufferedCount_ + size;
nonNulls_.reserve(newSize);
if (!hasNulls_) {
hasNulls_ = true;
std::fill(nonNulls_.data(), nonNulls_.data() + bufferedCount_, true);
nonNulls_.update_size(bufferedCount_);
}
if (!mayHaveNulls) {
std::fill(
nonNulls_.data() + bufferedCount_, nonNulls_.data() + newSize, true);
nonNulls_.update_size(newSize);
}
}
bufferedCount_ += size;
}

} // namespace facebook::nimble
Loading