Skip to content

Commit

Permalink
Open source our wrapper around the scribe client
Browse files Browse the repository at this point in the history
Summary: This won't be able to build in OSS, but we can at least make the code available. Of interest specifically to #685

Reviewed By: ndmitchell

Differential Revision: D59262895

fbshipit-source-id: e4e72a402e174dedd08715a627a84e7b16d1a225
  • Loading branch information
JakobDegen authored and facebook-github-bot committed Jul 2, 2024
1 parent 7a34c8b commit def3d4b
Show file tree
Hide file tree
Showing 5 changed files with 1,361 additions and 1 deletion.
2 changes: 1 addition & 1 deletion app/buck2_events/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ rust_library(
"//buck2/app/buck2_error:buck2_error",
"//buck2/app/buck2_util:buck2_util",
"//buck2/app/buck2_wrapper_common:buck2_wrapper_common",
"//buck2/facebook/scribe_client:scribe_client",
"//buck2/gazebo/dupe:dupe",
"//buck2/gazebo/gazebo:gazebo",
# @oss-disable: "//buck2/shed/scribe_client:scribe_client",
"//common/rust/shed/fbinit:fbinit",
# @oss-disable: "//common/rust/user:user",
],
Expand Down
37 changes: 37 additions & 0 deletions shed/scribe_client/BUCK
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
load("@fbcode_macros//build_defs:rust_library.bzl", "rust_library")
load("@fbsource//tools/build_defs:glob_defs.bzl", "glob")

oncall("build_infra")

# @oss-disable: _is_oss = False
_is_oss = True # @oss-enable

# buildifier: disable=no-effect
rust_library(
name = "scribe_client",
srcs = glob(["src/**/*.rs"]),
test_deps = [
"fbsource//third-party/rust:assert_matches",
"//common/rust/shed/fbinit:fbinit-tokio",
"//scribe/api/producer/thrift:producer_service-rust-mocks",
],
visibility = [
"//buck2/...",
],
deps = [
"fbsource//third-party/rust:anyhow",
"fbsource//third-party/rust:crossbeam",
"fbsource//third-party/rust:thiserror",
"fbsource//third-party/rust:tokio",
"fbsource//third-party/rust:tokio-retry",
"fbsource//third-party/rust:tracing",
"//common/rust/shed/fbinit:fbinit",
"//common/rust/thrift/bareclient:thriftclient",
"//scribe/api/producer/thrift:producer_service-rust",
"//scribe/api/producer/thrift:producer_service-rust-clients",
"//scribe/api/producer/thrift:producer_service-rust-thriftclients",
"//scribe/api/producer/thrift:use_case-rust",
"//scribe/api/thrift:message_metadata-rust",
"//thrift/lib/rust:fbthrift",
],
) if not _is_oss else None
43 changes: 43 additions & 0 deletions shed/scribe_client/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Buck2 Scribe Client

This folder houses Buck2's Scribe client, which Buck2 uses to send information
that powers all of our internal tooling around Buck2. Despite this client
serving the needs of Buck2, there is no Buck2-specific logic contained within
this library.

