diff --git a/dwio/nimble/common/tests/TestUtils.h b/dwio/nimble/common/tests/TestUtils.h index cc09f77..9cd14ec 100644 --- a/dwio/nimble/common/tests/TestUtils.h +++ b/dwio/nimble/common/tests/TestUtils.h @@ -23,6 +23,7 @@ #include "dwio/nimble/common/Types.h" #include "dwio/nimble/common/Vector.h" #include "folly/Random.h" +#include "folly/Synchronized.h" #include "velox/common/file/File.h" #include "velox/common/memory/Memory.h" @@ -291,12 +292,12 @@ class InMemoryTrackableReadFile final : public velox::ReadFile { std::string_view pread(uint64_t offset, uint64_t length, void* buf) const final { - chunks_.push_back({offset, length}); + chunks_.wlock()->push_back({offset, length}); return file_.pread(offset, length, buf); } std::string pread(uint64_t offset, uint64_t length) const final { - chunks_.push_back({offset, length}); + chunks_.wlock()->push_back({offset, length}); return file_.pread(offset, length); } @@ -314,7 +315,7 @@ class InMemoryTrackableReadFile final : public velox::ReadFile { const auto& region = regions[i]; auto& output = iobufs[i]; if (shouldProduceChainedBuffers_) { - chunks_.push_back({region.offset, region.length}); + chunks_.wlock()->push_back({region.offset, region.length}); uint64_t splitPoint = region.length / 2; output = folly::IOBuf(folly::IOBuf::CREATE, splitPoint); file_.pread(region.offset, splitPoint, output.writableData()); @@ -350,12 +351,12 @@ class InMemoryTrackableReadFile final : public velox::ReadFile { return file_.shouldCoalesce(); } - const std::vector& chunks() { - return chunks_; + std::vector chunks() { + return *chunks_.rlock(); } void resetChunks() { - chunks_.clear(); + chunks_.wlock()->clear(); } std::string getName() const override { @@ -369,7 +370,7 @@ class InMemoryTrackableReadFile final : public velox::ReadFile { private: velox::InMemoryReadFile file_; bool shouldProduceChainedBuffers_; - mutable std::vector chunks_; + mutable folly::Synchronized> chunks_; }; } // namespace facebook::nimble::testing diff --git a/dwio/nimble/tablet/TabletReader.cpp b/dwio/nimble/tablet/TabletReader.cpp index 33197c5..d1ab40a 100644 --- a/dwio/nimble/tablet/TabletReader.cpp +++ b/dwio/nimble/tablet/TabletReader.cpp @@ -261,8 +261,9 @@ TabletReader::TabletReader( /* stripeIndex */ 0, std::make_unique(memoryPool, stripeGroup)); initStripes(); + auto optionalSectionsCacheLock = optionalSectionsCache_.wlock(); for (auto& pair : optionalSections) { - optionalSectionsCache_.insert( + optionalSectionsCacheLock->insert( {pair.first, std::make_unique(memoryPool, pair.second)}); } @@ -407,7 +408,7 @@ TabletReader::TabletReader( sectionOffset - offset, sectionSize, sectionCompressionType); - optionalSectionsCache_.insert({preload, std::move(metadata)}); + optionalSectionsCache_.wlock()->insert({preload, std::move(metadata)}); } } if (!mustRead.empty()) { @@ -421,7 +422,7 @@ TabletReader::TabletReader( const std::string preload{mustRead[i].label}; auto metadata = std::make_unique( memoryPool_, iobuf, std::get<2>(optionalSections_[preload])); - optionalSectionsCache_.insert({preload, std::move(metadata)}); + optionalSectionsCache_.wlock()->insert({preload, std::move(metadata)}); } } } @@ -609,14 +610,17 @@ std::optional
TabletReader::loadOptionalSection( const std::string& name, bool keepCache) const { NIMBLE_CHECK(!name.empty(), "Optional section name cannot be empty."); - auto itCache = optionalSectionsCache_.find(name); - if (itCache != optionalSectionsCache_.end()) { - if (keepCache) { - return Section{MetadataBuffer{*itCache->second}}; - } else { - auto metadata = std::move(itCache->second); - optionalSectionsCache_.erase(itCache); - return Section{std::move(*metadata)}; + { + auto optionalSectionsCache = optionalSectionsCache_.wlock(); + auto itCache = optionalSectionsCache->find(name); + if (itCache != optionalSectionsCache->end()) { + if (keepCache) { + return Section{MetadataBuffer{*itCache->second}}; + } else { + auto metadata = std::move(itCache->second); + optionalSectionsCache->erase(itCache); + return Section{std::move(*metadata)}; + } } } diff --git a/dwio/nimble/tablet/TabletReader.h b/dwio/nimble/tablet/TabletReader.h index 5cdb876..8e36544 100644 --- a/dwio/nimble/tablet/TabletReader.h +++ b/dwio/nimble/tablet/TabletReader.h @@ -19,6 +19,7 @@ #include "dwio/nimble/common/Checksum.h" #include "dwio/nimble/common/Vector.h" #include "folly/Range.h" +#include "folly/Synchronized.h" #include "folly/io/IOBuf.h" #include "velox/common/file/File.h" #include "velox/common/memory/Memory.h" @@ -289,7 +290,8 @@ class TabletReader { std::string, std::tuple> optionalSections_; - mutable std::unordered_map> + mutable folly::Synchronized< + std::unordered_map>> optionalSectionsCache_; friend class TabletHelper; diff --git a/dwio/nimble/tablet/tests/CMakeLists.txt b/dwio/nimble/tablet/tests/CMakeLists.txt index ed91280..aaeb69e 100644 --- a/dwio/nimble/tablet/tests/CMakeLists.txt +++ b/dwio/nimble/tablet/tests/CMakeLists.txt @@ -20,6 +20,7 @@ target_link_libraries( nimble_tablet_reader nimble_tablet_writer nimble_common + velox_dwio_common velox_memory velox_file gtest diff --git a/dwio/nimble/tablet/tests/TabletTests.cpp b/dwio/nimble/tablet/tests/TabletTests.cpp index 966a531..108b412 100644 --- a/dwio/nimble/tablet/tests/TabletTests.cpp +++ b/dwio/nimble/tablet/tests/TabletTests.cpp @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include #include #include #include @@ -28,9 +27,11 @@ #include "dwio/nimble/tablet/TabletWriter.h" #include "folly/FileUtil.h" #include "folly/Random.h" +#include "folly/executors/CPUThreadPoolExecutor.h" #include "folly/experimental/coro/Generator.h" #include "velox/common/file/File.h" #include "velox/common/memory/Memory.h" +#include "velox/dwio/common/ExecutorBarrier.h" using namespace ::facebook; @@ -513,14 +514,14 @@ TEST(TabletTests, OptionalSections) { const std::string& content = random; tabletWriter.writeOptionalSection("section1", content); } + std::string zeroes; { - std::string content; - content.resize(randomSize); - for (auto i = 0; i < content.size(); ++i) { - content[i] = '\0'; + zeroes.resize(randomSize); + for (auto i = 0; i < zeroes.size(); ++i) { + zeroes[i] = '\0'; } - tabletWriter.writeOptionalSection("section2", content); + tabletWriter.writeOptionalSection("section2", zeroes); } { std::string content; @@ -529,30 +530,44 @@ TEST(TabletTests, OptionalSections) { tabletWriter.close(); + folly::CPUThreadPoolExecutor executor{5}; + facebook::velox::dwio::common::ExecutorBarrier barrier{executor}; + for (auto useChaniedBuffers : {false, true}) { nimble::testing::InMemoryTrackableReadFile readFile( file, useChaniedBuffers); nimble::TabletReader tablet{*pool, &readFile}; - auto section = tablet.loadOptionalSection("section1"); - ASSERT_TRUE(section.has_value()); - ASSERT_EQ(random, section->content()); + auto check1 = [&]() { + auto section = tablet.loadOptionalSection("section1"); + ASSERT_TRUE(section.has_value()); + ASSERT_EQ(random, section->content()); + }; - std::string expectedContent; - expectedContent.resize(randomSize); - for (auto i = 0; i < expectedContent.size(); ++i) { - expectedContent[i] = '\0'; - } - section = tablet.loadOptionalSection("section2"); - ASSERT_TRUE(section.has_value()); - ASSERT_EQ(expectedContent, section->content()); + auto check2 = [&]() { + auto section = tablet.loadOptionalSection("section2"); + ASSERT_TRUE(section.has_value()); + ASSERT_EQ(zeroes, section->content()); + }; - section = tablet.loadOptionalSection("section3"); - ASSERT_TRUE(section.has_value()); - ASSERT_EQ(std::string(), section->content()); + auto check3 = [&, empty = std::string()]() { + auto section = tablet.loadOptionalSection("section3"); + ASSERT_TRUE(section.has_value()); + ASSERT_EQ(empty, section->content()); + }; - section = tablet.loadOptionalSection("section4"); - ASSERT_FALSE(section.has_value()); + auto check4 = [&]() { + auto section = tablet.loadOptionalSection("section4"); + ASSERT_FALSE(section.has_value()); + }; + + for (int i = 0; i < 10; ++i) { + barrier.add(check1); + barrier.add(check2); + barrier.add(check3); + barrier.add(check4); + } + barrier.waitAll(); } }