Skip to content

Commit

Permalink
Merge pull request #13344 from redpanda-data/stephan/handler-backport…
Browse files Browse the repository at this point in the history
…-v231

Backport per-handler metrics to v23.1
  • Loading branch information
piyushredpanda authored Sep 11, 2023
2 parents 62e221c + 0329bae commit e14de94
Show file tree
Hide file tree
Showing 29 changed files with 858 additions and 94 deletions.
3 changes: 3 additions & 0 deletions src/v/cloud_storage/partition_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
#pragma once

#include "model/fundamental.h"
#include "utils/log_hist.h"

#include <seastar/core/metrics_registration.hh>

namespace cloud_storage {

class partition_probe {
public:
using hist_t = log_hist_internal;

explicit partition_probe(const model::ntp& ntp);

void add_bytes_read(uint64_t read) { _bytes_read += read; }
Expand Down
3 changes: 3 additions & 0 deletions src/v/cloud_storage/probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "cloud_storage/types.h"
#include "model/fundamental.h"
#include "seastarx.h"
#include "utils/log_hist.h"

#include <seastar/core/metrics_registration.hh>

Expand All @@ -25,6 +26,8 @@ class materialized_segments;
/// Cloud storage endpoint level probe
class remote_probe {
public:
using hist_t = log_hist_internal;

explicit remote_probe(
remote_metrics_disabled disabled,
remote_metrics_disabled public_disabled,
Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_storage_clients/client_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "cloud_roles/apply_credentials.h"
#include "cloud_storage_clients/client.h"
#include "cloud_storage_clients/client_probe.h"
#include "utils/gate_guard.h"
#include "utils/intrusive_list_helpers.h"

Expand Down
3 changes: 3 additions & 0 deletions src/v/cloud_storage_clients/client_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "model/fundamental.h"
#include "net/types.h"
#include "ssx/metrics.h"
#include "utils/log_hist.h"

#include <seastar/core/metrics_registration.hh>

Expand All @@ -44,6 +45,8 @@ enum class op_type_tag { upload, download };
/// time-series.
class client_probe : public http::client_probe {
public:
using hist_t = log_hist_internal;

/// \brief Probe c-tor for S3 client
///
/// \param disable is used to switch the internal monitoring off
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
#include "http/client.h"
#include "seastarx.h"
#include "syschecks/syschecks.h"
#include "utils/hdr_hist.h"
#include "vlog.h"

#include <seastar/core/app-template.hh>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include "http/client.h"
#include "seastarx.h"
#include "syschecks/syschecks.h"
#include "utils/hdr_hist.h"
#include "vlog.h"

#include <seastar/core/app-template.hh>
Expand Down
1 change: 0 additions & 1 deletion src/v/http/demo/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "rpc/types.h"
#include "seastarx.h"
#include "syschecks/syschecks.h"
#include "utils/hdr_hist.h"
#include "vlog.h"

#include <seastar/core/app-template.hh>
Expand Down
1 change: 1 addition & 0 deletions src/v/kafka/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ set(handlers_srcs
server/handlers/topics/topic_utils.cc
server/handlers/describe_producers.cc
server/handlers/describe_transactions.cc
server/handlers/handler_probe.cc
server/handlers/list_transactions.cc
)

Expand Down
33 changes: 19 additions & 14 deletions src/v/kafka/latency_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,22 @@
#include "config/configuration.h"
#include "prometheus/prometheus_sanitize.h"
#include "ssx/metrics.h"
#include "utils/hdr_hist.h"
#include "utils/log_hist.h"

#include <seastar/core/metrics.hh>

namespace kafka {
class latency_probe {
public:
using hist_t = log_hist_internal;

latency_probe() = default;
latency_probe(const latency_probe&) = delete;
latency_probe& operator=(const latency_probe&) = delete;
latency_probe(latency_probe&&) = delete;
latency_probe& operator=(latency_probe&&) = delete;
~latency_probe() = default;

void setup_metrics() {
namespace sm = ss::metrics;

Expand All @@ -38,13 +47,13 @@ class latency_probe {
"fetch_latency_us",
sm::description("Fetch Latency"),
labels,
[this] { return _fetch_latency.seastar_histogram_logform(); })
[this] { return _fetch_latency.internal_histogram_logform(); })
.aggregate(aggregate_labels),
sm::make_histogram(
"produce_latency_us",
sm::description("Produce Latency"),
labels,
[this] { return _produce_latency.seastar_histogram_logform(); })
[this] { return _produce_latency.internal_histogram_logform(); })
.aggregate(aggregate_labels)});
}

Expand All @@ -61,32 +70,28 @@ class latency_probe {
"request_latency_seconds",
sm::description("Internal latency of kafka produce requests"),
{ssx::metrics::make_namespaced_label("request")("produce")},
[this] {
return ssx::metrics::report_default_histogram(
_produce_latency);
})
[this] { return _produce_latency.public_histogram_logform(); })
.aggregate({sm::shard_label}),
sm::make_histogram(
"request_latency_seconds",
sm::description("Internal latency of kafka consume requests"),
{ssx::metrics::make_namespaced_label("request")("consume")},
[this] {
return ssx::metrics::report_default_histogram(_fetch_latency);
})
[this] { return _fetch_latency.public_histogram_logform(); })
.aggregate({sm::shard_label}),
});
}