See
[this post](https://fb.workplace.com/groups/buck2prototyping/posts/2829650903999058)
for justification of why this library exists and why it is here. This library is
intended to be an implementation detail of Buck2; please do not depend directly
on this library without speaking to us first.

Buck2 writes to Scribe by interfacing directly with the Thrift service running
on port 1456 on all Meta-owned machines. In prod, the service listening on port
`1456` is
[`scribed`](https://www.internalfb.com/intern/wiki/Documentation/Scribe/), our
production Scribe daemon. In corp, or in non-Linux prod, the service listening
on on this port is
[`scribbled`](https://www.internalfb.com/intern/wiki/Scribe/users/Knowledge_Base/Interacting_with_Scribe_categories/Write_from_Alternative_Environments/Scribble/).
Both services are expected to behave the same, as far as this client is
cooncerned, so this client concerns itself with using the
[ProducerService Thrift API](https://www.internalfb.com/intern/wiki/Scribe/users/Knowledge_Base/Interacting_with_Scribe_categories/producer/producer-service-thrift-api/)
to send messages to Scribe.

Why don't we use the already-existing
[Rust wrapper around the ProducerService Thrift API](https://www.internalfb.com/intern/wiki/Scribe/users/Knowledge_Base/Interacting_with_Scribe_categories/producer/producer-service-thrift-api/#producerservice-thrift-c)?
Unfortunately, this library does not provide a few key features that we need in
Buck2:

1. On Linux, this library
[defaults to using ServiceRouter to construct a client](https://fburl.com/code/15fy5dyk),
which is not acceptable for Buck2 (which often runs in environments where
ServiceRouter cannot function).
2. `ScribeProducer` presents an asynchronous API for pushing messages, which is
not acceptable for Buck2.
3. Buck2 needs functionality that exists in the C++ Scribe client -
specifically, intelligent retries and message buffering. The Rust
ProducerService client does not provide any of these things, and we would
need to implement them on top of the library anyway.

While this library cannot build in OSS, the code is still available for people
to inspect.
113 changes: 113 additions & 0 deletions shed/scribe_client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under both the MIT license found in the
* LICENSE-MIT file in the root directory of this source tree and the Apache
* License, Version 2.0 found in the LICENSE-APACHE file in the root directory
* of this source tree.
*/

#![feature(once_cell_try)]
#![deny(warnings)]

mod producer;

use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Duration;

use fbinit::FacebookInit;
pub use producer::Message;
use tokio::runtime::Builder;

use crate::producer::ProducerCounters;
use crate::producer::ScribeProducer;

static PRODUCER: OnceLock<Arc<ScribeProducer>> = OnceLock::new();

/// Initializes the Scribe producer that backs all Scribe clients. Returns an error if a connection can't be
/// established to a remote Scribe daemon process.
fn initialize(
fb: FacebookInit,
buffer_size: usize,
retry_backoff: Duration,
retry_attempts: usize,
message_batch_size: Option<usize>,
) -> anyhow::Result<&'static ScribeProducer> {
Ok(&**PRODUCER.get_or_try_init(|| -> anyhow::Result<_> {
let producer = Arc::new(ScribeProducer::new(
fb,
buffer_size,
retry_backoff,
retry_attempts,
message_batch_size,
)?);

// Instead of relying on any existing runtimes, we bootstrap a new tokio runtime bound to a single thread that
// we spawn here. Running on the same runtime as the rest of the program runs the risk of the producer loop not
// getting polled in a timely fashion, which leads directly to the message queue filling up and messages getting
// dropped.
//
// We need a separate runtime for now because loading and analysis do large amounts of blocking work on Tokio
// runtime threads.
std::thread::Builder::new()
.name("scribe-producer".to_owned())
.spawn({
let producer = producer.clone();
move || {
let runtime = Builder::new_current_thread().enable_all().build().unwrap();
runtime.block_on(producer_loop(&producer));
}
})?;

Ok(producer)
})?)
}

/// Task that drives the producer to regularly drain its queue.
async fn producer_loop(producer: &ScribeProducer) {
const SLEEP_INTERVAL: Duration = Duration::from_millis(500);

loop {
producer.run_once().await;
tokio::time::sleep(SLEEP_INTERVAL).await;
}
}

/// A Scribe client that sends messages to Scribe via `offer`.
pub struct ScribeClient {
scribe_producer: &'static ScribeProducer,
}

impl ScribeClient {
pub fn new(
fb: FacebookInit,
buffer_size: usize,
retry_backoff: Duration,
retry_attempts: usize,
message_batch_size: Option<usize>,
) -> anyhow::Result<ScribeClient> {
let scribe_producer = initialize(
fb,
buffer_size,
retry_backoff,
retry_attempts,
message_batch_size,
)?;
Ok(ScribeClient { scribe_producer })
}

pub fn export_counters(&self) -> ProducerCounters {
self.scribe_producer.export_counters()
}

/// Offers a single message to Scribe. Does not block.
pub fn offer(&self, message: Message) {
self.scribe_producer.offer(message);
}

/// Sends all messages in `messages` now (bypass internal message queue.)
pub async fn send_messages_now(&self, messages: Vec<Message>) {
self.scribe_producer.send_messages_now(messages).await
}
}
Loading

0 comments on commit def3d4b

Please sign in to comment.