Skip to content

Commit

Permalink
Merge pull request #13762 from abhijat/backport-pr-12888-v23.2.x-312
Browse files Browse the repository at this point in the history
Backport PRs 12888 and 12479
  • Loading branch information
piyushredpanda authored Sep 28, 2023
2 parents 00cdef8 + 8a9ab0c commit e899ab6
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 23 deletions.
17 changes: 11 additions & 6 deletions src/v/cloud_storage/remote_segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1341,14 +1341,19 @@ remote_segment_batch_reader::read_some(
if (
_bytes_consumed != 0 && _bytes_consumed == new_bytes_consumed.value()
&& !_config.over_budget) {
vlog(
_ctxlog.error,
"segment_reader is stuck, segment ntp: {}, _cur_rp_offset: {}, "
"_bytes_consumed: "
"{}",
const auto msg = fmt::format(
"segment_reader is stuck, segment ntp: {}, _cur_rp_offset: "
"{}, "
"_bytes_consumed: {}, parser error state: {}",
_seg->get_ntp(),
_cur_rp_offset,
_bytes_consumed);
_bytes_consumed,
_parser->error());
if (_parser->error() == storage::parser_errc::end_of_stream) {
vlog(_ctxlog.info, "{}", msg);
} else {
vlog(_ctxlog.error, "{}", msg);
}
_is_unexpected_eof = true;
co_return ss::circular_buffer<model::record_batch>{};
}
Expand Down
2 changes: 2 additions & 0 deletions src/v/storage/parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ class continuous_batch_parser {
/// \brief cleans up async resources like the input stream
ss::future<> close() { return _input.close(); }

parser_errc error() const { return _err; }

private:
/// \brief consumes _one_ full batch.
ss::future<result<batch_consumer::stop_parser>> consume_one();
Expand Down
42 changes: 26 additions & 16 deletions src/v/storage/parser_errc.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,31 @@ enum class parser_errc {
fallocated_file_read_zero_bytes_for_header,
not_enough_bytes_in_parser_for_one_record,
};

inline std::string to_string(parser_errc err) {
switch (err) {
case parser_errc::none:
return "storage::parser_errc::success";
case parser_errc::end_of_stream:
return "parser_errc::end_of_stream";
case parser_errc::header_only_crc_missmatch:
return "parser_errc::header_only_crc_missmatch";
case parser_errc::input_stream_not_enough_bytes:
return "parser_errc::input_stream_not_enough_bytes";
case parser_errc::fallocated_file_read_zero_bytes_for_header:
return "parser_errc::fallocated_file_read_zero_bytes_for_header";
case parser_errc::not_enough_bytes_in_parser_for_one_record:
return "parser_errc::not_enough_bytes_in_parser_for_one_record";
default:
return "storage::parser_errc::unknown";
}
}

struct parser_errc_category final : public std::error_category {
const char* name() const noexcept final { return "storage::parser_errc"; }

std::string message(int c) const final {
switch (static_cast<parser_errc>(c)) {
case parser_errc::none:
return "storage::parser_errc::success";
case parser_errc::end_of_stream:
return "parser_errc::end_of_stream";
case parser_errc::header_only_crc_missmatch:
return "parser_errc::header_only_crc_missmatch";
case parser_errc::input_stream_not_enough_bytes:
return "parser_errc::input_stream_not_enough_bytes";
case parser_errc::fallocated_file_read_zero_bytes_for_header:
return "parser_errc::fallocated_file_read_zero_bytes_for_header";
case parser_errc::not_enough_bytes_in_parser_for_one_record:
return "parser_errc::not_enough_bytes_in_parser_for_one_record";
default:
return "storage::parser_errc::unknown";
}
return to_string(static_cast<parser_errc>(c));
}
};
inline const std::error_category& error_category() noexcept {
Expand All @@ -52,6 +57,11 @@ inline const std::error_category& error_category() noexcept {
inline std::error_code make_error_code(parser_errc e) noexcept {
return std::error_code(static_cast<int>(e), error_category());
}

inline std::ostream& operator<<(std::ostream& os, parser_errc err) {
return os << to_string(err);
}

} // namespace storage
namespace std {
template<>
Expand Down
2 changes: 1 addition & 1 deletion tests/rptest/tests/e2e_shadow_indexing_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,7 @@ def consume(self):

consumer.free()

@cluster(num_nodes=4)
@cluster(num_nodes=4, log_allow_list=[r"cluster.*Can't add segment"])
@matrix(cloud_storage_type=get_cloud_storage_type())
def test_spillover(self, cloud_storage_type):

Expand Down

0 comments on commit e899ab6

Please sign in to comment.