std::unique_ptr<hdr_hist::measurement> auto_produce_measurement() {
std::unique_ptr<hist_t::measurement> auto_produce_measurement() {
return _produce_latency.auto_measure();
}
std::unique_ptr<hdr_hist::measurement> auto_fetch_measurement() {

std::unique_ptr<hist_t::measurement> auto_fetch_measurement() {
return _fetch_latency.auto_measure();
}

private:
hdr_hist _produce_latency;
hdr_hist _fetch_latency;
hist_t _produce_latency;
hist_t _fetch_latency;
ss::metrics::metric_groups _metrics;
ss::metrics::metric_groups _public_metrics{
ssx::metrics::public_metrics_handle};
Expand Down
85 changes: 45 additions & 40 deletions src/v/kafka/server/connection_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ ss::future<> connection_context::process_one_request() {
_server.probe().header_corrupted();
co_return;
}
_server.handler_probe(h->key).add_bytes_received(sz.value());

try {
co_return co_await dispatch_method_once(
Expand Down Expand Up @@ -286,7 +287,8 @@ ss::future<session_resources> connection_context::throttle_request(
request_data r_data = request_data{
.request_key = hdr.key,
.client_id = ss::sstring{hdr.client_id.value_or("")}};
auto tracker = std::make_unique<request_tracker>(_server.probe());
auto& h_probe = _server.handler_probe(r_data.request_key);
auto tracker = std::make_unique<request_tracker>(_server.probe(), h_probe);
auto fut = ss::now();
if (delay.enforce > delay_t::clock::duration::zero()) {
fut = ss::sleep_abortable(delay.enforce, _server.abort_source());
Expand All @@ -300,15 +302,16 @@ ss::future<session_resources> connection_context::throttle_request(
r_data = std::move(r_data),
delay = delay.request,
track,
tracker = std::move(tracker)](ssx::semaphore_units units) mutable {
tracker = std::move(tracker),
&h_probe](ssx::semaphore_units units) mutable {
return server().get_request_unit().then(
[this,
r_data = std::move(r_data),
delay,
mem_units = std::move(units),
track,
tracker = std::move(tracker)](
ssx::semaphore_units qd_units) mutable {
tracker = std::move(tracker),
&h_probe](ssx::semaphore_units qd_units) mutable {
session_resources r{
.backpressure_delay = delay,
.memlocks = std::move(mem_units),
Expand All @@ -318,6 +321,7 @@ ss::future<session_resources> connection_context::throttle_request(
if (track) {
r.method_latency = _server.hist().auto_measure();
}
r.handler_latency = h_probe.auto_latency_measurement();
return r;
});
});
Expand Down Expand Up @@ -422,8 +426,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
seq,
correlation,
self,
sres = std::move(sres)](
ss::future<> d) mutable {
sres](ss::future<> d) mutable {
/*
* if the dispatch/first stage failed, then we need to
* consume the second stage since it might be
Expand All @@ -440,9 +443,8 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
"Discarding second stage failure {}",
e);
})
.finally([self, d = std::move(d)]() mutable {
self->_server.probe().service_error();
self->_server.probe().request_completed();
.finally([self, d = std::move(d), sres]() mutable {
sres->tracker->mark_errored();
return std::move(d);
});
}
Expand All @@ -454,7 +456,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
_server.conn_gate(),
[this,
f = std::move(f),
sres = std::move(sres),
sres,
seq,
correlation]() mutable {
return f.then([this,
Expand All @@ -469,40 +471,41 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) {
return maybe_process_responses();
});
})
.handle_exception([self](std::exception_ptr e) {
// ssx::spawn_with_gate already caught
// shutdown-like exceptions, so we should only
// be taking this path for real errors. That
// also means that on shutdown we don't bother
// to call shutdown_input on the connection, so
// rely on any future reader to check the abort
// source before considering reading the
// connection.

auto disconnected
= net::is_disconnect_exception(e);
if (disconnected) {
vlog(
klog.info,
"Disconnected {} ({})",
self->conn->addr,
disconnected.value());
} else {
vlog(
klog.warn,
"Error processing request: {}",
e);
}

self->_server.probe().service_error();
self->conn->shutdown_input();
});
.handle_exception(
[self, sres](std::exception_ptr e) {
// ssx::spawn_with_gate already caught
// shutdown-like exceptions, so we should only
// be taking this path for real errors. That
// also means that on shutdown we don't bother
// to call shutdown_input on the connection,
// so rely on any future reader to check the
// abort source before considering reading the
// connection.
auto disconnected
= net::is_disconnect_exception(e);
if (disconnected) {
vlog(
klog.info,
"Disconnected {} ({})",
self->conn->addr,
disconnected.value());
} else {
vlog(
klog.warn,
"Error processing request: {}",
e);
}

sres->tracker->mark_errored();
self->conn->shutdown_input();
});
return d;
})
.handle_exception([self](std::exception_ptr e) {
.handle_exception([self, sres](std::exception_ptr e) {
vlog(
klog.info, "Detected error dispatching request: {}", e);
self->conn->shutdown_input();
sres->tracker->mark_errored();
});
});
})
Expand Down Expand Up @@ -566,6 +569,7 @@ ss::future<> connection_context::maybe_process_responses() {
*_snc_quota_context, response_size);
}
}
_server.handler_probe(request_key).add_bytes_sent(response_size);
try {
return conn->write(std::move(msg))
.then([] {
Expand All @@ -574,8 +578,9 @@ ss::future<> connection_context::maybe_process_responses() {
})
// release the resources only once it has been written to the
// connection.
.finally([resources = std::move(resp_and_res.resources)] {});
.finally([resources = resp_and_res.resources] {});
} catch (...) {
resp_and_res.resources->tracker->mark_errored();
vlog(
klog.debug,
"Failed to process request: {}",
Expand Down
Loading

0 comments on commit e14de94

Please sign in to comment.