From c339bce5bff74c1ec305e718d7473aaa971535ea Mon Sep 17 00:00:00 2001 From: Denis Rystsov Date: Fri, 14 Jul 2023 08:40:58 -0700 Subject: [PATCH] txns: fix list txns failure when it's used before find_coordinator find_coordinator creates txn coordinator's topic when it's used for the first time. list txns api expects the topic to be already created so it used to fail when the api was invoked before find_coordinator. Fixed the problem by creating the topic on the first list txns call. fixes https://github.com/redpanda-data/redpanda/issues/11947 (cherry picked from commit 85ad993098860d37cd80dc6d9400aff134b2ae16) --- src/v/cluster/tx_registry_frontend.cc | 46 +++++++++++++++++-- src/v/cluster/tx_registry_frontend.h | 2 + src/v/kafka/server/server.cc | 11 +++++ .../tests/transaction_kafka_api_test.py | 8 +++- 4 files changed, 61 insertions(+), 6 deletions(-) diff --git a/src/v/cluster/tx_registry_frontend.cc b/src/v/cluster/tx_registry_frontend.cc index 7454646db042..1ccc70651445 100644 --- a/src/v/cluster/tx_registry_frontend.cc +++ b/src/v/cluster/tx_registry_frontend.cc @@ -61,14 +61,49 @@ tx_registry_frontend::tx_registry_frontend( config::shard_local_cfg().metadata_dissemination_retry_delay_ms.value()) { } +ss::future tx_registry_frontend::ensure_tx_topic_exists() { + if (_metadata_cache.local().contains(model::tx_manager_nt)) { + co_return true; + } + + if (!co_await try_create_tx_topic()) { + vlog(clusterlog.warn, "failed to create {}", model::tx_manager_nt); + co_return false; + } + + auto retries = _metadata_dissemination_retries; + auto delay_ms = _metadata_dissemination_retry_delay_ms; + std::optional error; + while (!_as.abort_requested() && 0 < retries--) { + auto is_cache_filled = _metadata_cache.local().contains( + model::tx_manager_nt); + if (unlikely(!is_cache_filled)) { + error = vformat( + fmt::runtime("can't find {} in the metadata_cache cache"), + model::tx_manager_nt); + vlog( + clusterlog.trace, + "waiting for {} to fill metadata_cache cache, retries left: {}", + model::tx_manager_nt, + retries); + co_await sleep_abortable(delay_ms, _as); + continue; + } + co_return true; + } + + if (error) { + vlog(clusterlog.warn, "{}", error.value()); + } + + co_return false; +} + ss::future tx_registry_frontend::find_coordinator( kafka::transactional_id tid, model::timeout_clock::duration timeout) { if (!_metadata_cache.local().contains(model::tx_manager_nt)) { if (!co_await try_create_tx_topic()) { - vlog( - clusterlog.warn, - "can't find {} in the metadata cache", - model::tx_manager_nt); + vlog(clusterlog.warn, "failed to create {}", model::tx_manager_nt); co_return find_coordinator_reply( std::nullopt, std::nullopt, errc::topic_not_exists); } @@ -316,7 +351,8 @@ ss::future tx_registry_frontend::try_create_tx_topic() { return _controller->get_topics_frontend() .local() .autocreate_topics( - {std::move(topic)}, config::shard_local_cfg().create_topic_timeout_ms()) + {std::move(topic)}, + config::shard_local_cfg().create_topic_timeout_ms() * partitions_amount) .then([](std::vector res) { vassert(res.size() == 1, "expected exactly one result"); if (res[0].ec == cluster::errc::topic_already_exists) { diff --git a/src/v/cluster/tx_registry_frontend.h b/src/v/cluster/tx_registry_frontend.h index 23305934f122..68423c3790ff 100644 --- a/src/v/cluster/tx_registry_frontend.h +++ b/src/v/cluster/tx_registry_frontend.h @@ -35,6 +35,8 @@ class tx_registry_frontend { ss::sharded&, ss::sharded&); + ss::future ensure_tx_topic_exists(); + ss::future find_coordinator(kafka::transactional_id, model::timeout_clock::duration); diff --git a/src/v/kafka/server/server.cc b/src/v/kafka/server/server.cc index 021efbce936a..53e334897afc 100644 --- a/src/v/kafka/server/server.cc +++ b/src/v/kafka/server/server.cc @@ -11,6 +11,7 @@ #include "cluster/id_allocator_frontend.h" #include "cluster/topics_frontend.h" +#include "cluster/tx_registry_frontend.h" #include "config/broker_authn_endpoint.h" #include "config/configuration.h" #include "config/node_config.h" @@ -1566,6 +1567,16 @@ list_transactions_handler::handle(request_context ctx, ss::smp_service_group) { return true; }; + auto& tx_registry = ctx.tx_registry_frontend(); + if (!co_await tx_registry.ensure_tx_topic_exists()) { + vlog( + klog.error, + "Can not return list of transactions. Failed to create {}", + model::tx_manager_nt); + response.data.error_code = kafka::error_code::unknown_server_error; + co_return co_await ctx.respond(std::move(response)); + } + auto& tx_frontend = ctx.tx_gateway_frontend(); auto txs = co_await tx_frontend.get_all_transactions(); if (txs.has_value()) { diff --git a/tests/rptest/tests/transaction_kafka_api_test.py b/tests/rptest/tests/transaction_kafka_api_test.py index 7f64fe4b3153..6c1b06066678 100644 --- a/tests/rptest/tests/transaction_kafka_api_test.py +++ b/tests/rptest/tests/transaction_kafka_api_test.py @@ -30,7 +30,8 @@ def __init__(self, test_context): "tx_timeout_delay_ms": 10000000, "abort_timed_out_transactions_interval_ms": 10000000, - 'enable_leader_balancer': False + "enable_leader_balancer": False, + "transaction_coordinator_partitions": 4 }) self.kafka_cli = KafkaCliTools(self.redpanda, "3.0.0") @@ -136,6 +137,11 @@ def test_describe_transactions(self): tpoic_partition = f"{topic}-{partition}" assert tpoic_partition in expected_partitions + @cluster(num_nodes=3) + def test_empty_list_transactions(self): + txs_info = self.kafka_cli.list_transactions() + assert len(txs_info) == 0 + @cluster(num_nodes=3) def test_list_transactions(self): producer1 = ck.Producer({