Skip to content

Commit

Permalink
Merge pull request #23358 from andrwng/archival-compaction-retention
Browse files Browse the repository at this point in the history
archival: skip spillover retention if not collectable
  • Loading branch information
lf-rep authored Sep 18, 2024
2 parents 75eaed5 + 26df815 commit 76ea911
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 0 deletions.
64 changes: 64 additions & 0 deletions src/v/cloud_storage/tests/cloud_storage_e2e_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,70 @@ using tests::kv_t;

static ss::logger e2e_test_log("e2e_test");

class ManualFixture
: public s3_imposter_fixture
, public manual_metadata_upload_mixin
, public redpanda_thread_fixture
, public enable_cloud_storage_fixture
, public ::testing::Test {
public:
ManualFixture()
: redpanda_thread_fixture(
redpanda_thread_fixture::init_cloud_storage_tag{},
httpd_port_number()) {
// No expectations: tests will PUT and GET organically.
set_expectations_and_listen({});
wait_for_controller_leadership().get();
}

scoped_config test_local_cfg;
};

TEST_F(ManualFixture, TestSpilloverRetentionCompactedTopic) {
test_local_cfg.get("cloud_storage_disable_upload_loop_for_tests")
.set_value(true);
test_local_cfg.get("cloud_storage_spillover_manifest_max_segments")
.set_value(std::make_optional<size_t>(5));
test_local_cfg.get("cloud_storage_spillover_manifest_size")
.set_value(std::optional<size_t>{});
test_local_cfg.get("log_retention_ms")
.set_value(std::make_optional<std::chrono::milliseconds>(1ms));
const model::topic topic_name("tapioca");
model::ntp ntp(model::kafka_namespace, topic_name, 0);

cluster::topic_properties props;
props.shadow_indexing = model::shadow_indexing_mode::full;
props.cleanup_policy_bitflags = model::cleanup_policy_bitflags::compaction;
add_topic({model::kafka_namespace, topic_name}, 1, props).get();
wait_for_leader(ntp).get();

const auto records_per_seg = 5;
const auto num_segs = 100;
auto partition = app.partition_manager.local().get(ntp);
auto& archiver = partition->archiver().value().get();
tests::remote_segment_generator gen(make_kafka_client().get(), *partition);
auto total_records = gen.num_segments(num_segs)
.batches_per_segment(records_per_seg)
.produce()
.get();
ASSERT_GE(total_records, 500);
ASSERT_TRUE(archiver.sync_for_tests().get());
archiver.apply_spillover().get();
ss::sleep(5s).get();
archiver.apply_archive_retention().get();

tests::kafka_list_offsets_transport lister(make_kafka_client().get());
lister.start().get();

auto offset
= lister.start_offset_for_partition(topic_name, model::partition_id(0))
.get();
ASSERT_EQ(offset(), 0);
ASSERT_EQ(
archiver.manifest().full_log_start_offset().value_or(model::offset{})(),
0);
}

class EndToEndFixture
: public s3_imposter_fixture
, public manual_metadata_upload_mixin
Expand Down
4 changes: 4 additions & 0 deletions src/v/cluster/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2291,6 +2291,10 @@ ss::future<> ntp_archiver::apply_archive_retention() {
}

const auto& ntp_conf = _parent.get_ntp_config();
if (!ntp_conf.is_collectable()) {
vlog(_rtclog.trace, "NTP is not collectable");
co_return;
}
std::optional<size_t> retention_bytes = ntp_conf.retention_bytes();
std::optional<std::chrono::milliseconds> retention_ms
= ntp_conf.retention_duration();
Expand Down

0 comments on commit 76ea911

Please sign in to comment.