datalake: batching_parquet_writer returns valid data_writer_result
This commit adds the file size and file path to the data writer result. It
also modifies batching_parquet_writer to return a valid data_writer_result
and updates the tests to check this.
jcipar committed Oct 4, 2024
1 parent d0995d4 commit 03b954d
Showing 7 changed files with 182 additions and 47 deletions.
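
Taken together, the changes below give the writer a fallible, result-returning interface. A rough caller-side sketch of the flow after this commit — the schema type, the literal thresholds, and the make_row() helper are illustrative placeholders, not code from the commit:

// Hypothetical caller sketch; make_row() and the thresholds are stand-ins.
ss::future<result<datalake::data_writer_result, datalake::data_writer_error>>
write_rows(iceberg::struct_type schema, std::filesystem::path path) {
    datalake::batching_parquet_writer writer(
      std::move(schema), /*row_count_threshold=*/500,
      /*byte_count_threshold=*/1000000);
    if (auto err = co_await writer.initialize(path);
        err != datalake::data_writer_error::ok) {
        co_return err;
    }
    for (int i = 0; i < 1000; i++) {
        if (auto err = co_await writer.add_data_struct(make_row(), 1000);
            err != datalake::data_writer_error::ok) {
            co_return err; // the writer has already aborted and cleaned up
        }
    }
    // On success the result carries file_path, record_count and
    // file_size_bytes.
    co_return co_await writer.finish();
}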
131 changes: 104 additions & 27 deletions src/v/datalake/batching_parquet_writer.cc
@@ -10,6 +10,7 @@

#include "datalake/batching_parquet_writer.h"

+#include "bytes/iostream.h"
#include "datalake/arrow_translator.h"
#include "datalake/data_writer_interface.h"
@@ -34,53 +35,129 @@ batching_parquet_writer::batching_parquet_writer(
  , _row_count_threshold{row_count_threshold}
  , _byte_count_threshold{byte_count_threshold} {}

-ss::future<>
+ss::future<data_writer_error>
batching_parquet_writer::initialize(std::filesystem::path output_file_path) {
-    _output_file = co_await ss::open_file_dma(
-      output_file_path.string(),
-      ss::open_flags::create | ss::open_flags::truncate | ss::open_flags::wo);
+    _output_file_path = std::move(output_file_path);
+    try {
+        _output_file = co_await ss::open_file_dma(
+          _output_file_path.string(),
+          ss::open_flags::create | ss::open_flags::truncate
+            | ss::open_flags::wo);
+    } catch (...) {
+        co_return data_writer_error::file_io_error;
+    }
+    bool error = false;
+    try {
+        _output_stream = co_await ss::make_file_output_stream(_output_file);
+    } catch (...) {
+        error = true;
+    }
+    if (error) {
+        co_await _output_file.close();
+        co_return data_writer_error::file_io_error;
+    }

-    _output_stream = co_await ss::make_file_output_stream(_output_file);
+    _result.file_path = _output_file_path.string();
+    co_return data_writer_error::ok;
}

ss::future<data_writer_error> batching_parquet_writer::add_data_struct(
  iceberg::struct_value data, int64_t approx_size) {
-    _iceberg_to_arrow.add_data(std::move(data));
+    bool error = false;
+    try {
+        _iceberg_to_arrow.add_data(std::move(data));
+    } catch (...) {
+        error = true;
+    }
+    if (error) {
+        co_await abort();
+        co_return data_writer_error::parquet_conversion_error;
+    }
    _row_count++;
    _byte_count += approx_size;

    if (
      _row_count >= _row_count_threshold
      || _byte_count > _byte_count_threshold) {
-        co_await write_row_group();
+        co_return co_await write_row_group();
    }
    co_return data_writer_error::ok;
}

-ss::future<data_writer_result> batching_parquet_writer::finish() {
-    // TODO: fill in result structure
-    return write_row_group()
-      .then([this] {
-          data_writer_result res;
-          iobuf out = _arrow_to_iobuf.close_and_take_iobuf();
-
-          return write_iobuf_to_output_stream(std::move(out), _output_stream)
-            .then([res] { return res; });
-      })
-      .finally([this] { return _output_stream.close(); });
+ss::future<result<data_writer_result, data_writer_error>>
+batching_parquet_writer::finish() {
+    auto write_result = co_await write_row_group();
+    if (write_result != data_writer_error::ok) {
+        co_await abort();
+        co_return write_result;
+    }
+    bool error = false;
+    iobuf out;
+    try {
+        out = _arrow_to_iobuf.close_and_take_iobuf();
+        _result.file_size_bytes += out.size_bytes();
+    } catch (...) {
+        error = true;
+    }
+    if (error) {
+        co_await abort();
+        co_return data_writer_error::parquet_conversion_error;
+    }
+
+    try {
+        co_await write_iobuf_to_output_stream(std::move(out), _output_stream);
+        co_await _output_stream.close();
+    } catch (...) {
+        error = true;
+    }
+    if (error) {
+        co_await abort();
+        co_return data_writer_error::file_io_error;
+    }
+
+    co_return _result;
}

-ss::future<> batching_parquet_writer::write_row_group() {
+ss::future<data_writer_error> batching_parquet_writer::write_row_group() {
    if (_row_count == 0) {
        // This can happen if finish() is called when there is no new data.
-        co_return;
-    }
-    auto chunk = _iceberg_to_arrow.take_chunk();
-    _row_count = 0;
-    _byte_count = 0;
-    _arrow_to_iobuf.add_arrow_array(chunk);
-    iobuf out = _arrow_to_iobuf.take_iobuf();
-    co_await write_iobuf_to_output_stream(std::move(out), _output_stream);
+        co_return data_writer_error::ok;
+    }
+    bool error = false;
+    iobuf out;
+    try {
+        auto chunk = _iceberg_to_arrow.take_chunk();
+        _result.record_count += _row_count;
+        _row_count = 0;
+        _byte_count = 0;
+        _arrow_to_iobuf.add_arrow_array(chunk);
+        out = _arrow_to_iobuf.take_iobuf();
+        _result.file_size_bytes += out.size_bytes();
+    } catch (...) {
+        error = true;
+    }
+    if (error) {
+        co_await abort();
+        co_return data_writer_error::parquet_conversion_error;
+    }
+    try {
+        co_await write_iobuf_to_output_stream(std::move(out), _output_stream);
+    } catch (...) {
+        error = true;
+    }
+    if (error) {
+        co_await abort();
+        co_return data_writer_error::file_io_error;
+    }
+    co_return data_writer_error::ok;
}

+ss::future<> batching_parquet_writer::abort() {
+    co_await _output_stream.close();
+    auto exists = co_await ss::file_exists(_output_file_path.c_str());
+    if (exists) {
+        co_await ss::remove_file(_output_file_path.c_str());
+    }
+}

} // namespace datalake
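
A note on the recurring bool error = false; try { ... } catch (...) { error = true; } shape above: C++ forbids co_await inside a catch handler, so a coroutine that needs to await cleanup work (here, abort()) has to record the failure in a flag and act on it after the handler exits. A minimal sketch of the pattern — do_work() and cleanup() are hypothetical stand-ins, not part of this commit:

ss::future<data_writer_error> run() {
    bool error = false;
    try {
        co_await do_work(); // may throw
    } catch (...) {
        // co_await is not allowed inside a handler, so only record the failure
        error = true;
    }
    if (error) {
        co_await cleanup(); // awaiting is legal again outside the handler
        co_return data_writer_error::file_io_error;
    }
    co_return data_writer_error::ok;
}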
14 changes: 11 additions & 3 deletions src/v/datalake/batching_parquet_writer.h
@@ -9,6 +9,7 @@
 */

#pragma once
+#include "base/outcome.h"
#include "datalake/arrow_translator.h"
#include "datalake/data_writer_interface.h"
#include "datalake/parquet_writer.h"
@@ -42,15 +43,20 @@ class batching_parquet_writer : public data_writer {
      size_t row_count_threshold,
      size_t byte_count_threshold);

-    ss::future<> initialize(std::filesystem::path output_file_path);
+    ss::future<data_writer_error>
+    initialize(std::filesystem::path output_file_path);

    ss::future<data_writer_error>
    add_data_struct(iceberg::struct_value data, int64_t approx_size) override;

-    ss::future<data_writer_result> finish() override;
+    ss::future<result<data_writer_result, data_writer_error>> finish() override;
+
+    // Close the file handle, delete any temporary data and clean up any other
+    // state.
+    ss::future<> abort();

private:
-    ss::future<> write_row_group();
+    ss::future<data_writer_error> write_row_group();

    // translating
    arrow_translator _iceberg_to_arrow;
@@ -63,8 +69,10 @@ class batching_parquet_writer : public data_writer {
    size_t _byte_count = 0;

    // Output
+    std::filesystem::path _output_file_path;
    ss::file _output_file;
    ss::output_stream<char> _output_stream;
+    data_writer_result _result;
};

} // namespace datalake
20 changes: 14 additions & 6 deletions src/v/datalake/data_writer_interface.h
@@ -7,6 +7,7 @@
 *
 * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
 */
+#include "base/outcome.h"
#include "datalake/schemaless_translator.h"
#include "iceberg/datatypes.h"
#include "iceberg/values.h"

@@ -20,17 +21,21 @@
namespace datalake {
enum class data_writer_error {
    ok = 0,
-    uh_oh,
+    parquet_conversion_error,
+    file_io_error,
+
};

struct data_writer_result
  : serde::envelope<
      data_writer_result,
      serde::version<0>,
      serde::compat_version<0>> {
-    size_t row_count = 0;
+    ss::sstring file_path = "";
+    size_t record_count = 0;
+    size_t file_size_bytes = 0;

-    auto serde_fields() { return std::tie(row_count); }
+    auto serde_fields() { return std::tie(record_count); }
};

class data_writer {
@@ -41,7 +46,8 @@ class data_writer {
      iceberg::struct_value /* data */, int64_t /* approx_size */)
      = 0;

-    virtual ss::future<data_writer_result> finish() = 0;
+    virtual ss::future<result<data_writer_result, data_writer_error>>
+    finish() = 0;
};

class data_writer_factory {
@@ -59,8 +65,10 @@ struct data_writer_error_category : std::error_category {
        switch (static_cast<data_writer_error>(ev)) {
        case data_writer_error::ok:
            return "Ok";
-        case data_writer_error::uh_oh:
-            return "Uh oh!";
+        case data_writer_error::parquet_conversion_error:
+            return "Parquet Conversion Error";
+        case data_writer_error::file_io_error:
+            return "File IO Error";
        }
    }

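
For data_writer_error to travel through result<> and std::error_code, a category like the one above is typically paired with a make_error_code overload and an is_error_code_enum specialization. The glue below is a sketch of that standard convention, not necessarily what the rest of this header (truncated in the diff) declares:

#include <system_error>

namespace datalake {
inline const std::error_category& data_writer_error_category_instance() {
    // Assumes data_writer_error_category also overrides name(), which the
    // truncated diff does not show.
    static data_writer_error_category instance;
    return instance;
}
inline std::error_code make_error_code(data_writer_error e) noexcept {
    return {static_cast<int>(e), data_writer_error_category_instance()};
}
} // namespace datalake

namespace std {
// Lets data_writer_error convert implicitly to std::error_code.
template<>
struct is_error_code_enum<datalake::data_writer_error> : true_type {};
} // namespace std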
10 changes: 7 additions & 3 deletions src/v/datalake/record_multiplexer.cc
@@ -70,9 +70,13 @@ record_multiplexer::end_of_stream() {
    // TODO: once we have multiple _writers this should be a loop
    if (_writer) {
        chunked_vector<data_writer_result> ret;
-        data_writer_result res = co_await _writer->finish();
-        ret.push_back(res);
-        co_return ret;
+        auto res = co_await _writer->finish();
+        if (res.has_value()) {
+            ret.push_back(res.value());
+            co_return ret;
+        } else {
+            co_return res.error();
+        }
    } else {
        co_return chunked_vector<data_writer_result>{};
    }
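
For both co_return ret; and co_return res.error(); to compile, end_of_stream() must wrap the vector in a result. The declaration, which this commit does not show, is presumably along these lines:

// Inferred signature; the actual declaration lives in record_multiplexer.h,
// which is not part of this diff.
ss::future<result<chunked_vector<data_writer_result>, data_writer_error>>
end_of_stream();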
37 changes: 36 additions & 1 deletion src/v/datalake/tests/batching_parquet_writer_test.cc
@@ -13,11 +13,17 @@
#include "iceberg/values.h"
#include "test_utils/tmp_dir.h"

+#include <seastar/core/seastar.hh>
+
#include <arrow/io/file.h>
#include <arrow/table.h>
#include <gtest/gtest.h>
#include <parquet/arrow/reader.h>

+#include <filesystem>
+
+namespace datalake {
+
TEST(BatchingParquetWriterTest, WritesParquetFiles) {
    temporary_dir tmp_dir("batching_parquet_writer");
    std::filesystem::path file_path = tmp_dir.get_path() / "test_file.parquet";
@@ -36,7 +42,12 @@ TEST(BatchingParquetWriterTest, WritesParquetFiles) {
        writer.add_data_struct(std::move(data), 1000).get0();
    }

-    writer.finish().get0();
+    auto result = writer.finish().get0();
+    ASSERT_TRUE(result.has_value());
+    EXPECT_EQ(result.value().file_path, file_path);
+    EXPECT_EQ(result.value().record_count, num_rows);
+    auto true_file_size = std::filesystem::file_size(file_path);
+    EXPECT_EQ(result.value().file_size_bytes, true_file_size);

    // Read the file and check the contents
    auto reader = arrow::io::ReadableFile::Open(file_path).ValueUnsafe();
@@ -55,3 +66,27 @@ TEST(BatchingParquetWriterTest, WritesParquetFiles) {
    EXPECT_EQ(table->num_rows(), num_rows);
    EXPECT_EQ(table->num_columns(), 17);
}

+TEST(BatchingParquetWriterTest, DeletesFileOnAbort) {
+    temporary_dir tmp_dir("batching_parquet_writer");
+    std::filesystem::path file_path = tmp_dir.get_path() / "test_file.parquet";
+    int num_rows = 1000;
+
+    datalake::batching_parquet_writer writer(
+      test_schema(iceberg::field_required::no), 500, 1000000);
+
+    writer.initialize(file_path).get0();
+
+    for (int i = 0; i < num_rows; i++) {
+        auto data = iceberg::tests::make_struct_value(
+          iceberg::tests::value_spec{
+            .forced_fixed_val = iobuf::from("Hello world")},
+          test_schema(iceberg::field_required::no));
+        writer.add_data_struct(std::move(data), 1000).get0();
+    }
+    writer.abort().get();
+    auto exists = ss::file_exists(file_path.c_str()).get();
+    EXPECT_FALSE(exists);
+}
+
+} // namespace datalake
4 changes: 2 additions & 2 deletions src/v/datalake/tests/gtest_record_multiplexer_test.cc
@@ -37,7 +37,7 @@ TEST(DatalakeMultiplexerTest, TestMultiplexer) {

    ASSERT_TRUE(result.has_value());
    ASSERT_EQ(result.value().size(), 1);
-    EXPECT_EQ(result.value()[0].row_count, record_count * batch_count);
+    EXPECT_EQ(result.value()[0].record_count, record_count * batch_count);
}
TEST(DatalakeMultiplexerTest, TestMultiplexerWriteError) {
int record_count = 10;
@@ -59,5 +59,5 @@ TEST(DatalakeMultiplexerTest, TestMultiplexerWriteError) {
    });
    auto res = reader.consume(std::move(multiplexer), model::no_timeout).get0();
    ASSERT_TRUE(res.has_error());
-    EXPECT_EQ(res.error(), datalake::data_writer_error::uh_oh);
+    EXPECT_EQ(res.error(), datalake::data_writer_error::parquet_conversion_error);
}
13 changes: 8 additions & 5 deletions src/v/datalake/tests/test_data_writer.h
@@ -29,14 +29,17 @@ class test_data_writer : public data_writer {

    ss::future<data_writer_error> add_data_struct(
      iceberg::struct_value /* data */, int64_t /* approx_size */) override {
-        _result.row_count++;
-        data_writer_error status = _return_error ? data_writer_error::uh_oh
-                                                 : data_writer_error::ok;
+        _result.record_count++;
+        data_writer_error status
+          = _return_error ? data_writer_error::parquet_conversion_error
+                          : data_writer_error::ok;
        return ss::make_ready_future<data_writer_error>(status);
    }

-    ss::future<data_writer_result> finish() override {
-        return ss::make_ready_future<data_writer_result>(_result);
+    ss::future<result<data_writer_result, data_writer_error>>
+    finish() override {
+        return ss::make_ready_future<
+          result<data_writer_result, data_writer_error>>(_result);
}

private:
