Skip to content

Commit

Permalink
Merge branch 'branch-24.12' into pandas_upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
galipremsagar authored Oct 4, 2024
2 parents 49aec35 + 39342b8 commit 03749a9
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 70 deletions.
22 changes: 18 additions & 4 deletions cpp/include/cudf/io/datasource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,28 @@ class datasource {
/**
* @brief Creates a source from a file path.
*
* @note Parameters `offset`, `max_size_estimate` and `min_size_estimate` are hints to the
* `datasource` implementation about the expected range of the data that will be read. The
* implementation may use these hints to optimize the read operation. These parameters are usually
* based on the byte range option. In this case, `min_size_estimate` should be no greater than the
* byte range to avoid potential issues when reading adjacent ranges. `max_size_estimate` can
* include padding after the byte range, to include additional data that may be needed for
* processing.
*
@throws cudf::logic_error if the minimum size estimate is greater than the maximum size estimate
*
* @param[in] filepath Path to the file to use
* @param[in] offset Bytes from the start of the file (the default is zero)
* @param[in] size Bytes from the offset; use zero for entire file (the default is zero)
* @param[in] offset Starting byte offset from which data will be read (the default is zero)
* @param[in] max_size_estimate Upper estimate of the data range that will be read (the default is
* zero, which means the whole file after `offset`)
* @param[in] min_size_estimate Lower estimate of the data range that will be read (the default is
* zero, which means the whole file after `offset`)
* @return Constructed datasource object
*/
static std::unique_ptr<datasource> create(std::string const& filepath,
size_t offset = 0,
size_t size = 0);
size_t offset = 0,
size_t max_size_estimate = 0,
size_t min_size_estimate = 0);

/**
* @brief Creates a source from a host memory buffer.
Expand Down
14 changes: 9 additions & 5 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,16 @@ chunked_parquet_writer_options_builder chunked_parquet_writer_options::builder(
namespace {

std::vector<std::unique_ptr<cudf::io::datasource>> make_datasources(source_info const& info,
size_t range_offset = 0,
size_t range_size = 0)
size_t offset = 0,
size_t max_size_estimate = 0,
size_t min_size_estimate = 0)
{
switch (info.type()) {
case io_type::FILEPATH: {
auto sources = std::vector<std::unique_ptr<cudf::io::datasource>>();
for (auto const& filepath : info.filepaths()) {
sources.emplace_back(cudf::io::datasource::create(filepath, range_offset, range_size));
sources.emplace_back(
cudf::io::datasource::create(filepath, offset, max_size_estimate, min_size_estimate));
}
return sources;
}
Expand Down Expand Up @@ -211,7 +213,8 @@ table_with_metadata read_json(json_reader_options options,

auto datasources = make_datasources(options.get_source(),
options.get_byte_range_offset(),
options.get_byte_range_size_with_padding());
options.get_byte_range_size_with_padding(),
options.get_byte_range_size());

return json::detail::read_json(datasources, options, stream, mr);
}
Expand All @@ -238,7 +241,8 @@ table_with_metadata read_csv(csv_reader_options options,

auto datasources = make_datasources(options.get_source(),
options.get_byte_range_offset(),
options.get_byte_range_size_with_padding());
options.get_byte_range_size_with_padding(),
options.get_byte_range_size());

CUDF_EXPECTS(datasources.size() == 1, "Only a single source is currently supported.");

Expand Down
157 changes: 96 additions & 61 deletions cpp/src/io/utilities/datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <unistd.h>

#include <unordered_map>
#include <vector>

namespace cudf {
namespace io {
Expand All @@ -54,6 +55,30 @@ class file_source : public datasource {
}
}

std::unique_ptr<buffer> host_read(size_t offset, size_t size) override
{
lseek(_file.desc(), offset, SEEK_SET);

// Clamp length to available data
ssize_t const read_size = std::min(size, _file.size() - offset);

std::vector<uint8_t> v(read_size);
CUDF_EXPECTS(read(_file.desc(), v.data(), read_size) == read_size, "read failed");
return buffer::create(std::move(v));
}

size_t host_read(size_t offset, size_t size, uint8_t* dst) override
{
lseek(_file.desc(), offset, SEEK_SET);

// Clamp length to available data
auto const read_size = std::min(size, _file.size() - offset);

CUDF_EXPECTS(read(_file.desc(), dst, read_size) == static_cast<ssize_t>(read_size),
"read failed");
return read_size;
}

~file_source() override = default;

[[nodiscard]] bool supports_device_read() const override
Expand Down Expand Up @@ -138,40 +163,63 @@ class file_source : public datasource {
*/
class memory_mapped_source : public file_source {
public:
explicit memory_mapped_source(char const* filepath, size_t offset, size_t size)
explicit memory_mapped_source(char const* filepath,
size_t offset,
size_t max_size_estimate,
size_t min_size_estimate)
: file_source(filepath)
{
if (_file.size() != 0) {
map(_file.desc(), offset, size);
register_mmap_buffer();
// Memory mapping is not exclusive, so we can include the whole region we expect to read
map(_file.desc(), offset, max_size_estimate);
// Buffer registration is exclusive (can't overlap with other registered buffers) so we
// register the lower estimate; this avoids issues when reading adjacent ranges from the same
// file from multiple threads
register_mmap_buffer(offset, min_size_estimate);
}
}

~memory_mapped_source() override
{
if (_map_addr != nullptr) {
munmap(_map_addr, _map_size);
unmap();
unregister_mmap_buffer();
}
}

std::unique_ptr<buffer> host_read(size_t offset, size_t size) override
{
CUDF_EXPECTS(offset >= _map_offset, "Requested offset is outside mapping");
// Clamp length to available data
auto const read_size = std::min(size, +_file.size() - offset);

// If the requested range is outside of the mapped region, read from the file
if (offset < _map_offset or offset + read_size > (_map_offset + _map_size)) {
return file_source::host_read(offset, read_size);
}

// Clamp length to available data in the mapped region
auto const read_size = std::min(size, _map_size - (offset - _map_offset));
// If the requested range is only partially within the registered region, copy to a new
// host buffer to make the data safe to copy to the device
if (_reg_addr != nullptr and
(offset < _reg_offset or offset + read_size > (_reg_offset + _reg_size))) {
auto const src = static_cast<uint8_t*>(_map_addr) + (offset - _map_offset);

return std::make_unique<owning_buffer<std::vector<uint8_t>>>(
std::vector<uint8_t>(src, src + read_size));
}

return std::make_unique<non_owning_buffer>(
static_cast<uint8_t*>(_map_addr) + (offset - _map_offset), read_size);
static_cast<uint8_t*>(_map_addr) + offset - _map_offset, read_size);
}

