diff --git a/src/v/datalake/batching_parquet_writer.cc b/src/v/datalake/batching_parquet_writer.cc
index 68fa520b6f50..17925dfaaad5 100644
--- a/src/v/datalake/batching_parquet_writer.cc
+++ b/src/v/datalake/batching_parquet_writer.cc
@@ -10,6 +10,7 @@
 
 #include "datalake/batching_parquet_writer.h"
 
+#include "bytes/iostream.h"
 #include "datalake/arrow_translator.h"
 #include "datalake/data_writer_interface.h"
@@ -34,53 +35,129 @@ batching_parquet_writer::batching_parquet_writer(
   , _row_count_threshold{row_count_threshold}
   , _byte_count_threshold{byte_count_threshold} {}
 
-ss::future<>
+ss::future<data_writer_error>
 batching_parquet_writer::initialize(std::filesystem::path output_file_path) {
-    _output_file = co_await ss::open_file_dma(
-      output_file_path.string(),
-      ss::open_flags::create | ss::open_flags::truncate | ss::open_flags::wo);
+    _output_file_path = std::move(output_file_path);
+    try {
+        _output_file = co_await ss::open_file_dma(
+          _output_file_path.string(),
+          ss::open_flags::create | ss::open_flags::truncate
+            | ss::open_flags::wo);
+    } catch (...) {
+        co_return data_writer_error::file_io_error;
+    }
+    bool error = false;
+    try {
+        _output_stream = co_await ss::make_file_output_stream(_output_file);
+    } catch (...) {
+        error = true;
+    }
+    if (error) {
+        co_await _output_file.close();
+        co_return data_writer_error::file_io_error;
+    }
 
-    _output_stream = co_await ss::make_file_output_stream(_output_file);
+    _result.file_path = _output_file_path.string();
+    co_return data_writer_error::ok;
 }
 
 ss::future<data_writer_error> batching_parquet_writer::add_data_struct(
   iceberg::struct_value data, int64_t approx_size) {
-    _iceberg_to_arrow.add_data(std::move(data));
+    bool error = false;
+    try {
+        _iceberg_to_arrow.add_data(std::move(data));
+    } catch (...) {
+        error = true;
+    }
+    if (error) {
+        co_await abort();
+        co_return data_writer_error::parquet_conversion_error;
+    }
     _row_count++;
     _byte_count += approx_size;
     if (
       _row_count >= _row_count_threshold
       || _byte_count > _byte_count_threshold) {
-        co_await write_row_group();
+        co_return co_await write_row_group();
     }
     co_return data_writer_error::ok;
 }
 
-ss::future<data_writer_result> batching_parquet_writer::finish() {
-    // TODO: fill in result structure
-    return write_row_group()
-      .then([this] {
-          data_writer_result res;
-          iobuf out = _arrow_to_iobuf.close_and_take_iobuf();
-
-          return write_iobuf_to_output_stream(std::move(out), _output_stream)
-            .then([res] { return res; });
-      })
-      .finally([this] { return _output_stream.close(); });
+ss::future<result<data_writer_result, data_writer_error>>
+batching_parquet_writer::finish() {
+    auto write_result = co_await write_row_group();
+    if (write_result != data_writer_error::ok) {
+        co_await abort();
+        co_return write_result;
+    }
+    bool error = false;
+    iobuf out;
+    try {
+        out = _arrow_to_iobuf.close_and_take_iobuf();
+        _result.file_size_bytes += out.size_bytes();
+    } catch (...) {
+        error = true;
+    }
+    if (error) {
+        co_await abort();
+        co_return data_writer_error::parquet_conversion_error;
+    }
+
+    try {
+        co_await write_iobuf_to_output_stream(std::move(out), _output_stream);
+        co_await _output_stream.close();
+    } catch (...) {
+        error = true;
+    }
+    if (error) {
+        co_await abort();
+        co_return data_writer_error::file_io_error;
+    }
+
+    co_return _result;
 }
 
-ss::future<> batching_parquet_writer::write_row_group() {
+ss::future<data_writer_error> batching_parquet_writer::write_row_group() {
     if (_row_count == 0) {
         // This can happen if finish() is called when there is no new data.
-        co_return;
-    }
-    auto chunk = _iceberg_to_arrow.take_chunk();
-    _row_count = 0;
-    _byte_count = 0;
-    _arrow_to_iobuf.add_arrow_array(chunk);
-    iobuf out = _arrow_to_iobuf.take_iobuf();
-    co_await write_iobuf_to_output_stream(std::move(out), _output_stream);
+        co_return data_writer_error::ok;
+    }
+    bool error = false;
+    iobuf out;
+    try {
+        auto chunk = _iceberg_to_arrow.take_chunk();
+        _result.record_count += _row_count;
+        _row_count = 0;
+        _byte_count = 0;
+        _arrow_to_iobuf.add_arrow_array(chunk);
+        out = _arrow_to_iobuf.take_iobuf();
+        _result.file_size_bytes += out.size_bytes();
+    } catch (...) {
+        error = true;
+    }
+    if (error) {
+        co_await abort();
+        co_return data_writer_error::parquet_conversion_error;
+    }
+    try {
+        co_await write_iobuf_to_output_stream(std::move(out), _output_stream);
+    } catch (...) {
+        error = true;
+    }
+    if (error) {
+        co_await abort();
+        co_return data_writer_error::file_io_error;
+    }
+    co_return data_writer_error::ok;
+}
+
+ss::future<> batching_parquet_writer::abort() {
+    co_await _output_stream.close();
+    auto exists = co_await ss::file_exists(_output_file_path.c_str());
+    if (exists) {
+        co_await ss::remove_file(_output_file_path.c_str());
+    }
 }
 
 } // namespace datalake
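A note on the recurring "bool error = false; try { ... } catch (...) { error = true; }" shape in the functions above: C++ forbids an await-expression inside a catch handler, so each coroutine records the failure in a flag and performs its asynchronous cleanup (co_await abort()) only after the handler has exited. A minimal standalone sketch of the pattern; the names guarded_step, translate, and cleanup are placeholders, not code from this patch:

    #include <seastar/core/coroutine.hh>
    #include <seastar/core/future.hh>

    namespace ss = seastar;

    // Placeholder standing in for datalake::data_writer_error.
    enum class step_error { ok, failed };

    template<typename Fn, typename Cleanup>
    ss::future<step_error> guarded_step(Fn translate, Cleanup cleanup) {
        bool error = false;
        try {
            translate(); // synchronous work that may throw
        } catch (...) {
            // co_await is illegal inside a handler, so only set a flag here.
            error = true;
        }
        if (error) {
            // The asynchronous cleanup runs after the handler has exited.
            co_await cleanup();
            co_return step_error::failed;
        }
        co_return step_error::ok;
    }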
diff --git a/src/v/datalake/batching_parquet_writer.h b/src/v/datalake/batching_parquet_writer.h
index 22e266a153c9..3f4d7c70620b 100644
--- a/src/v/datalake/batching_parquet_writer.h
+++ b/src/v/datalake/batching_parquet_writer.h
@@ -9,6 +9,7 @@
  */
 #pragma once
 
+#include "base/outcome.h"
 #include "datalake/arrow_translator.h"
 #include "datalake/data_writer_interface.h"
 #include "datalake/parquet_writer.h"
@@ -42,15 +43,20 @@ class batching_parquet_writer : public data_writer {
       size_t row_count_threshold,
       size_t byte_count_threshold);
 
-    ss::future<> initialize(std::filesystem::path output_file_path);
+    ss::future<data_writer_error>
+    initialize(std::filesystem::path output_file_path);
 
     ss::future<data_writer_error>
     add_data_struct(iceberg::struct_value data, int64_t approx_size) override;
 
-    ss::future<data_writer_result> finish() override;
+    ss::future<result<data_writer_result, data_writer_error>>
+    finish() override;
+
+    // Close the file handle, delete any temporary data and clean up any
+    // other state.
+    ss::future<> abort();
 
 private:
-    ss::future<> write_row_group();
+    ss::future<data_writer_error> write_row_group();
 
     // translating
     arrow_translator _iceberg_to_arrow;
@@ -63,8 +69,10 @@ class batching_parquet_writer : public data_writer {
     size_t _byte_count = 0;
 
     // Output
+    std::filesystem::path _output_file_path;
    ss::file _output_file;
     ss::output_stream<char> _output_stream;
+    data_writer_result _result;
 };
 
 } // namespace datalake
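The new finish() signature leans on result<T, E> from base/outcome.h. Judging by how this patch uses it (co_return of either a data_writer_result or a data_writer_error, plus has_value()/value()/error() in the callers), it is an outcome-style result that converts implicitly from both its value and its error type. A small sketch of the shape being relied on; example() is an illustrative name, not code from the patch:

    #include "datalake/data_writer_interface.h"

    namespace datalake {

    // Both returns compile because result<T, E> is constructible from
    // either the value type or the error type, exactly as finish() and
    // write_row_group() rely on above.
    result<data_writer_result, data_writer_error> example(bool fail) {
        if (fail) {
            return data_writer_error::file_io_error; // error -> result
        }
        return data_writer_result{}; // value -> result
    }

    } // namespace datalake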
diff --git a/src/v/datalake/data_writer_interface.h b/src/v/datalake/data_writer_interface.h
index 5148b39a8cfe..09b2338e4152 100644
--- a/src/v/datalake/data_writer_interface.h
+++ b/src/v/datalake/data_writer_interface.h
@@ -7,6 +7,7 @@
  *
  * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
  */
+#include "base/outcome.h"
 #include "datalake/schemaless_translator.h"
 #include "iceberg/datatypes.h"
 #include "iceberg/values.h"
@@ -20,7 +21,9 @@ namespace datalake {
 
 enum class data_writer_error {
     ok = 0,
-    uh_oh,
+    parquet_conversion_error,
+    file_io_error,
+
 };
 
 struct data_writer_result
@@ -28,9 +31,11 @@ struct data_writer_result
   : serde::envelope<
       data_writer_result,
       serde::version<0>,
       serde::compat_version<0>> {
-    size_t row_count = 0;
+    ss::sstring file_path = "";
+    size_t record_count = 0;
+    size_t file_size_bytes = 0;
 
-    auto serde_fields() { return std::tie(row_count); }
+    auto serde_fields() { return std::tie(record_count); }
 };
 
@@ -41,7 +46,8 @@ class data_writer {
     virtual ss::future<data_writer_error> add_data_struct(
       iceberg::struct_value /* data */,
       int64_t /* approx_size */)
       = 0;
 
-    virtual ss::future<data_writer_result> finish() = 0;
+    virtual ss::future<result<data_writer_result, data_writer_error>>
+    finish() = 0;
 };
 
 class data_writer_factory {
@@ -59,8 +65,10 @@ struct data_writer_error_category : std::error_category {
         switch (static_cast<data_writer_error>(ev)) {
         case data_writer_error::ok:
             return "Ok";
-        case data_writer_error::uh_oh:
-            return "Uh oh!";
+        case data_writer_error::parquet_conversion_error:
+            return "Parquet Conversion Error";
+        case data_writer_error::file_io_error:
+            return "File IO Error";
         }
     }
diff --git a/src/v/datalake/record_multiplexer.cc b/src/v/datalake/record_multiplexer.cc
index 288be586d276..dcc7d0b6c172 100644
--- a/src/v/datalake/record_multiplexer.cc
+++ b/src/v/datalake/record_multiplexer.cc
@@ -70,9 +70,13 @@ record_multiplexer::end_of_stream() {
     // TODO: once we have multiple _writers this should be a loop
     if (_writer) {
         chunked_vector<data_writer_result> ret;
-        data_writer_result res = co_await _writer->finish();
-        ret.push_back(res);
-        co_return ret;
+        auto res = co_await _writer->finish();
+        if (res.has_value()) {
+            ret.push_back(res.value());
+            co_return ret;
+        } else {
+            co_return res.error();
+        }
     } else {
         co_return chunked_vector<data_writer_result>{};
     }
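For context on data_writer_error_category in data_writer_interface.h above: a std::error_category is conventionally paired with a make_error_code overload and an is_error_code_enum specialization so the enum converts to std::error_code. That glue is not visible in this diff, so the following is only a sketch of the usual wiring; every helper name in it is an assumption:

    #include <system_error>

    namespace datalake {

    enum class data_writer_error { ok = 0, parquet_conversion_error, file_io_error };

    // Hypothetical accessor; the real header may organize this differently.
    inline const std::error_category& data_writer_error_category_instance() {
        struct category final : std::error_category {
            const char* name() const noexcept override {
                return "data_writer_error";
            }
            std::string message(int ev) const override {
                switch (static_cast<data_writer_error>(ev)) {
                case data_writer_error::ok:
                    return "Ok";
                case data_writer_error::parquet_conversion_error:
                    return "Parquet Conversion Error";
                case data_writer_error::file_io_error:
                    return "File IO Error";
                }
                return "Unknown data_writer_error";
            }
        };
        static category instance;
        return instance;
    }

    inline std::error_code make_error_code(data_writer_error e) noexcept {
        return {static_cast<int>(e), data_writer_error_category_instance()};
    }

    } // namespace datalake

    // Enables: std::error_code ec = datalake::data_writer_error::file_io_error;
    template<>
    struct std::is_error_code_enum<datalake::data_writer_error> : std::true_type {};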
diff --git a/src/v/datalake/tests/batching_parquet_writer_test.cc b/src/v/datalake/tests/batching_parquet_writer_test.cc
index 532a5506cd52..c5da81eafe88 100644
--- a/src/v/datalake/tests/batching_parquet_writer_test.cc
+++ b/src/v/datalake/tests/batching_parquet_writer_test.cc
@@ -13,11 +13,17 @@
 #include "iceberg/values.h"
 #include "test_utils/tmp_dir.h"
 
+#include <seastar/core/seastar.hh>
+
 #include <arrow/io/file.h>
 #include <arrow/table.h>
 #include <gtest/gtest.h>
 #include <parquet/arrow/reader.h>
 
+#include <filesystem>
+
+namespace datalake {
+
 TEST(BatchingParquetWriterTest, WritesParquetFiles) {
     temporary_dir tmp_dir("batching_parquet_writer");
     std::filesystem::path file_path = tmp_dir.get_path() / "test_file.parquet";
@@ -36,7 +42,12 @@ TEST(BatchingParquetWriterTest, WritesParquetFiles) {
         writer.add_data_struct(std::move(data), 1000).get0();
     }
 
-    writer.finish().get0();
+    auto result = writer.finish().get0();
+    ASSERT_TRUE(result.has_value());
+    EXPECT_EQ(result.value().file_path, file_path);
+    EXPECT_EQ(result.value().record_count, num_rows);
+    auto true_file_size = std::filesystem::file_size(file_path);
+    EXPECT_EQ(result.value().file_size_bytes, true_file_size);
 
     // Read the file and check the contents
     auto reader = arrow::io::ReadableFile::Open(file_path).ValueUnsafe();
@@ -55,3 +66,27 @@ TEST(BatchingParquetWriterTest, WritesParquetFiles) {
     EXPECT_EQ(table->num_rows(), num_rows);
     EXPECT_EQ(table->num_columns(), 17);
 }
+
+TEST(BatchingParquetWriterTest, DeletesFileOnAbort) {
+    temporary_dir tmp_dir("batching_parquet_writer");
+    std::filesystem::path file_path = tmp_dir.get_path() / "test_file.parquet";
+    int num_rows = 1000;
+
+    datalake::batching_parquet_writer writer(
+      test_schema(iceberg::field_required::no), 500, 1000000);
+
+    writer.initialize(file_path).get0();
+
+    for (int i = 0; i < num_rows; i++) {
+        auto data = iceberg::tests::make_struct_value(
+          iceberg::tests::value_spec{
+            .forced_fixed_val = iobuf::from("Hello world")},
+          test_schema(iceberg::field_required::no));
+        writer.add_data_struct(std::move(data), 1000).get0();
+    }
+    writer.abort().get();
+    auto exists = ss::file_exists(file_path.c_str()).get();
+    EXPECT_FALSE(exists);
+}
+
+} // namespace datalake
diff --git a/src/v/datalake/tests/gtest_record_multiplexer_test.cc b/src/v/datalake/tests/gtest_record_multiplexer_test.cc
index 25f3a8bf9796..99158ab494e0 100644
--- a/src/v/datalake/tests/gtest_record_multiplexer_test.cc
+++ b/src/v/datalake/tests/gtest_record_multiplexer_test.cc
@@ -37,7 +37,7 @@ TEST(DatalakeMultiplexerTest, TestMultiplexer) {
 
     ASSERT_TRUE(result.has_value());
     ASSERT_EQ(result.value().size(), 1);
-    EXPECT_EQ(result.value()[0].row_count, record_count * batch_count);
+    EXPECT_EQ(result.value()[0].record_count, record_count * batch_count);
 }
 TEST(DatalakeMultiplexerTest, TestMultiplexerWriteError) {
     int record_count = 10;
@@ -59,5 +59,5 @@ TEST(DatalakeMultiplexerTest, TestMultiplexerWriteError) {
     });
     auto res = reader.consume(std::move(multiplexer), model::no_timeout).get0();
     ASSERT_TRUE(res.has_error());
-    EXPECT_EQ(res.error(), datalake::data_writer_error::uh_oh);
+    EXPECT_EQ(res.error(), datalake::data_writer_error::parquet_conversion_error);
 }
diff --git a/src/v/datalake/tests/test_data_writer.h b/src/v/datalake/tests/test_data_writer.h
index 0552b1ac2dd4..68d93c195592 100644
--- a/src/v/datalake/tests/test_data_writer.h
+++ b/src/v/datalake/tests/test_data_writer.h
@@ -29,14 +29,17 @@ class test_data_writer : public data_writer {
     ss::future<data_writer_error> add_data_struct(
       iceberg::struct_value /* data */, int64_t /* approx_size */) override {
-        _result.row_count++;
-        data_writer_error status = _return_error ? data_writer_error::uh_oh
-                                                 : data_writer_error::ok;
+        _result.record_count++;
+        data_writer_error status
+          = _return_error ? data_writer_error::parquet_conversion_error
+                          : data_writer_error::ok;
         return ss::make_ready_future<data_writer_error>(status);
     }
 
-    ss::future<data_writer_result> finish() override {
-        return ss::make_ready_future<data_writer_result>(_result);
+    ss::future<result<data_writer_result, data_writer_error>>
+    finish() override {
+        return ss::make_ready_future<
+          result<data_writer_result, data_writer_error>>(_result);
     }
 
 private:
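Taken together, the changes give the writer an explicit lifecycle: initialize(), then add_data_struct() per record, ending in either finish() or abort(). A hedged end-to-end sketch of a caller; the schema and row parameter types are inferred from the tests above, and the path, sizes, and write_one_file name are illustrative, not part of the patch:

    #include "datalake/batching_parquet_writer.h"

    #include <seastar/core/coroutine.hh>

    namespace datalake {

    ss::future<> write_one_file(
      iceberg::struct_type schema, iceberg::struct_value row) {
        batching_parquet_writer writer(
          std::move(schema),
          /*row_count_threshold=*/500,
          /*byte_count_threshold=*/1000000);

        auto init = co_await writer.initialize("/tmp/example.parquet");
        if (init != data_writer_error::ok) {
            co_return; // initialize() failed; nothing was left open
        }
        auto err = co_await writer.add_data_struct(
          std::move(row), /*approx_size=*/64);
        if (err != data_writer_error::ok) {
            co_return; // add_data_struct() already aborted the writer
        }
        auto res = co_await writer.finish();
        if (res.has_value()) {
            // res.value().file_path, .record_count and .file_size_bytes
            // describe the file that was just written.
        }
    }

    } // namespace datalake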