From def3d4b3a5c9f121a36d0f04434a61d51a76b098 Mon Sep 17 00:00:00 2001 From: Jakob Degen Date: Tue, 2 Jul 2024 02:46:10 -0700 Subject: [PATCH] Open source our wrapper around the scribe client Summary: This won't be able to build in OSS, but we can at least make the code available. Of interest specifically to https://github.com/facebook/buck2/pull/685 Reviewed By: ndmitchell Differential Revision: D59262895 fbshipit-source-id: e4e72a402e174dedd08715a627a84e7b16d1a225 --- app/buck2_events/BUCK | 2 +- shed/scribe_client/BUCK | 37 + shed/scribe_client/README.md | 43 + shed/scribe_client/src/lib.rs | 113 +++ shed/scribe_client/src/producer.rs | 1167 ++++++++++++++++++++++++++++ 5 files changed, 1361 insertions(+), 1 deletion(-) create mode 100644 shed/scribe_client/BUCK create mode 100644 shed/scribe_client/README.md create mode 100644 shed/scribe_client/src/lib.rs create mode 100644 shed/scribe_client/src/producer.rs diff --git a/app/buck2_events/BUCK b/app/buck2_events/BUCK index e82ff9f76a56..65f8cc0a38c5 100644 --- a/app/buck2_events/BUCK +++ b/app/buck2_events/BUCK @@ -46,9 +46,9 @@ rust_library( "//buck2/app/buck2_error:buck2_error", "//buck2/app/buck2_util:buck2_util", "//buck2/app/buck2_wrapper_common:buck2_wrapper_common", - "//buck2/facebook/scribe_client:scribe_client", "//buck2/gazebo/dupe:dupe", "//buck2/gazebo/gazebo:gazebo", + # @oss-disable: "//buck2/shed/scribe_client:scribe_client", "//common/rust/shed/fbinit:fbinit", # @oss-disable: "//common/rust/user:user", ], diff --git a/shed/scribe_client/BUCK b/shed/scribe_client/BUCK new file mode 100644 index 000000000000..c98b957e2f55 --- /dev/null +++ b/shed/scribe_client/BUCK @@ -0,0 +1,37 @@ +load("@fbcode_macros//build_defs:rust_library.bzl", "rust_library") +load("@fbsource//tools/build_defs:glob_defs.bzl", "glob") + +oncall("build_infra") + +# @oss-disable: _is_oss = False +_is_oss = True # @oss-enable + +# buildifier: disable=no-effect +rust_library( + name = "scribe_client", + srcs = 
glob(["src/**/*.rs"]), + test_deps = [ + "fbsource//third-party/rust:assert_matches", + "//common/rust/shed/fbinit:fbinit-tokio", + "//scribe/api/producer/thrift:producer_service-rust-mocks", + ], + visibility = [ + "//buck2/...", + ], + deps = [ + "fbsource//third-party/rust:anyhow", + "fbsource//third-party/rust:crossbeam", + "fbsource//third-party/rust:thiserror", + "fbsource//third-party/rust:tokio", + "fbsource//third-party/rust:tokio-retry", + "fbsource//third-party/rust:tracing", + "//common/rust/shed/fbinit:fbinit", + "//common/rust/thrift/bareclient:thriftclient", + "//scribe/api/producer/thrift:producer_service-rust", + "//scribe/api/producer/thrift:producer_service-rust-clients", + "//scribe/api/producer/thrift:producer_service-rust-thriftclients", + "//scribe/api/producer/thrift:use_case-rust", + "//scribe/api/thrift:message_metadata-rust", + "//thrift/lib/rust:fbthrift", + ], +) if not _is_oss else None diff --git a/shed/scribe_client/README.md b/shed/scribe_client/README.md new file mode 100644 index 000000000000..700926850aac --- /dev/null +++ b/shed/scribe_client/README.md @@ -0,0 +1,43 @@ +# Buck2 Scribe Client + +This folder houses Buck2's Scribe client, which Buck2 uses to send information +that powers all of our internal tooling around Buck2. Despite this client +serving the needs of Buck2, there is no Buck2-specific logic contained within +this library. + +See +[this post](https://fb.workplace.com/groups/buck2prototyping/posts/2829650903999058) +for justification of why this library exists and why it is here. This library is +intended to be an implementation detail of Buck2; please do not depend directly +on this library without speaking to us first. + +Buck2 writes to Scribe by interfacing directly with the Thrift service running +on port 1456 on all Meta-owned machines. In prod, the service listening on port +`1456` is +[`scribed`](https://www.internalfb.com/intern/wiki/Documentation/Scribe/), our +production Scribe daemon. 
In corp, or in non-Linux prod, the service listening +on this port is +[`scribbled`](https://www.internalfb.com/intern/wiki/Scribe/users/Knowledge_Base/Interacting_with_Scribe_categories/Write_from_Alternative_Environments/Scribble/). +Both services are expected to behave the same, as far as this client is +concerned, so this client concerns itself with using the +[ProducerService Thrift API](https://www.internalfb.com/intern/wiki/Scribe/users/Knowledge_Base/Interacting_with_Scribe_categories/producer/producer-service-thrift-api/) +to send messages to Scribe. + +Why don't we use the already-existing +[Rust wrapper around the ProducerService Thrift API](https://www.internalfb.com/intern/wiki/Scribe/users/Knowledge_Base/Interacting_with_Scribe_categories/producer/producer-service-thrift-api/#producerservice-thrift-c)? +Unfortunately, this library does not provide a few key features that we need in +Buck2: + +1. On Linux, this library + [defaults to using ServiceRouter to construct a client](https://fburl.com/code/15fy5dyk), + which is not acceptable for Buck2 (which often runs in environments where + ServiceRouter cannot function). +2. `ScribeProducer` presents an asynchronous API for pushing messages, which is + not acceptable for Buck2. +3. Buck2 needs functionality that exists in the C++ Scribe client - + specifically, intelligent retries and message buffering. The Rust + ProducerService client does not provide any of these things, and we would + need to implement them on top of the library anyway. + +While this library cannot build in OSS, the code is still available for people +to inspect. diff --git a/shed/scribe_client/src/lib.rs b/shed/scribe_client/src/lib.rs new file mode 100644 index 000000000000..2f0b946e8523 --- /dev/null +++ b/shed/scribe_client/src/lib.rs @@ -0,0 +1,113 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. 
+ * + * This source code is licensed under both the MIT license found in the + * LICENSE-MIT file in the root directory of this source tree and the Apache + * License, Version 2.0 found in the LICENSE-APACHE file in the root directory + * of this source tree. + */ + +#![feature(once_cell_try)] +#![deny(warnings)] + +mod producer; + +use std::sync::Arc; +use std::sync::OnceLock; +use std::time::Duration; + +use fbinit::FacebookInit; +pub use producer::Message; +use tokio::runtime::Builder; + +use crate::producer::ProducerCounters; +use crate::producer::ScribeProducer; + +static PRODUCER: OnceLock> = OnceLock::new(); + +/// Initializes the Scribe producer that backs all Scribe clients. Returns an error if a connection can't be +/// established to a remote Scribe daemon process. +fn initialize( + fb: FacebookInit, + buffer_size: usize, + retry_backoff: Duration, + retry_attempts: usize, + message_batch_size: Option, +) -> anyhow::Result<&'static ScribeProducer> { + Ok(&**PRODUCER.get_or_try_init(|| -> anyhow::Result<_> { + let producer = Arc::new(ScribeProducer::new( + fb, + buffer_size, + retry_backoff, + retry_attempts, + message_batch_size, + )?); + + // Instead of relying on any existing runtimes, we bootstrap a new tokio runtime bound to a single thread that + // we spawn here. Running on the same runtime as the rest of the program runs the risk of the producer loop not + // getting polled in a timely fashion, which leads directly to the message queue filling up and messages getting + // dropped. + // + // We need a separate runtime for now because loading and analysis do large amounts of blocking work on Tokio + // runtime threads. + std::thread::Builder::new() + .name("scribe-producer".to_owned()) + .spawn({ + let producer = producer.clone(); + move || { + let runtime = Builder::new_current_thread().enable_all().build().unwrap(); + runtime.block_on(producer_loop(&producer)); + } + })?; + + Ok(producer) + })?) 
+} + +/// Task that drives the producer to regularly drain its queue. +async fn producer_loop(producer: &ScribeProducer) { + const SLEEP_INTERVAL: Duration = Duration::from_millis(500); + + loop { + producer.run_once().await; + tokio::time::sleep(SLEEP_INTERVAL).await; + } +} + +/// A Scribe client that sends messages to Scribe via `offer`. +pub struct ScribeClient { + scribe_producer: &'static ScribeProducer, +} + +impl ScribeClient { + pub fn new( + fb: FacebookInit, + buffer_size: usize, + retry_backoff: Duration, + retry_attempts: usize, + message_batch_size: Option, + ) -> anyhow::Result { + let scribe_producer = initialize( + fb, + buffer_size, + retry_backoff, + retry_attempts, + message_batch_size, + )?; + Ok(ScribeClient { scribe_producer }) + } + + pub fn export_counters(&self) -> ProducerCounters { + self.scribe_producer.export_counters() + } + + /// Offers a single message to Scribe. Does not block. + pub fn offer(&self, message: Message) { + self.scribe_producer.offer(message); + } + + /// Sends all messages in `messages` now (bypass internal message queue.) + pub async fn send_messages_now(&self, messages: Vec) { + self.scribe_producer.send_messages_now(messages).await + } +} diff --git a/shed/scribe_client/src/producer.rs b/shed/scribe_client/src/producer.rs new file mode 100644 index 000000000000..c7e53a455e8d --- /dev/null +++ b/shed/scribe_client/src/producer.rs @@ -0,0 +1,1167 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under both the MIT license found in the + * LICENSE-MIT file in the root directory of this source tree and the Apache + * License, Version 2.0 found in the LICENSE-APACHE file in the root directory + * of this source tree. + */ + +//! A producer of Scribe messages, which connects to a remote Scribe daemon and sends messages to it. +//! +//! There is expected to be at most one Scribe producer in any given process. The Scribe producer synchronously +//! 
receives messages from the process, which it places in a queue. As messages queue up, it periodically retires the +//! queue by flushing to the remote Scribe producer service. + +use std::convert::TryInto; +use std::net::IpAddr; +use std::net::Ipv6Addr; +use std::net::SocketAddr; +use std::sync::atomic; +use std::sync::atomic::AtomicU64; +use std::sync::Mutex; +use std::time::Duration; + +use crossbeam::queue::ArrayQueue; +use fbinit::FacebookInit; +use fbthrift::NonthrowingFunctionError; +use scribe_message_metadata::MessageMetadata; +use scribe_producer_service::consts::DEFAULT_PRODUCER_SERVICE_PORT; +use scribe_producer_service::WriteMessage; +use scribe_producer_service::WriteMessageResultCode; +use scribe_producer_service::WriteMessagesRequest; +use scribe_producer_service_clients::ProducerServiceClient; +use scribe_producer_service_thriftclients::build_ProducerService_client; +use scribe_use_cases::UseCase; +use thriftclient::ThriftChannelBuilder; +use thriftclient::TransportType; +use tokio_retry::strategy::FibonacciBackoff; + +/// A message, destined to be delivered to Scribe. +#[derive(Clone, Debug, Default)] +pub struct Message { + /// The category to publish this Scribe message to. + pub category: String, + + /// The message payload itself. + pub message: Vec, + + /// A message key to provide to Scribe. The message key is a 64-bit integer that Scribe will use when sharding a + /// category. To Scribe, a message key is the atomic unit of partitioning for a stream. All messages with a given + /// message key are guaranteed by Scribe to be processed by the same consumer shard. + /// + /// The number itself is arbitrary; for example, Buck2 hashes the trace ID and uses that as the message key. 
+ pub message_key: Option, +} + +impl From for WriteMessage { + fn from(message: Message) -> WriteMessage { + let metadata = MessageMetadata { + messageKey: message.message_key, + ..Default::default() + }; + + WriteMessage { + category: message.category, + message: message.message, + metadata, + // TODO(swgillespie) add a buck2 use case? + useCase: UseCase::DEFAULT_RUST, + ..Default::default() + } + } +} + +struct MessageSendState { + message: Message, + error: Option, +} + +/// These counters are a snapshot of the current state of this Scribe client. +#[derive(Debug, Default, Clone)] +pub struct ProducerCounters { + // Successful submissions to scribe. + pub successes: u64, + // How many messages failed to be submitted to scribe by error type. + pub failures_invalid_request: u64, + pub failures_unauthorized: u64, + pub failures_rate_limited: u64, + pub failures_pushed_back: u64, + pub failures_enqueue_failed: u64, + pub failures_internal_error: u64, + pub failures_timed_out: u64, + pub failures_unknown: u64, + // Depth of the queue, e.g. how many messages need to be processed. + pub queue_depth: u64, + // How many messages were dropped before we even enqueued them (e.g. because the internal buffer is full). 
+ pub dropped: u64, +} + +impl ProducerCounters { + pub fn failures(&self) -> u64 { + let ProducerCounters { + successes: _, + failures_invalid_request, + failures_unauthorized, + failures_rate_limited, + failures_pushed_back, + failures_enqueue_failed, + failures_internal_error, + failures_timed_out, + failures_unknown, + queue_depth: _, + dropped: _, + } = self; + *failures_invalid_request + + *failures_unauthorized + + *failures_rate_limited + + *failures_pushed_back + + *failures_enqueue_failed + + *failures_internal_error + + *failures_timed_out + + *failures_unknown + } +} + +#[derive(Debug, Default)] +struct ProducerCountersData { + successes: AtomicU64, + failures_invalid_request: AtomicU64, + failures_unauthorized: AtomicU64, + failures_rate_limited: AtomicU64, + failures_pushed_back: AtomicU64, + failures_enqueue_failed: AtomicU64, + failures_internal_error: AtomicU64, + failures_timed_out: AtomicU64, + failures_unknown: AtomicU64, + dropped: AtomicU64, +} + +// This congestion control has 2 states (phases) each of which changes its behavior depending on whether +// congestion has occurred. The concept of the algorithm is based on TCP Reno which also consists of the +// two states. Our implementation is a very-simplified version but we try not to diverge from the original +// concept on purpose. +#[derive(Debug)] +enum CongestionControlPhase { + /// Every message batch begins with EarlyFail phase where, if it fails to queue, + /// the batch is cut by half until the length that is successfully queued is found. + EarlyFail, + /// In this phase, the client tries recovering back the batch size linearly so it can send + /// as many messages as possible while keeping from causing congestion. If it fails to push + /// the batch to the queue, it cuts the size down to `(current_cutoff + cliff_bottom) / 2` + /// where `cliff_bottom` is the successful cutoff we memorize when the last cutoff happened. 
+ FastRecovery { step: usize }, +} + +#[derive(Debug)] +struct CongestionControlState { + phase: CongestionControlPhase, + current_cutoff: usize, + + // If we draw a line chart with the send attempt count on x-axis and cutoff size on y-axis, the line will + // be going upward linearly while the traffic isn't congested, and it looks like a cliff when a congestion + // happens in this algorithm. We need to store the top and bottom value of the cliff since they'll be used + // in computing the next cutoff when a congestion happens again. + // https://fburl.com/px/le1398zy and https://fburl.com/gsheet/de3aa4ux are the illustrations that help + // visually understand it using the example of the value changes in `normal_cutoff_computations` test. + cliff_top: usize, + cliff_bottom: usize, +} + +/// The number of steps for a message batch to get recovered to the original length. +/// The smaller it is, the sooner the recovery will be, but the less chance it has to +/// find a proper and stable batch length. 
+const FAST_RECOVERY_STEPS: usize = 10; + +impl CongestionControlState { + fn new(initial_cutoff: usize) -> Self { + Self { + phase: CongestionControlPhase::EarlyFail, + current_cutoff: initial_cutoff, + cliff_top: 0, + cliff_bottom: 0, + } + } + + /// Update cutoff based on the current conditions + fn update_cutoff(&mut self, congested: bool) { + match self.phase { + CongestionControlPhase::EarlyFail => { + if congested { + // The +1 avoids the situations where 1) cutting down to zero and + // 2) making 1-leftover for odd numbers that adds one redundant iteration + let new_cutoff = self.current_cutoff / 2 + 1; + self.cliff_top = self.current_cutoff; + self.cliff_bottom = new_cutoff; + self.current_cutoff = new_cutoff; + } else { + self.phase = CongestionControlPhase::FastRecovery { + step: self.compute_recovery_amount(), + }; + } + } + CongestionControlPhase::FastRecovery { step } => { + if congested { + let mut new_cutoff = (self.current_cutoff + self.cliff_bottom) / 2 + 1; + if new_cutoff == self.current_cutoff || new_cutoff == self.current_cutoff + 1 { + // Congested but reached the bottom of cliff. Exploring down deeper. 
+ self.cliff_bottom /= 2; + new_cutoff = (self.current_cutoff + self.cliff_bottom) / 2 + 1; + } + self.cliff_top = self.current_cutoff; + self.cliff_bottom = new_cutoff; + self.current_cutoff = new_cutoff; + self.phase = CongestionControlPhase::FastRecovery { + step: self.compute_recovery_amount(), + }; + } else { + self.current_cutoff += step; + } + } + } + } + + /// Compute and return the recovery amount of each step + fn compute_recovery_amount(&self) -> usize { + self.cliff_top.saturating_sub(self.current_cutoff) / FAST_RECOVERY_STEPS + 1 + } + + fn load_last_cutoff(&mut self, last_cutoff: usize, init_cliff_top: usize) { + self.current_cutoff = last_cutoff; + self.cliff_top = init_cliff_top; + self.phase = CongestionControlPhase::FastRecovery { + step: self.compute_recovery_amount(), + }; + } +} + +/// A client of the Scribe ProducerService that buffers, retries, and sends messages to a remote Scribe daemon. +pub(crate) struct ScribeProducer { + fb: FacebookInit, + client: tokio::sync::Mutex, + queue: ArrayQueue, + counters: ProducerCountersData, + retry_backoff: Duration, + retry_attempts: usize, + message_batch_size: Option, + last_cutoff: Mutex>, +} + +impl ScribeProducer { + pub(crate) fn new( + fb: FacebookInit, + buffer_size: usize, + retry_backoff: Duration, + retry_attempts: usize, + message_batch_size: Option, + ) -> anyhow::Result { + let client = connect(fb)?; + let queue = ArrayQueue::new(buffer_size); + Ok(ScribeProducer { + fb, + client: tokio::sync::Mutex::new(client), + queue, + counters: ProducerCountersData::default(), + retry_backoff, + retry_attempts, + message_batch_size, + last_cutoff: Mutex::new(None), + }) + } + + /// Offers a message to this Scribe producer. Does not block. 
+ pub(crate) fn offer(&self, message: Message) { + if self.queue.push(message).is_err() { + tracing::debug!("Scribe producer dropping message due to full buffer"); + self.counters + .dropped + .fetch_add(1, atomic::Ordering::Relaxed); + } + } + + /// Scrape counters for reporting to upstream event logging. + pub(crate) fn export_counters(&self) -> ProducerCounters { + ProducerCounters { + successes: self.counters.successes.load(atomic::Ordering::Relaxed), + failures_invalid_request: self + .counters + .failures_invalid_request + .load(atomic::Ordering::Relaxed), + failures_unauthorized: self + .counters + .failures_unauthorized + .load(atomic::Ordering::Relaxed), + failures_rate_limited: self + .counters + .failures_rate_limited + .load(atomic::Ordering::Relaxed), + failures_pushed_back: self + .counters + .failures_pushed_back + .load(atomic::Ordering::Relaxed), + failures_enqueue_failed: self + .counters + .failures_enqueue_failed + .load(atomic::Ordering::Relaxed), + failures_internal_error: self + .counters + .failures_internal_error + .load(atomic::Ordering::Relaxed), + failures_timed_out: self + .counters + .failures_timed_out + .load(atomic::Ordering::Relaxed), + failures_unknown: self + .counters + .failures_unknown + .load(atomic::Ordering::Relaxed), + dropped: self.counters.dropped.load(atomic::Ordering::Relaxed), + // So we get an accurate snapshot of the queue depth when scraping + // metrics, do this here and now rather than in the background. + queue_depth: self.queue.len() as u64, + } + } + + /// Sends all messages in `messages` now (bypass internal message queue.) 
+ pub(crate) async fn send_messages_now(&self, messages: Vec) { + self.send_impl(messages).await; + } + + async fn refresh_connection( + &self, + client_: &mut tokio::sync::MutexGuard<'_, ProducerServiceClient>, + ) -> anyhow::Result<()> { + let new_client = connect(self.fb)?; + **client_ = new_client; + Ok(()) + } + + fn make_retry_intervals(&self) -> Vec { + // Why not use tokio_retry::Retry? We don't want to wholesale re-run the + // entire request because we need to mutate the request to filter out + // successful messages. + let retry_backoff_ms: u64 = self.retry_backoff.as_millis().try_into().unwrap_or(0); + std::iter::once(Duration::from_millis(0)) + .chain(FibonacciBackoff::from_millis(retry_backoff_ms)) + .take(self.retry_attempts) + .collect() + } + + async fn send_impl(&self, messages: Vec) { + if messages.is_empty() { + return; + } + + let mut messages: Vec = messages + .into_iter() + .map(|message| MessageSendState { + message, + error: None, + }) + .collect(); + + let retry_intervals: Vec = self.make_retry_intervals(); + + let mut cc_state = CongestionControlState::new(messages.len()); + let original_len = messages.len(); + let mut retry_count = 0; + + let mut cutoff_len; + let mut success_count; + let mut retryable_error_count; + + if let Some(last_cutoff) = *self.last_cutoff.lock().unwrap() { + cc_state.load_last_cutoff(last_cutoff, messages.len()); + } + + while retry_count < retry_intervals.len() { + cutoff_len = std::cmp::min(messages.len(), cc_state.current_cutoff); + success_count = 0; + retryable_error_count = 0; + + tracing::debug!( + "retry_count={}, interval={:?}, current_cutoff={}, {} remained in batch", + retry_count, + retry_intervals[retry_count], + cc_state.current_cutoff, + messages.len() + ); + + tokio::time::sleep(retry_intervals[retry_count]).await; + + let req = WriteMessagesRequest { + messages: messages[..cutoff_len] + .iter() + .map(|message| message.message.clone().into()) + .collect(), + ..Default::default() + }; + + // Mutex 
block for `self.client` + let results = { + let mut client_ = self.client.lock().await; + match client_.WriteMessages(&req).await { + Ok(result) => result.results, + Err(e) => { + tracing::debug!( + "scribe_producer: received fatal error from scribe, will retry: {:#}", + e, + ); + match e { + NonthrowingFunctionError::ThriftError(te) => { + // Error on Thrift transport layer. The channel likely got EOF due to any of + // server hitting connection limit, connection age timeout, server connection + // idle timeout, or server crashes and the endpoint is voided. + if te + .to_string() + .contains("apache::thrift::transport::TTransportException") + { + tracing::debug!( + "The existing connection reached EOF. Reconnecting." + ); + // If an error happens during re-connection, ignore it because the build + // command has already started and we don't want to bail out. + let _ignore = self.refresh_connection(&mut client_).await; + } + } + NonthrowingFunctionError::ApplicationException(_) => { + // Error on Thrift application layer. Rarely happens as long as we use an + // official thrift library. No special action is taken but just retry it. + } + } + retry_count += 1; + continue; + } + } + }; + + let mut write_result_iter = results.iter(); + messages.retain_mut(|message| { + write_result_iter.next().map_or(true, |result| { + match result_code_into_result(result.code) { + Ok(_) => { + self.counters + .successes + .fetch_add(1, atomic::Ordering::Relaxed); + success_count += 1; + false + } + Err(e) => { + if e.is_retryable() { + if message.error.is_none() { + message.error = Some(e); + } + retryable_error_count += 1; + true + } else { + e.inc_counter(&self.counters); + false + } + } + } + }) + }); + if messages.is_empty() { + break; + } + + if cutoff_len == retryable_error_count { + // Congested; all messages were pushed back with retryable errors, which means + // Scribed/Scribble couldn't prepare enough buffer to hold the message vector. 
+ cc_state.update_cutoff(true); + retry_count += 1; + } else if cutoff_len == success_count { + // Success; all the messages up to the cutoff were successfully processed. + cc_state.update_cutoff(false); + } else { + // Partial Success; there were some unretryable errors, or partial successes, or both + // not because of congestion. Try again with a longer interval. + retry_count += 1; + } + tracing::debug!( + "Updated cc_state: {:?}; new cutoff: {}", + cc_state, + cc_state.current_cutoff + ); + } + + // Any messages leftover after exiting the loop are ones we failed to send after exhausting retries and + // should be counted as errors. + if !messages.is_empty() { + tracing::debug!("scribe_producer: failed to send all messages"); + } + for message in &messages { + match &message.error { + None => { + self.counters + .failures_unknown + .fetch_add(1, atomic::Ordering::Relaxed); + } + Some(e) => e.inc_counter(&self.counters), + } + } + + if cc_state.current_cutoff < original_len { + *self.last_cutoff.lock().unwrap() = Some(cc_state.current_cutoff); + } else { + *self.last_cutoff.lock().unwrap() = None; + } + } + + pub(crate) async fn run_once(&self) { + if self.queue.is_empty() { + return; + } + + let mut messages: Vec = vec![]; + let count = self.message_batch_size.unwrap_or_else(|| self.queue.len()); + for _ in 0..count { + match self.queue.pop() { + Some(msg) => messages.push(msg), + None => break, + } + } + + self.send_impl(messages).await; + } +} + +/// Connect to Scribe producer service (local Scribe daemon) via Thrift interface. +fn connect(fb: FacebookInit) -> anyhow::Result { + let addr = SocketAddr::new( + IpAddr::V6(Ipv6Addr::LOCALHOST), + DEFAULT_PRODUCER_SERVICE_PORT as u16, + ); + build_ProducerService_client( + ThriftChannelBuilder::from_sock_addr(fb, addr)? 
+ .with_conn_timeout(1000) + .with_recv_timeout(1000) + .with_channel_pool(false) + .with_transport_type(TransportType::Header) + // By default, ThriftChannelBuilder will initiate a TLS handshake with the target server. This works fine + // on production machines, where Chef has set up certificates that make this successful; on corp machines, + // this is not the case, and we must turn it off otherwise we will fail to connect to the local Scribe + // daemon. + // + // Disabling TLS on a localhost connection is fine anyway since there's no way this traffic ever leaves the + // machine. + .with_secure(false), + ) +} + +// Errors returned from Scribe's Producer API. Each corresponds to a return code +// from WriteMessages. +// From: https://fburl.com/code/2jrnviz0 +#[derive(Debug, thiserror::Error)] +pub enum WriteMessageError { + // Returned if the message is evaluated to be invalid (e.g., invalid message + // size, unregistered category, invalid application bucket). + #[error("invalid request")] + InvalidRequest, + // Returned if the Thrift client has attempted to write to a category + // which it lacks permissions for. + #[error("unauthorized")] + Unauthorized, + // Returned if the message is dropped for rate limiting reasons + // (e.g., the category is blocklisted or sampled). + // + // Retrying upon this result code may ingest a message successfully due to + // sampling or if the category is removed from the blocklist in the meantime. + // However, if retry is needed all the time, the pushback write enforcement + // is a better choice. + // + // Please see the following links for more information about the blocking, + // sampling and pushback write enforcement types. + // - https://fburl.com/wiki/6hvwsuir + // - https://fburl.com/wiki/4clxpwm6 + // - https://fburl.com/wiki/7b9vr52e + #[error("ratelimited")] + RateLimited, + // Returned if the request is throttled because the category of the message + // has reached its write rate limit. 
+ // + // It is up to the client to retry its request upon this result code after + // some time. + // + // Please see the following links for more information about the pushback + // write enforcement type: + // - https://fburl.com/wiki/6hvwsuir + // - https://fburl.com/wiki/7b9vr52e + #[error("pushed back")] + PushedBack, + // Returned if the message could not be handled by the Scribe service, + // possibly because clients have been sending too many requests in a short + // period of time and the Scribe service is overloaded. + #[error("enqueue failed")] + EnqueueFailed, + // Returned if an error occurred inside the Scribe service. + #[error("internal error")] + InternalError, + // Returned in case of the request timeout. + #[error("timed out")] + TimedOut, + // Any other errors that arise. + #[error("unknown")] + Unknown, +} + +impl WriteMessageError { + pub fn is_retryable(&self) -> bool { + std::matches!( + self, + Self::RateLimited + | Self::PushedBack + | Self::EnqueueFailed + | Self::InternalError + | Self::TimedOut + | Self::Unknown + ) + } + + fn inc_counter(&self, counter: &ProducerCountersData) { + let counter = match self { + WriteMessageError::InvalidRequest => &counter.failures_invalid_request, + WriteMessageError::Unauthorized => &counter.failures_unauthorized, + WriteMessageError::RateLimited => &counter.failures_rate_limited, + WriteMessageError::PushedBack => &counter.failures_pushed_back, + WriteMessageError::EnqueueFailed => &counter.failures_enqueue_failed, + WriteMessageError::InternalError => &counter.failures_internal_error, + WriteMessageError::TimedOut => &counter.failures_timed_out, + WriteMessageError::Unknown => &counter.failures_unknown, + }; + counter.fetch_add(1, atomic::Ordering::Relaxed); + } +} + +fn result_code_into_result(code: WriteMessageResultCode) -> Result<(), WriteMessageError> { + match code { + WriteMessageResultCode::OK => Ok(()), + WriteMessageResultCode::INVALID_REQUEST => Err(WriteMessageError::InvalidRequest), + 
WriteMessageResultCode::UNAUTHORIZED => Err(WriteMessageError::Unauthorized), + WriteMessageResultCode::RATE_LIMITED => Err(WriteMessageError::RateLimited), + WriteMessageResultCode::PUSHED_BACK => Err(WriteMessageError::PushedBack), + WriteMessageResultCode::ENQUEUE_FAILED => Err(WriteMessageError::EnqueueFailed), + WriteMessageResultCode::INTERNAL_ERROR => Err(WriteMessageError::InternalError), + WriteMessageResultCode::TIMED_OUT => Err(WriteMessageError::TimedOut), + _ => Err(WriteMessageError::Unknown), + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use assert_matches::assert_matches; + use fbthrift::application_exception::ApplicationException; + use scribe_producer_service::WriteMessageResult as ThriftWriteMessageResult; + use scribe_producer_service::WriteMessagesResponse; + use scribe_producer_service_clients::errors::WriteMessagesError; + use scribe_producer_service_clients::ProducerService; + + use super::*; + + #[allow(non_snake_case)] + fn make_ScribeProducer( + fb: FacebookInit, + client: ProducerServiceClient, + queue_size: usize, + ) -> ScribeProducer { + ScribeProducer { + fb, + client: tokio::sync::Mutex::new(client), + queue: ArrayQueue::new(queue_size), + counters: ProducerCountersData::default(), + retry_backoff: Duration::from_millis(0), + retry_attempts: 5, + message_batch_size: None, + last_cutoff: Mutex::new(None), + } + } + + #[fbinit::test] + async fn success_smoke_test(fb: FacebookInit) { + let client = Arc::new(scribe_producer_service_mocks::new::()); + client.WriteMessages.mock(|req| { + assert_eq!(req.messages.len(), 1); + let msg = &req.messages[0]; + assert_eq!(msg.category, "buck2_events"); + assert_eq!(msg.message, b"hello, world!".to_vec()); + assert_eq!(msg.metadata.messageKey, Some(42)); + WriteMessagesResponse { + results: vec![ThriftWriteMessageResult { + code: WriteMessageResultCode::OK, + ..Default::default() + }], + ..Default::default() + } + }); + + let producer = make_ScribeProducer(fb, client, 5); + + let 
message = Message {
            category: "buck2_events".to_owned(),
            message: b"hello, world!".to_vec(),
            message_key: Some(42),
        };

        producer.offer(message);
        producer.run_once().await;
        let counters = producer.export_counters();
        println!("counters: {:?}", counters);
        assert_eq!(counters.successes, 1);
        assert_eq!(counters.failures(), 0);
    }

    /// Makes a mock `ProducerService` whose `WriteMessages` returns the
    /// provided result-code groups in the order given: the N-th call gets
    /// `mock_return_code_groups[N]`, and each call must carry exactly as many
    /// messages as its group has codes. Panics if `WriteMessages` is called
    /// more times than there are groups.
    fn mock_client(
        mut mock_return_code_groups: Vec<Vec<WriteMessageResultCode>>,
    ) -> ProducerServiceClient {
        // NOTE(review): the turbofish argument below was garbled in the
        // source; reconstructed per the fbthrift mock convention — confirm it
        // matches the generated `scribe_producer_service_mocks` API.
        let client = Arc::new(scribe_producer_service_mocks::new::<dyn ProducerService>());
        // Reversed so that `pop()` (which takes from the back) yields the
        // groups in the order the caller supplied them.
        mock_return_code_groups.reverse();
        client.WriteMessages.mock(move |req| {
            let mock_return_code_group = mock_return_code_groups.pop().unwrap_or_else(|| {
                panic!("WriteMessages() was called more than the mock ProducerService expected.");
            });
            // `assert_eq!` (rather than `assert!(a == b)`) reports both counts
            // on failure, which makes a mismatched batch easier to debug.
            assert_eq!(
                mock_return_code_group.len(),
                req.messages.len(),
                "Mock ProducerService received a different number of messages than it expected.",
            );

            WriteMessagesResponse {
                results: mock_return_code_group
                    .iter()
                    .map(|code| ThriftWriteMessageResult {
                        code: *code,
                        ..Default::default()
                    })
                    .collect(),
                ..Default::default()
            }
        });

        client
    }

    /// Convenience constructor for a test message in the `buck2_events`
    /// category with the given payload; all other fields are defaulted.
    fn message(contents: &'static str) -> Message {
        Message {
            category: "buck2_events".to_owned(),
            message: contents.as_bytes().to_vec(),
            ..Default::default()
        }
    }

    /// Transient failures (ENQUEUE_FAILED) are retried on subsequent
    /// `WriteMessages` calls until every message eventually succeeds.
    #[fbinit::test]
    async fn run_once_retries_and_all_succeed(fb: FacebookInit) {
        let codes = vec![
            vec![
                WriteMessageResultCode::OK,
                WriteMessageResultCode::OK,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
            ],
            vec![
                WriteMessageResultCode::OK,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
            ],
            vec![WriteMessageResultCode::OK, WriteMessageResultCode::OK],
        ];
        let client = mock_client(codes.clone());
        let producer = make_ScribeProducer(fb, client, codes[0].len());

        for _ in codes[0].iter() {
            producer.offer(message("hello, world!"));
        }
        producer.run_once().await;
        let counters = producer.export_counters();
        assert_eq!(counters.successes, 5);
        assert_eq!(counters.failures(), 0);
    }

    /// Terminal result codes (UNAUTHORIZED, INVALID_REQUEST) are counted as
    /// failures immediately and never retried; only the transient
    /// ENQUEUE_FAILED message comes back in the second batch.
    #[fbinit::test]
    async fn run_once_does_not_retry_terminal_failures(fb: FacebookInit) {
        let codes = vec![
            vec![
                WriteMessageResultCode::OK,
                WriteMessageResultCode::OK,
                WriteMessageResultCode::UNAUTHORIZED,
                WriteMessageResultCode::INVALID_REQUEST,
                WriteMessageResultCode::ENQUEUE_FAILED,
            ],
            vec![WriteMessageResultCode::OK],
        ];
        let client = mock_client(codes.clone());
        let producer = make_ScribeProducer(fb, client, codes[0].len());

        for _ in codes[0].iter() {
            producer.offer(message("hello, world!"));
        }
        producer.run_once().await;
        let counters = producer.export_counters();
        println!("counters: {:?}", counters);
        assert_eq!(counters.successes, 3);
        assert_eq!(counters.failures(), 2);
    }

    /// Messages that keep failing transiently are dropped (counted as
    /// failures) once the retry budget is exhausted: two messages are still
    /// ENQUEUE_FAILED in the final batch and are not retried again.
    #[fbinit::test]
    async fn run_once_does_not_retry_after_max_retries(fb: FacebookInit) {
        let codes = vec![
            vec![
                WriteMessageResultCode::OK,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
            ],
            vec![
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::OK,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
            ],
            vec![
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::OK,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
            ],
            vec![
                WriteMessageResultCode::OK,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
            ],
            vec![
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::OK,
                WriteMessageResultCode::ENQUEUE_FAILED,
            ],
        ];
        let client = mock_client(codes.clone());
        let producer = make_ScribeProducer(fb, client, codes[0].len());
        for _ in codes[0].iter() {
            producer.offer(message("hello, world!"));
        }

        producer.run_once().await;
        let counters = producer.export_counters();
        assert_eq!(counters.successes, 5);
        assert_eq!(counters.failures(), 2);
    }

    /// If the `WriteMessages` RPC itself errors (as opposed to returning
    /// per-message result codes), the whole batch is retried until the retry
    /// budget runs out, and then every message counts as a failure.
    #[fbinit::test]
    async fn all_messages_are_retried_if_write_messages_fails(fb: FacebookInit) {
        // NOTE(review): turbofish reconstructed as in `mock_client` above —
        // confirm against the generated mocks API.
        let client = Arc::new(scribe_producer_service_mocks::new::<dyn ProducerService>());
        client.WriteMessages.mock_result(|_| {
            Err(WriteMessagesError::ApplicationException(
                ApplicationException::unknown_method(),
            ))
        });

        let producer = make_ScribeProducer(fb, client, 5);
        for _ in 0..5 {
            producer.offer(message("hello, world!"));
        }

        producer.run_once().await;
        let counters = producer.export_counters();
        assert_eq!(counters.successes, 0);
        assert_eq!(counters.failures(), 5);
    }

    /// The congestion-control cutoff never suppresses the last remaining
    /// message on the direct send path: a single message that fails
    /// transiently is retried and succeeds.
    #[fbinit::test]
    async fn send_one_message_with_cutoff_retries_and_succeeds(fb: FacebookInit) {
        let codes = vec![
            vec![WriteMessageResultCode::ENQUEUE_FAILED],
            // Last one shouldn't be cut off
            vec![WriteMessageResultCode::OK],
        ];
        let client = mock_client(codes.clone());
        let producer = make_ScribeProducer(fb, client, codes[0].len());
        producer
            .send_messages_now(vec![message("hello, world!")])
            .await;
        let counters = producer.export_counters();
        assert_eq!(counters.successes, 1);
        assert_eq!(counters.failures(), 0);
    }

    /// Under sustained ENQUEUE_FAILED congestion the batch size is cut down
    /// (8 -> 5 -> 3 messages per call), and once the service recovers all
    /// eight original messages are eventually delivered.
    #[fbinit::test]
    async fn run_once_with_cutoff_retries_and_all_succeed(fb: FacebookInit) {
        let codes = vec![
            vec![
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
            ],
            vec![
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
            ],
            vec![
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
            ],
            vec![WriteMessageResultCode::OK, WriteMessageResultCode::OK],
            vec![WriteMessageResultCode::OK, WriteMessageResultCode::OK],
            vec![
                WriteMessageResultCode::OK,
                WriteMessageResultCode::OK,
                WriteMessageResultCode::OK,
            ],
            vec![WriteMessageResultCode::OK],
        ];
        let client = mock_client(codes.clone());
        let producer = make_ScribeProducer(fb, client, codes[0].len());

        for _ in codes[0].iter() {
            producer.offer(message("hello, world!"));
        }
        producer.run_once().await;
        let counters = producer.export_counters();
        assert_eq!(counters.successes, 8);
        assert_eq!(counters.failures(), 0);
    }

    /// With the cutoff engaged AND failures persisting, the retry budget
    /// still applies: after the cut-down batches (8 -> 5 -> 3 -> 2 -> 2) all
    /// keep failing, every message is counted as a failure.
    #[fbinit::test]
    async fn run_once_with_cutoff_does_not_retry_after_max_retries(fb: FacebookInit) {
        let codes = vec![
            vec![
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
            ],
            vec![
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
            ],
            vec![
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
            ],
            vec![
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
            ],
            vec![
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
            ],
        ];
        let client = mock_client(codes.clone());
        let producer = make_ScribeProducer(fb, client, codes[0].len());
        for _ in codes[0].iter() {
            producer.offer(message("hello, world!"));
        }

        producer.run_once().await;
        let counters = producer.export_counters();
        assert_eq!(counters.successes, 0);
        assert_eq!(counters.failures(), 8);
    }

    /// The cutoff reached at the end of one `run_once` carries over to the
    /// next: the second batch of eight starts at the reduced cutoff (5)
    /// rather than resetting to the full batch size.
    #[fbinit::test]
    async fn last_cutoff_is_memorized(fb: FacebookInit) {
        let codes = vec![
            // The first batch begins with cutoff_index = 8
            vec![
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
                WriteMessageResultCode::ENQUEUE_FAILED,
            ],
            vec![
                WriteMessageResultCode::OK,
                WriteMessageResultCode::OK,
                WriteMessageResultCode::OK,
                WriteMessageResultCode::OK,
                WriteMessageResultCode::OK,
            ],
            vec![
                WriteMessageResultCode::OK,
                WriteMessageResultCode::OK,
                WriteMessageResultCode::OK,
            ],
            // The second batch begins with cutoff_index = 5
            vec![
                WriteMessageResultCode::OK,
                WriteMessageResultCode::OK,
                WriteMessageResultCode::OK,
                WriteMessageResultCode::OK,
                WriteMessageResultCode::OK,
            ],
            vec![
                WriteMessageResultCode::OK,
                WriteMessageResultCode::OK,
                WriteMessageResultCode::OK,
            ],
        ];
        let client = mock_client(codes.clone());
        let producer = make_ScribeProducer(fb, client, 8);
        for _ in 0..8 {
            producer.offer(message("hello, world!"));
        }
        producer.run_once().await;
        for _ in 0..8 {
            producer.offer(message("hello, world!"));
        }
        producer.run_once().await;
        let counters = producer.export_counters();
        assert_eq!(counters.successes, 16);
        assert_eq!(counters.failures(), 0);
    }

    /// Walks the congestion-control state machine through its phases:
    /// EarlyFail halves the cutoff on congestion, the first success shifts to
    /// FastRecovery (additive steps back up), and later congestion cuts to
    /// the midpoint of the current value and the cliff bottom.
    #[test]
    fn normal_cutoff_computations() {
        let mut cc_state = CongestionControlState::new(1000);

        // Congested; 1000 -> 501
        cc_state.update_cutoff(true);
        assert_matches!(
            cc_state,
            CongestionControlState {
                phase: CongestionControlPhase::EarlyFail,
                current_cutoff: 501,
                cliff_top: 1000,
                cliff_bottom: 501,
            }
        );

        // Congested; 501 -> 251
        cc_state.update_cutoff(true);
        assert_matches!(
            cc_state,
            CongestionControlState {
                phase: CongestionControlPhase::EarlyFail,
                current_cutoff: 251,
                cliff_top: 501,
                cliff_bottom: 251,
            }
        );

        // Not congested; 251 -> 251; phase shifted to FastRecovery
        cc_state.update_cutoff(false);
        assert_matches!(
            cc_state,
            CongestionControlState {
                phase: CongestionControlPhase::FastRecovery { step: 26 },
                current_cutoff: 251,
                cliff_top: 501,
                cliff_bottom: 251,
            }
        );

        // Not congested; 251 -> 277 (+26)
        cc_state.update_cutoff(false);
        assert_matches!(
            cc_state,
            CongestionControlState {
                phase: CongestionControlPhase::FastRecovery { step: 26 },
                current_cutoff: 277,
                cliff_top: 501,
                cliff_bottom: 251,
            }
        );

        // Not congested; 277 -> 303 (+26)
        cc_state.update_cutoff(false);
        assert_matches!(
            cc_state,
            CongestionControlState {
                phase: CongestionControlPhase::FastRecovery { step: 26 },
                current_cutoff: 303,
                cliff_top: 501,
                cliff_bottom: 251,
            }
        );

        // Not congested x10; +26 per step; recovers beyond the cliff top
        for _ in 0..10 {
            cc_state.update_cutoff(false);
        }
        assert_matches!(
            cc_state,
            CongestionControlState {
                phase: CongestionControlPhase::FastRecovery { step: 26 },
                current_cutoff: 563,
                cliff_top: 501,
                cliff_bottom: 251,
            }
        );

        // Congested; 563 -> 408 (cut down to the mid point of current value and cliff bottom)
        cc_state.update_cutoff(true);
        assert_matches!(
            cc_state,
            CongestionControlState {
                phase: CongestionControlPhase::FastRecovery { step: 16 },
                current_cutoff: 408,
                cliff_top: 563,
                cliff_bottom: 408,
            }
        );

        // Congested; 408 -> 307 (cut down to below the cliff bottom)
        cc_state.update_cutoff(true);
        assert_matches!(
            cc_state,
            CongestionControlState {
                phase: CongestionControlPhase::FastRecovery { step: 11 },
                current_cutoff: 307,
                cliff_top: 408,
                cliff_bottom: 307,
            }
        );

        // Not congested; 307 -> 318 (+11)
        cc_state.update_cutoff(false);
        assert_matches!(
            cc_state,
            CongestionControlState {
                phase: CongestionControlPhase::FastRecovery { step: 11 },
                current_cutoff: 318,
                cliff_top: 408,
                cliff_bottom: 307,
            }
        );
    }

    /// The cutoff never drops below one message, no matter how congested the
    /// service is — otherwise the producer could stall forever.
    #[test]
    fn last_one_is_not_cut_off() {
        let mut cc_state = CongestionControlState::new(1);

        cc_state.update_cutoff(true);
        assert_eq!(cc_state.current_cutoff, 1);
        cc_state.update_cutoff(false);
        cc_state.update_cutoff(true);
        assert_eq!(cc_state.current_cutoff, 1);
    }
}