size_t host_read(size_t offset, size_t size, uint8_t* dst) override
{
CUDF_EXPECTS(offset >= _map_offset, "Requested offset is outside mapping");
// Clamp length to available data
auto const read_size = std::min(size, +_file.size() - offset);

// Clamp length to available data in the mapped region
auto const read_size = std::min(size, _map_size - (offset - _map_offset));
// If the requested range is outside of the mapped region, read from the file
if (offset < _map_offset or offset + read_size > (_map_offset + _map_size)) {
return file_source::host_read(offset, read_size, dst);
}

auto const src = static_cast<uint8_t*>(_map_addr) + (offset - _map_offset);
std::memcpy(dst, src, read_size);
Expand All @@ -184,16 +232,18 @@ class memory_mapped_source : public file_source {
*
* Fixes nvbugs/4215160
*/
void register_mmap_buffer()
void register_mmap_buffer(size_t offset, size_t size)
{
if (_map_addr == nullptr or _map_size == 0 or not pageableMemoryAccessUsesHostPageTables()) {
return;
}
if (_map_addr == nullptr or not pageableMemoryAccessUsesHostPageTables()) { return; }

auto const result = cudaHostRegister(_map_addr, _map_size, cudaHostRegisterDefault);
if (result == cudaSuccess) {
_is_map_registered = true;
} else {
// Registered region must be within the mapped region
_reg_offset = std::max(offset, _map_offset);
_reg_size = std::min(size != 0 ? size : _map_size, (_map_offset + _map_size) - _reg_offset);

_reg_addr = static_cast<std::byte*>(_map_addr) - _map_offset + _reg_offset;
auto const result = cudaHostRegister(_reg_addr, _reg_size, cudaHostRegisterReadOnly);
if (result != cudaSuccess) {
_reg_addr = nullptr;
CUDF_LOG_WARN("cudaHostRegister failed with {} ({})",
static_cast<int>(result),
cudaGetErrorString(result));
Expand All @@ -205,10 +255,12 @@ class memory_mapped_source : public file_source {
*/
void unregister_mmap_buffer()
{
if (not _is_map_registered) { return; }
if (_reg_addr == nullptr) { return; }

auto const result = cudaHostUnregister(_map_addr);
if (result != cudaSuccess) {
auto const result = cudaHostUnregister(_reg_addr);
if (result == cudaSuccess) {
_reg_addr = nullptr;
} else {
CUDF_LOG_WARN("cudaHostUnregister failed with {} ({})",
static_cast<int>(result),
cudaGetErrorString(result));
Expand All @@ -226,52 +278,30 @@ class memory_mapped_source : public file_source {

// Size for `mmap()` needs to include the page padding
_map_size = size + (offset - _map_offset);
if (_map_size == 0) { return; }

// Check if accessing a region within already mapped area
_map_addr = mmap(nullptr, _map_size, PROT_READ, MAP_PRIVATE, fd, _map_offset);
CUDF_EXPECTS(_map_addr != MAP_FAILED, "Cannot create memory mapping");
}

private:
size_t _map_size = 0;
size_t _map_offset = 0;
void* _map_addr = nullptr;
bool _is_map_registered = false;
};

/**
* @brief Implementation class for reading from a file using `read` calls
*
* Potentially faster than `memory_mapped_source` when only a small portion of the file is read
* through the host.
*/
class direct_read_source : public file_source {
public:
explicit direct_read_source(char const* filepath) : file_source(filepath) {}

std::unique_ptr<buffer> host_read(size_t offset, size_t size) override
void unmap()
{
lseek(_file.desc(), offset, SEEK_SET);

// Clamp length to available data
ssize_t const read_size = std::min(size, _file.size() - offset);

std::vector<uint8_t> v(read_size);
CUDF_EXPECTS(read(_file.desc(), v.data(), read_size) == read_size, "read failed");
return buffer::create(std::move(v));
if (_map_addr != nullptr) {
auto const result = munmap(_map_addr, _map_size);
if (result != 0) { CUDF_LOG_WARN("munmap failed with {}", result); }
_map_addr = nullptr;
}
}

size_t host_read(size_t offset, size_t size, uint8_t* dst) override
{
lseek(_file.desc(), offset, SEEK_SET);

// Clamp length to available data
auto const read_size = std::min(size, _file.size() - offset);
private:
size_t _map_offset = 0;
size_t _map_size = 0;
void* _map_addr = nullptr;

CUDF_EXPECTS(read(_file.desc(), dst, read_size) == static_cast<ssize_t>(read_size),
"read failed");
return read_size;
}
size_t _reg_offset = 0;
size_t _reg_size = 0;
void* _reg_addr = nullptr;
};

/**
Expand Down Expand Up @@ -431,16 +461,21 @@ class user_datasource_wrapper : public datasource {

std::unique_ptr<datasource> datasource::create(std::string const& filepath,
size_t offset,
size_t size)
size_t max_size_estimate,
size_t min_size_estimate)
{
CUDF_EXPECTS(max_size_estimate == 0 or min_size_estimate <= max_size_estimate,
"Invalid min/max size estimates for datasource creation");

#ifdef CUFILE_FOUND
if (cufile_integration::is_always_enabled()) {
// avoid mmap as GDS is expected to be used for most reads
return std::make_unique<direct_read_source>(filepath.c_str());
return std::make_unique<file_source>(filepath.c_str());
}
#endif
// Use our own memory mapping implementation for direct file reads
return std::make_unique<memory_mapped_source>(filepath.c_str(), offset, size);
return std::make_unique<memory_mapped_source>(
filepath.c_str(), offset, max_size_estimate, min_size_estimate);
}

std::unique_ptr<datasource> datasource::create(host_buffer const& buffer)
Expand Down
35 changes: 35 additions & 0 deletions cpp/tests/io/csv_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2516,4 +2516,39 @@ TEST_F(CsvReaderTest, UTF8BOM)
CUDF_TEST_EXPECT_TABLES_EQUIVALENT(result_view, expected);
}

void expect_buffers_equal(cudf::io::datasource::buffer* lhs, cudf::io::datasource::buffer* rhs)
{
ASSERT_EQ(lhs->size(), rhs->size());
EXPECT_EQ(0, std::memcmp(lhs->data(), rhs->data(), lhs->size()));
}

TEST_F(CsvReaderTest, OutOfMapBoundsReads)
{
// write a lot of data into a file
auto filepath = temp_env->get_temp_dir() + "OutOfMapBoundsReads.csv";
auto const num_rows = 1 << 20;
auto const row = std::string{"0,1,2,3,4,5,6,7,8,9\n"};
auto const file_size = num_rows * row.size();
{
std::ofstream outfile(filepath, std::ofstream::out);
for (size_t i = 0; i < num_rows; ++i) {
outfile << row;
}
}

// Only memory map the middle of the file
auto source = cudf::io::datasource::create(filepath, file_size / 2, file_size / 4);
auto full_source = cudf::io::datasource::create(filepath);
auto const all_data = source->host_read(0, file_size);
auto ref_data = full_source->host_read(0, file_size);
expect_buffers_equal(ref_data.get(), all_data.get());

auto const start_data = source->host_read(file_size / 2, file_size / 2);
expect_buffers_equal(full_source->host_read(file_size / 2, file_size / 2).get(),
start_data.get());

auto const end_data = source->host_read(0, file_size / 2 + 512);
expect_buffers_equal(full_source->host_read(0, file_size / 2 + 512).get(), end_data.get());
}

CUDF_TEST_PROGRAM_MAIN()

0 comments on commit 03749a9

Please sign in to comment.