Skip to content

Commit

Permalink
Syncrhonize access to optionalSectionsCache_
Browse files Browse the repository at this point in the history
Summary: To make the class thread safe.

Reviewed By: helfman

Differential Revision: D56763840
  • Loading branch information
Daniel Munoz authored and facebook-github-bot committed May 1, 2024
1 parent 3acdb4b commit 6fcecee
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 41 deletions.
15 changes: 8 additions & 7 deletions dwio/nimble/common/tests/TestUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "dwio/nimble/common/Types.h"
#include "dwio/nimble/common/Vector.h"
#include "folly/Random.h"
#include "folly/Synchronized.h"
#include "velox/common/file/File.h"
#include "velox/common/memory/Memory.h"

Expand Down Expand Up @@ -291,12 +292,12 @@ class InMemoryTrackableReadFile final : public velox::ReadFile {

std::string_view pread(uint64_t offset, uint64_t length, void* buf)
const final {
chunks_.push_back({offset, length});
chunks_.wlock()->push_back({offset, length});
return file_.pread(offset, length, buf);
}

std::string pread(uint64_t offset, uint64_t length) const final {
chunks_.push_back({offset, length});
chunks_.wlock()->push_back({offset, length});
return file_.pread(offset, length);
}

Expand All @@ -314,7 +315,7 @@ class InMemoryTrackableReadFile final : public velox::ReadFile {
const auto& region = regions[i];
auto& output = iobufs[i];
if (shouldProduceChainedBuffers_) {
chunks_.push_back({region.offset, region.length});
chunks_.wlock()->push_back({region.offset, region.length});
uint64_t splitPoint = region.length / 2;
output = folly::IOBuf(folly::IOBuf::CREATE, splitPoint);
file_.pread(region.offset, splitPoint, output.writableData());
Expand Down Expand Up @@ -350,12 +351,12 @@ class InMemoryTrackableReadFile final : public velox::ReadFile {
return file_.shouldCoalesce();
}

const std::vector<Chunk>& chunks() {
return chunks_;
std::vector<Chunk> chunks() {
return *chunks_.rlock();
}

void resetChunks() {
chunks_.clear();
chunks_.wlock()->clear();
}

std::string getName() const override {
Expand All @@ -369,7 +370,7 @@ class InMemoryTrackableReadFile final : public velox::ReadFile {
private:
velox::InMemoryReadFile file_;
bool shouldProduceChainedBuffers_;
mutable std::vector<Chunk> chunks_;
mutable folly::Synchronized<std::vector<Chunk>> chunks_;
};

} // namespace facebook::nimble::testing
26 changes: 15 additions & 11 deletions dwio/nimble/tablet/TabletReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,9 @@ TabletReader::TabletReader(
/* stripeIndex */ 0,
std::make_unique<MetadataBuffer>(memoryPool, stripeGroup));
initStripes();
auto optionalSectionsCacheLock = optionalSectionsCache_.wlock();
for (auto& pair : optionalSections) {
optionalSectionsCache_.insert(
optionalSectionsCacheLock->insert(
{pair.first,
std::make_unique<MetadataBuffer>(memoryPool, pair.second)});
}
Expand Down Expand Up @@ -407,7 +408,7 @@ TabletReader::TabletReader(
sectionOffset - offset,
sectionSize,
sectionCompressionType);
optionalSectionsCache_.insert({preload, std::move(metadata)});
optionalSectionsCache_.wlock()->insert({preload, std::move(metadata)});
}
}
if (!mustRead.empty()) {
Expand All @@ -421,7 +422,7 @@ TabletReader::TabletReader(
const std::string preload{mustRead[i].label};
auto metadata = std::make_unique<MetadataBuffer>(
memoryPool_, iobuf, std::get<2>(optionalSections_[preload]));
optionalSectionsCache_.insert({preload, std::move(metadata)});
optionalSectionsCache_.wlock()->insert({preload, std::move(metadata)});
}
}
}
Expand Down Expand Up @@ -609,14 +610,17 @@ std::optional<Section> TabletReader::loadOptionalSection(
const std::string& name,
bool keepCache) const {
NIMBLE_CHECK(!name.empty(), "Optional section name cannot be empty.");
auto itCache = optionalSectionsCache_.find(name);
if (itCache != optionalSectionsCache_.end()) {
if (keepCache) {
return Section{MetadataBuffer{*itCache->second}};
} else {
auto metadata = std::move(itCache->second);
optionalSectionsCache_.erase(itCache);
return Section{std::move(*metadata)};
{
auto optionalSectionsCache = optionalSectionsCache_.wlock();
auto itCache = optionalSectionsCache->find(name);
if (itCache != optionalSectionsCache->end()) {
if (keepCache) {
return Section{MetadataBuffer{*itCache->second}};
} else {
auto metadata = std::move(itCache->second);
optionalSectionsCache->erase(itCache);
return Section{std::move(*metadata)};
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion dwio/nimble/tablet/TabletReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "dwio/nimble/common/Checksum.h"
#include "dwio/nimble/common/Vector.h"
#include "folly/Range.h"
#include "folly/Synchronized.h"
#include "folly/io/IOBuf.h"
#include "velox/common/file/File.h"
#include "velox/common/memory/Memory.h"
Expand Down Expand Up @@ -289,7 +290,8 @@ class TabletReader {
std::string,
std::tuple<uint64_t, uint32_t, CompressionType>>
optionalSections_;
mutable std::unordered_map<std::string, std::unique_ptr<MetadataBuffer>>
mutable folly::Synchronized<
std::unordered_map<std::string, std::unique_ptr<MetadataBuffer>>>
optionalSectionsCache_;

friend class TabletHelper;
Expand Down
59 changes: 37 additions & 22 deletions dwio/nimble/tablet/tests/TabletTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <gtest/gtest.h>
#include <algorithm>
#include <iterator>
Expand All @@ -28,9 +27,11 @@
#include "dwio/nimble/tablet/TabletWriter.h"
#include "folly/FileUtil.h"
#include "folly/Random.h"
#include "folly/executors/CPUThreadPoolExecutor.h"
#include "folly/experimental/coro/Generator.h"
#include "velox/common/file/File.h"
#include "velox/common/memory/Memory.h"
#include "velox/dwio/common/ExecutorBarrier.h"

using namespace ::facebook;

Expand Down Expand Up @@ -513,14 +514,14 @@ TEST(TabletTests, OptionalSections) {
const std::string& content = random;
tabletWriter.writeOptionalSection("section1", content);
}
std::string zeroes;
{
std::string content;
content.resize(randomSize);
for (auto i = 0; i < content.size(); ++i) {
content[i] = '\0';
zeroes.resize(randomSize);
for (auto i = 0; i < zeroes.size(); ++i) {
zeroes[i] = '\0';
}

tabletWriter.writeOptionalSection("section2", content);
tabletWriter.writeOptionalSection("section2", zeroes);
}
{
std::string content;
Expand All @@ -529,30 +530,44 @@ TEST(TabletTests, OptionalSections) {

tabletWriter.close();

folly::CPUThreadPoolExecutor executor{5};
facebook::velox::dwio::common::ExecutorBarrier barrier{executor};

for (auto useChaniedBuffers : {false, true}) {
nimble::testing::InMemoryTrackableReadFile readFile(
file, useChaniedBuffers);
nimble::TabletReader tablet{*pool, &readFile};

auto section = tablet.loadOptionalSection("section1");
ASSERT_TRUE(section.has_value());
ASSERT_EQ(random, section->content());
auto check1 = [&]() {
auto section = tablet.loadOptionalSection("section1");
ASSERT_TRUE(section.has_value());
ASSERT_EQ(random, section->content());
};

std::string expectedContent;
expectedContent.resize(randomSize);
for (auto i = 0; i < expectedContent.size(); ++i) {
expectedContent[i] = '\0';
}
section = tablet.loadOptionalSection("section2");
ASSERT_TRUE(section.has_value());
ASSERT_EQ(expectedContent, section->content());
auto check2 = [&]() {
auto section = tablet.loadOptionalSection("section2");
ASSERT_TRUE(section.has_value());
ASSERT_EQ(zeroes, section->content());
};

section = tablet.loadOptionalSection("section3");
ASSERT_TRUE(section.has_value());
ASSERT_EQ(std::string(), section->content());
auto check3 = [&, empty = std::string()]() {
auto section = tablet.loadOptionalSection("section3");
ASSERT_TRUE(section.has_value());
ASSERT_EQ(empty, section->content());
};

section = tablet.loadOptionalSection("section4");
ASSERT_FALSE(section.has_value());
auto check4 = [&]() {
auto section = tablet.loadOptionalSection("section4");
ASSERT_FALSE(section.has_value());
};

for (int i = 0; i < 10; ++i) {
barrier.add(check1);
barrier.add(check2);
barrier.add(check3);
barrier.add(check4);
}
barrier.waitAll();
}
}

Expand Down

0 comments on commit 6fcecee

Please sign in to comment.