Skip to content

Commit

Permalink
treewide: replace hdr_hist with log_hist
Browse files Browse the repository at this point in the history
This reduces the memory required to store a histogram from kilobytes to
208 bytes. It also speeds up recording to the histogram by 2x.

(cherry picked from commit c2959f5)
  • Loading branch information
ballard26 authored and StephanDollberg committed Sep 8, 2023
1 parent 9f3abd2 commit 0329bae
Show file tree
Hide file tree
Showing 22 changed files with 73 additions and 51 deletions.
3 changes: 3 additions & 0 deletions src/v/cloud_storage/partition_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
#pragma once

#include "model/fundamental.h"
#include "utils/log_hist.h"

#include <seastar/core/metrics_registration.hh>

namespace cloud_storage {

class partition_probe {
public:
using hist_t = log_hist_internal;

explicit partition_probe(const model::ntp& ntp);

void add_bytes_read(uint64_t read) { _bytes_read += read; }
Expand Down
3 changes: 3 additions & 0 deletions src/v/cloud_storage/probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "cloud_storage/types.h"
#include "model/fundamental.h"
#include "seastarx.h"
#include "utils/log_hist.h"

#include <seastar/core/metrics_registration.hh>

Expand All @@ -25,6 +26,8 @@ class materialized_segments;
/// Cloud storage endpoint level probe
class remote_probe {
public:
using hist_t = log_hist_internal;

explicit remote_probe(
remote_metrics_disabled disabled,
remote_metrics_disabled public_disabled,
Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_storage_clients/client_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "cloud_roles/apply_credentials.h"
#include "cloud_storage_clients/client.h"
#include "cloud_storage_clients/client_probe.h"
#include "utils/gate_guard.h"
#include "utils/intrusive_list_helpers.h"

Expand Down
3 changes: 3 additions & 0 deletions src/v/cloud_storage_clients/client_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "model/fundamental.h"
#include "net/types.h"
#include "ssx/metrics.h"
#include "utils/log_hist.h"

#include <seastar/core/metrics_registration.hh>

Expand All @@ -44,6 +45,8 @@ enum class op_type_tag { upload, download };
/// time-series.
class client_probe : public http::client_probe {
public:
using hist_t = log_hist_internal;

/// \brief Probe c-tor for S3 client
///
/// \param disable is used to switch the internal monitoring off
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
#include "http/client.h"
#include "seastarx.h"
#include "syschecks/syschecks.h"
#include "utils/hdr_hist.h"
#include "vlog.h"

#include <seastar/core/app-template.hh>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include "http/client.h"
#include "seastarx.h"
#include "syschecks/syschecks.h"
#include "utils/hdr_hist.h"
#include "vlog.h"

#include <seastar/core/app-template.hh>
Expand Down
1 change: 0 additions & 1 deletion src/v/http/demo/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "rpc/types.h"
#include "seastarx.h"
#include "syschecks/syschecks.h"
#include "utils/hdr_hist.h"
#include "vlog.h"

#include <seastar/core/app-template.hh>
Expand Down
33 changes: 19 additions & 14 deletions src/v/kafka/latency_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,22 @@
#include "config/configuration.h"
#include "prometheus/prometheus_sanitize.h"
#include "ssx/metrics.h"
#include "utils/hdr_hist.h"
#include "utils/log_hist.h"

#include <seastar/core/metrics.hh>

namespace kafka {
class latency_probe {
public:
using hist_t = log_hist_internal;

latency_probe() = default;
latency_probe(const latency_probe&) = delete;
latency_probe& operator=(const latency_probe&) = delete;
latency_probe(latency_probe&&) = delete;
latency_probe& operator=(latency_probe&&) = delete;
~latency_probe() = default;

void setup_metrics() {
namespace sm = ss::metrics;

Expand All @@ -38,13 +47,13 @@ class latency_probe {
"fetch_latency_us",
sm::description("Fetch Latency"),
labels,
[this] { return _fetch_latency.seastar_histogram_logform(); })
[this] { return _fetch_latency.internal_histogram_logform(); })
.aggregate(aggregate_labels),
sm::make_histogram(
"produce_latency_us",
sm::description("Produce Latency"),
labels,
[this] { return _produce_latency.seastar_histogram_logform(); })
[this] { return _produce_latency.internal_histogram_logform(); })
.aggregate(aggregate_labels)});
}

Expand All @@ -61,32 +70,28 @@ class latency_probe {
"request_latency_seconds",
sm::description("Internal latency of kafka produce requests"),
{ssx::metrics::make_namespaced_label("request")("produce")},
[this] {
return ssx::metrics::report_default_histogram(
_produce_latency);
})
[this] { return _produce_latency.public_histogram_logform(); })
.aggregate({sm::shard_label}),
sm::make_histogram(
"request_latency_seconds",
sm::description("Internal latency of kafka consume requests"),
{ssx::metrics::make_namespaced_label("request")("consume")},
[this] {
return ssx::metrics::report_default_histogram(_fetch_latency);
})
[this] { return _fetch_latency.public_histogram_logform(); })
.aggregate({sm::shard_label}),
});
}

std::unique_ptr<hdr_hist::measurement> auto_produce_measurement() {
std::unique_ptr<hist_t::measurement> auto_produce_measurement() {
return _produce_latency.auto_measure();
}
std::unique_ptr<hdr_hist::measurement> auto_fetch_measurement() {

std::unique_ptr<hist_t::measurement> auto_fetch_measurement() {
return _fetch_latency.auto_measure();
}

private:
hdr_hist _produce_latency;
hdr_hist _fetch_latency;
hist_t _produce_latency;
hist_t _fetch_latency;
ss::metrics::metric_groups _metrics;
ss::metrics::metric_groups _public_metrics{
ssx::metrics::public_metrics_handle};
Expand Down
3 changes: 1 addition & 2 deletions src/v/kafka/server/connection_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "security/mtls.h"
#include "security/sasl_authentication.h"
#include "ssx/semaphore.h"
#include "utils/hdr_hist.h"
#include "utils/log_hist.h"
#include "utils/named_type.h"

Expand Down Expand Up @@ -102,7 +101,7 @@ struct session_resources {
ss::lowres_clock::duration backpressure_delay;
ssx::semaphore_units memlocks;
ssx::semaphore_units queue_units;
std::unique_ptr<hdr_hist::measurement> method_latency;
std::unique_ptr<server::hist_t::measurement> method_latency;
std::unique_ptr<handler_probe::hist_t::measurement> handler_latency;
std::unique_ptr<request_tracker> tracker;
request_data request_data;
Expand Down
4 changes: 2 additions & 2 deletions src/v/kafka/server/handlers/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ static void fill_fetch_responses(
op_context& octx,
std::vector<read_result> results,
std::vector<op_context::response_placeholder_ptr> responses,
std::vector<std::unique_ptr<hdr_hist::measurement>> metrics) {
std::vector<std::unique_ptr<shard_fetch::hist_t::measurement>> metrics) {
auto range = boost::irange<size_t>(0, results.size());
if (unlikely(
results.size() != responses.size()
Expand All @@ -389,7 +389,7 @@ static void fill_fetch_responses(
if (unlikely(res.error != error_code::none)) {
resp_it->set(
make_partition_response_error(res.partition, res.error));
metric->set_trace(false);
metric->cancel();
continue;
}

Expand Down
7 changes: 5 additions & 2 deletions src/v/kafka/server/handlers/fetch.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "kafka/server/handlers/handler.h"
#include "kafka/types.h"
#include "utils/intrusive_list_helpers.h"
#include "utils/log_hist.h"

namespace kafka {

Expand Down Expand Up @@ -293,10 +294,12 @@ struct read_result {
// struct aggregating fetch requests and corresponding response iterators for
// the same shard
struct shard_fetch {
using hist_t = log_hist_internal;

void push_back(
ntp_fetch_config config,
op_context::response_placeholder_ptr r_ph,
std::unique_ptr<hdr_hist::measurement> m) {
std::unique_ptr<hist_t::measurement> m) {
requests.push_back(std::move(config));
responses.push_back(r_ph);
metrics.push_back(std::move(m));
Expand All @@ -306,7 +309,7 @@ struct shard_fetch {
ss::shard_id shard;
std::vector<ntp_fetch_config> requests;
std::vector<op_context::response_placeholder_ptr> responses;
std::vector<std::unique_ptr<hdr_hist::measurement>> metrics;
std::vector<std::unique_ptr<hist_t::measurement>> metrics;

friend std::ostream& operator<<(std::ostream& o, const shard_fetch& sf) {
fmt::print(o, "{}", sf.requests);
Expand Down
2 changes: 1 addition & 1 deletion src/v/kafka/server/handlers/produce.cc
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ static partition_produce_stages produce_topic_partition(
auto dur = std::chrono::steady_clock::now() - start;
octx.rctx.connection()->server().update_produce_latency(dur);
} else {
m->set_trace(false);
m->cancel();
}
return p;
});
Expand Down
2 changes: 1 addition & 1 deletion src/v/kafka/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class server final
~server() noexcept override = default;
server(const server&) = delete;
server& operator=(const server&) = delete;
server(server&&) noexcept = default;
server(server&&) noexcept = delete;
server& operator=(server&&) noexcept = delete;

std::string_view name() const final { return "kafka rpc protocol"; }
Expand Down
4 changes: 2 additions & 2 deletions src/v/net/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ void server::setup_metrics() {
"{}: Memory consumed by request processing", cfg.name))),
sm::make_histogram(
"dispatch_handler_latency",
[this] { return _hist.seastar_histogram_logform(); },
[this] { return _hist.internal_histogram_logform(); },
sm::description(ssx::sformat("{}: Latency ", cfg.name)))});
}

Expand All @@ -352,7 +352,7 @@ void server::setup_public_metrics() {
"latency_seconds",
sm::description("RPC latency"),
{server_label(server_name)},
[this] { return ssx::metrics::report_default_histogram(_hist); })
[this] { return _hist.public_histogram_logform(); })
.aggregate({sm::shard_label})});
}

Expand Down
13 changes: 8 additions & 5 deletions src/v/net/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
#include "net/connection.h"
#include "net/connection_rate.h"
#include "net/types.h"
#include "ssx/metrics.h"
#include "ssx/semaphore.h"
#include "utils/hdr_hist.h"
#include "utils/log_hist.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/gate.hh>
Expand Down Expand Up @@ -104,9 +105,11 @@ struct server_configuration {

class server {
public:
using hist_t = log_hist_internal;

explicit server(server_configuration, ss::logger&);
explicit server(ss::sharded<server_configuration>* s, ss::logger&);
server(server&&) noexcept = default;
server(server&&) noexcept = delete;
server& operator=(server&&) noexcept = delete;
server(const server&) = delete;
server& operator=(const server&) = delete;
Expand All @@ -131,15 +134,15 @@ class server {
ss::future<> stop();

const server_configuration cfg; // NOLINT
const hdr_hist& histogram() const { return _hist; }
const hist_t& histogram() const { return _hist; }

virtual std::string_view name() const = 0;
virtual ss::future<> apply(ss::lw_shared_ptr<net::connection>) = 0;

server_probe& probe() { return _probe; }
ssx::semaphore& memory() { return _memory; }
ss::gate& conn_gate() { return _conn_gate; }
hdr_hist& hist() { return _hist; }
hist_t& hist() { return _hist; }
ss::abort_source& abort_source() { return _as; }
bool abort_requested() const { return _as.abort_requested(); }

Expand Down Expand Up @@ -169,7 +172,7 @@ class server {
boost::intrusive::list<net::connection> _connections;
ss::abort_source _as;
ss::gate _conn_gate;
hdr_hist _hist;
hist_t _hist;
server_probe _probe;
ss::metrics::metric_groups _metrics;
ss::metrics::metric_groups _public_metrics;
Expand Down
9 changes: 4 additions & 5 deletions src/v/pandaproxy/probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ void probe::setup_metrics() {
"request_latency",
sm::description("Request latency"),
labels,
[this] { return _request_metrics.hist().seastar_histogram_logform(); })
[this] {
return _request_metrics.hist().internal_histogram_logform();
})
.aggregate(internal_aggregate_labels)});
}

Expand All @@ -82,10 +84,7 @@ void probe::setup_public_metrics() {
sm::description(
ssx::sformat("Internal latency of request for {}", _group_name)),
labels,
[this] {
return ssx::metrics::report_default_histogram(
_request_metrics.hist());
})
[this] { return _request_metrics.hist().public_histogram_logform(); })
.aggregate(aggregate_labels),

sm::make_counter(
Expand Down
14 changes: 8 additions & 6 deletions src/v/pandaproxy/probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@

#pragma once

#include "utils/hdr_hist.h"
#include "seastarx.h"
#include "utils/log_hist.h"

#include <seastar/core/metrics_registration.hh>
#include <seastar/http/json_path.hh>
Expand All @@ -22,10 +23,11 @@ namespace pandaproxy {
/// If the request is good, measure latency, otherwise record the error.
class http_status_metric {
public:
using hist_t = log_hist_internal;
class measurement {
public:
measurement(
http_status_metric* p, std::unique_ptr<hdr_hist::measurement> m)
http_status_metric* p, std::unique_ptr<hist_t::measurement> m)
: _p(p)
, _m(std::move(m)) {}

Expand All @@ -41,17 +43,17 @@ class http_status_metric {
} else {
++_p->_5xx_count;
}
_m->set_trace(false);
_m->cancel();
}

private:
http_status_metric* _p;
std::unique_ptr<hdr_hist::measurement> _m;
std::unique_ptr<hist_t::measurement> _m;
};
hdr_hist& hist() { return _hist; }
hist_t& hist() { return _hist; }
auto auto_measure() { return measurement{this, _hist.auto_measure()}; }

hdr_hist _hist;
hist_t _hist;
int64_t _5xx_count{0};
int64_t _4xx_count{0};
int64_t _3xx_count{0};
Expand Down
2 changes: 1 addition & 1 deletion src/v/rpc/rpc_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class failure_probes;
prometheus_sanitize::metrics_name("internal_rpc"),
{sm::make_histogram(
"latency",
[this] { return _methods[{{loop.index-1}}].probes.latency_hist().seastar_histogram_logform(); },
[this] { return _methods[{{loop.index-1}}].probes.latency_hist().internal_histogram_logform(); },
sm::description("Internal RPC service latency"),
labels)
.aggregate(aggregate_labels)});
Expand Down
Loading

0 comments on commit 0329bae

Please sign in to comment.