diff --git a/Cargo.lock b/Cargo.lock index b8bc179744c..474c5784ac2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2790,6 +2790,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "influxdb3_test_helpers" +version = "0.1.0" +dependencies = [ + "async-trait", + "bytes", + "futures", + "hashbrown 0.14.5", + "object_store", + "parking_lot", + "tokio", +] + [[package]] name = "influxdb3_wal" version = "0.1.0" @@ -2839,6 +2852,7 @@ dependencies = [ "influxdb-line-protocol", "influxdb3_catalog", "influxdb3_id", + "influxdb3_test_helpers", "influxdb3_wal", "insta", "iox_catalog", diff --git a/Cargo.toml b/Cargo.toml index 83afaf932f6..d40188b3d39 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,10 +8,11 @@ members = [ "influxdb3_load_generator", "influxdb3_process", "influxdb3_server", + "influxdb3_telemetry", + "influxdb3_test_helpers", "influxdb3_wal", "influxdb3_write", "iox_query_influxql_rewrite", - "influxdb3_telemetry", ] default-members = ["influxdb3"] diff --git a/influxdb3_test_helpers/Cargo.toml b/influxdb3_test_helpers/Cargo.toml new file mode 100644 index 00000000000..f5948ea3f84 --- /dev/null +++ b/influxdb3_test_helpers/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "influxdb3_test_helpers" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +async-trait.workspace = true +bytes.workspace = true +futures.workspace = true +hashbrown.workspace = true +object_store.workspace = true +parking_lot.workspace = true +tokio.workspace = true + +[lints] +workspace = true diff --git a/influxdb3_test_helpers/src/lib.rs b/influxdb3_test_helpers/src/lib.rs new file mode 100644 index 00000000000..ef12e4fd986 --- /dev/null +++ b/influxdb3_test_helpers/src/lib.rs @@ -0,0 +1 @@ +pub mod object_store; diff --git a/influxdb3_test_helpers/src/object_store/mod.rs b/influxdb3_test_helpers/src/object_store/mod.rs new file mode 100644 index 00000000000..99df242ea62 --- /dev/null +++ b/influxdb3_test_helpers/src/object_store/mod.rs @@ -0,0 +1,341 @@ +use std::{ops::Range, sync::Arc}; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::stream::BoxStream; +use hashbrown::HashMap; +use object_store::{ + path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + PutMultipartOpts, PutOptions, PutPayload, PutResult, +}; +use parking_lot::RwLock; +use tokio::sync::Notify; + +type RequestCounter = RwLock>; + +/// A wrapper around an inner object store that tracks requests made to the inner store +#[derive(Debug)] +pub struct RequestCountedObjectStore { + inner: Arc, + get: RequestCounter, + get_opts: RequestCounter, + get_range: RequestCounter, + get_ranges: RequestCounter, + head: RequestCounter, +} + +impl RequestCountedObjectStore { + pub fn new(inner: Arc) -> Self { + Self { + inner, + get: Default::default(), + get_opts: Default::default(), + get_range: Default::default(), + get_ranges: Default::default(), + head: Default::default(), + } + } + + /// Get the total request count accross READ-style requests for a specific `Path` in the inner + /// object store. + pub fn total_read_request_count(&self, path: &Path) -> usize { + self.get_request_count(path) + + self.get_opts_request_count(path) + + self.get_range_request_count(path) + + self.get_ranges_request_count(path) + + self.head_request_count(path) + } + + pub fn get_request_count(&self, path: &Path) -> usize { + self.get.read().get(path).copied().unwrap_or(0) + } + + pub fn get_opts_request_count(&self, path: &Path) -> usize { + self.get_opts.read().get(path).copied().unwrap_or(0) + } + + pub fn get_range_request_count(&self, path: &Path) -> usize { + self.get_range.read().get(path).copied().unwrap_or(0) + } + + pub fn get_ranges_request_count(&self, path: &Path) -> usize { + self.get_ranges.read().get(path).copied().unwrap_or(0) + } + + pub fn head_request_count(&self, path: &Path) -> usize { + self.head.read().get(path).copied().unwrap_or(0) + } +} + +impl std::fmt::Display for RequestCountedObjectStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "TestObjectStore({})", self.inner) + } +} + +#[async_trait] +impl ObjectStore for RequestCountedObjectStore { + async fn put(&self, location: &Path, bytes: PutPayload) -> object_store::Result { + self.inner.put(location, bytes).await + } + + async fn put_opts( + &self, + location: &Path, + bytes: PutPayload, + opts: PutOptions, + ) -> object_store::Result { + self.inner.put_opts(location, bytes, opts).await + } + + async fn put_multipart( + &self, + location: &Path, + ) -> object_store::Result> { + self.inner.put_multipart(location).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOpts, + ) -> object_store::Result> { + self.inner.put_multipart_opts(location, opts).await + } + + async fn get(&self, location: &Path) -> object_store::Result { + *self.get.write().entry(location.clone()).or_insert(0) += 1; + self.inner.get(location).await + } + + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> object_store::Result { + *self.get_opts.write().entry(location.clone()).or_insert(0) += 1; + self.inner.get_opts(location, options).await + } + + async fn get_range(&self, location: &Path, range: Range) -> object_store::Result { + *self.get_range.write().entry(location.clone()).or_insert(0) += 1; + self.inner.get_range(location, range).await + } + + async fn get_ranges( + &self, + location: &Path, + ranges: &[Range], + ) -> object_store::Result> { + *self.get_ranges.write().entry(location.clone()).or_insert(0) += 1; + self.inner.get_ranges(location, ranges).await + } + + async fn head(&self, location: &Path) -> object_store::Result { + *self.head.write().entry(location.clone()).or_insert(0) += 1; + self.inner.head(location).await + } + + /// Delete an object on object store, but also remove it from the cache. + async fn delete(&self, location: &Path) -> object_store::Result<()> { + self.inner.delete(location).await + } + + fn delete_stream<'a>( + &'a self, + locations: BoxStream<'a, object_store::Result>, + ) -> BoxStream<'a, object_store::Result> { + self.inner.delete_stream(locations) + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, object_store::Result> { + self.inner.list(prefix) + } + + fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> BoxStream<'_, object_store::Result> { + self.inner.list_with_offset(prefix, offset) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result { + self.inner.list_with_delimiter(prefix).await + } + + async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> { + self.inner.copy(from, to).await + } + + async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> { + self.inner.rename(from, to).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> { + self.inner.copy_if_not_exists(from, to).await + } + + async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> { + self.inner.rename_if_not_exists(from, to).await + } +} + +/// A wrapper around an inner object store that can hold execution of certain object store methods +/// to synchronize other processes before the request is forwarded to the inner object store +/// +/// # Example +/// ```ignore +/// // set up notifiers: +/// let to_store_notify = Arc::new(Notify::new()); +/// let from_store_notify = Arc::new(Notify::new()); +/// +/// // create the synchronized store wrapping an in-memory object store: +/// let inner_store = Arc::new( +/// SynchronizedObjectStore::new(Arc::new(InMemory::new())) +/// .with_notifies(Arc::clone(&to_store_notify), Arc::clone(&from_store_notify)), +/// ); +/// +/// // we are in the middle of a get request once this call to notified wakes: +/// let _ = from_store_notify.notified().await; +/// +/// // spawn a thread to wake the in-flight get request: +/// let h = tokio::spawn(async move { +/// to_store_notify.notify_one(); +/// let _ = notifier_rx.await; +/// }); +/// ``` +#[derive(Debug)] +pub struct SynchronizedObjectStore { + inner: Arc, + get_notifies: Option<(Arc, Arc)>, +} + +impl SynchronizedObjectStore { + pub fn new(inner: Arc) -> Self { + Self { + inner, + get_notifies: None, + } + } + + /// Add notifiers for `get` requests so that async execution can be halted to synchronize other + /// processes before the request is forwarded to the inner object store. + pub fn with_get_notifies(mut self, inbound: Arc, outbound: Arc) -> Self { + self.get_notifies = Some((inbound, outbound)); + self + } +} + +impl std::fmt::Display for SynchronizedObjectStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "TestObjectStore({})", self.inner) + } +} + +#[async_trait] +impl ObjectStore for SynchronizedObjectStore { + async fn put(&self, location: &Path, bytes: PutPayload) -> object_store::Result { + self.inner.put(location, bytes).await + } + + async fn put_opts( + &self, + location: &Path, + bytes: PutPayload, + opts: PutOptions, + ) -> object_store::Result { + self.inner.put_opts(location, bytes, opts).await + } + + async fn put_multipart( + &self, + location: &Path, + ) -> object_store::Result> { + self.inner.put_multipart(location).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOpts, + ) -> object_store::Result> { + self.inner.put_multipart_opts(location, opts).await + } + + async fn get(&self, location: &Path) -> object_store::Result { + if let Some((inbound, outbound)) = &self.get_notifies { + outbound.notify_one(); + inbound.notified().await; + } + self.inner.get(location).await + } + + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> object_store::Result { + self.inner.get_opts(location, options).await + } + + async fn get_range(&self, location: &Path, range: Range) -> object_store::Result { + self.inner.get_range(location, range).await + } + + async fn get_ranges( + &self, + location: &Path, + ranges: &[Range], + ) -> object_store::Result> { + self.inner.get_ranges(location, ranges).await + } + + async fn head(&self, location: &Path) -> object_store::Result { + self.inner.head(location).await + } + + /// Delete an object on object store, but also remove it from the cache. + async fn delete(&self, location: &Path) -> object_store::Result<()> { + self.inner.delete(location).await + } + + fn delete_stream<'a>( + &'a self, + locations: BoxStream<'a, object_store::Result>, + ) -> BoxStream<'a, object_store::Result> { + self.inner.delete_stream(locations) + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, object_store::Result> { + self.inner.list(prefix) + } + + fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> BoxStream<'_, object_store::Result> { + self.inner.list_with_offset(prefix, offset) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result { + self.inner.list_with_delimiter(prefix).await + } + + async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> { + self.inner.copy(from, to).await + } + + async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> { + self.inner.rename(from, to).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> { + self.inner.copy_if_not_exists(from, to).await + } + + async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> { + self.inner.rename_if_not_exists(from, to).await + } +} diff --git a/influxdb3_write/Cargo.toml b/influxdb3_write/Cargo.toml index 9fb21822ff6..a7abb6dc8c9 100644 --- a/influxdb3_write/Cargo.toml +++ b/influxdb3_write/Cargo.toml @@ -21,6 +21,7 @@ schema.workspace = true # Local deps influxdb3_catalog = { path = "../influxdb3_catalog" } influxdb3_id = { path = "../influxdb3_id" } +influxdb3_test_helpers = { path = "../influxdb3_test_helpers" } influxdb3_wal = { path = "../influxdb3_wal" } # crates.io dependencies diff --git a/influxdb3_write/src/parquet_cache/mod.rs b/influxdb3_write/src/parquet_cache/mod.rs index 57e374e4171..55ab26f6840 100644 --- a/influxdb3_write/src/parquet_cache/mod.rs +++ b/influxdb3_write/src/parquet_cache/mod.rs @@ -677,19 +677,15 @@ fn background_cache_pruner( #[cfg(test)] pub(crate) mod tests { - use std::{ops::Range, sync::Arc, time::Duration}; + use std::{sync::Arc, time::Duration}; use arrow::datatypes::ToByteSlice; - use async_trait::async_trait; - use bytes::Bytes; - use futures::stream::BoxStream; - use hashbrown::HashMap; - use iox_time::{MockProvider, Time, TimeProvider}; - use object_store::{ - memory::InMemory, path::Path, GetOptions, GetResult, ListResult, MultipartUpload, - ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, + use influxdb3_test_helpers::object_store::{ + RequestCountedObjectStore, SynchronizedObjectStore, }; - use parking_lot::RwLock; + use iox_time::{MockProvider, Time, TimeProvider}; + use object_store::{memory::InMemory, path::Path, ObjectStore, PutPayload}; + use pretty_assertions::assert_eq; use tokio::sync::Notify; @@ -716,7 +712,7 @@ pub(crate) mod tests { #[tokio::test] async fn hit_cache_instead_of_object_store() { // set up the inner test object store and then wrap it with the mem cached store: - let inner_store = Arc::new(TestObjectStore::new(Arc::new(InMemory::new()))); + let inner_store = Arc::new(RequestCountedObjectStore::new(Arc::new(InMemory::new()))); let time_provider: Arc = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); let (cached_store, oracle) = test_cached_obj_store_and_oracle( @@ -733,8 +729,7 @@ pub(crate) mod tests { // GET the payload from the object store before caching: assert_payload_at_equals!(cached_store, payload, path); - assert_eq!(1, inner_store.total_get_request_count()); - assert_eq!(1, inner_store.get_request_count(&path)); + assert_eq!(1, inner_store.total_read_request_count(&path)); // cache the entry: let (cache_request, notifier_rx) = CacheRequest::create(path.clone()); @@ -744,21 +739,19 @@ pub(crate) mod tests { let _ = notifier_rx.await; // another request to inner store should have been made: - assert_eq!(2, inner_store.total_get_request_count()); - assert_eq!(2, inner_store.get_request_count(&path)); + assert_eq!(2, inner_store.total_read_request_count(&path)); // get the payload from the outer store again: assert_payload_at_equals!(cached_store, payload, path); // should hit the cache this time, so the inner store should not have been hit, and counts // should therefore be same as previous: - assert_eq!(2, inner_store.total_get_request_count()); - assert_eq!(2, inner_store.get_request_count(&path)); + assert_eq!(2, inner_store.total_read_request_count(&path)); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn cache_evicts_lru_when_full() { - let inner_store = Arc::new(TestObjectStore::new(Arc::new(InMemory::new()))); + let inner_store = Arc::new(RequestCountedObjectStore::new(Arc::new(InMemory::new()))); let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); // these are magic numbers that will make it so the third entry exceeds the cache capacity: let cache_capacity_bytes = 60; @@ -784,8 +777,7 @@ pub(crate) mod tests { oracle.register(cache_request); let _ = notifier_rx.await; // there will have been one get request made by the cache oracle: - assert_eq!(1, inner_store.total_get_request_count()); - assert_eq!(1, inner_store.get_request_count(&path_1)); + assert_eq!(1, inner_store.total_read_request_count(&path_1)); // update time: time_provider.set(Time::from_timestamp_nanos(1)); @@ -793,8 +785,7 @@ pub(crate) mod tests { // GET the entry to check its there and was retrieved from cache, i.e., that the request // counts do not change: assert_payload_at_equals!(cached_store, payload_1, path_1); - assert_eq!(1, inner_store.total_get_request_count()); - assert_eq!(1, inner_store.get_request_count(&path_1)); + assert_eq!(1, inner_store.total_read_request_count(&path_1)); // PUT a second entry into the store: let path_2 = Path::from("1.parquet"); @@ -813,9 +804,8 @@ pub(crate) mod tests { oracle.register(cache_request); let _ = notifier_rx.await; // will have another request for the second path to the inner store, by the oracle: - assert_eq!(2, inner_store.total_get_request_count()); - assert_eq!(1, inner_store.get_request_count(&path_1)); - assert_eq!(1, inner_store.get_request_count(&path_2)); + assert_eq!(1, inner_store.total_read_request_count(&path_1)); + assert_eq!(1, inner_store.total_read_request_count(&path_2)); // update time: time_provider.set(Time::from_timestamp_nanos(3)); @@ -823,9 +813,8 @@ pub(crate) mod tests { // GET the second entry and assert that it was retrieved from the cache, i.e., that the // request counts do not change: assert_payload_at_equals!(cached_store, payload_2, path_2); - assert_eq!(2, inner_store.total_get_request_count()); - assert_eq!(1, inner_store.get_request_count(&path_1)); - assert_eq!(1, inner_store.get_request_count(&path_2)); + assert_eq!(1, inner_store.total_read_request_count(&path_1)); + assert_eq!(1, inner_store.total_read_request_count(&path_2)); // update time: time_provider.set(Time::from_timestamp_nanos(4)); @@ -834,8 +823,8 @@ pub(crate) mod tests { // will also update the hit count so that the first entry (janeway) was used more recently // than the second entry (paris): assert_payload_at_equals!(cached_store, payload_1, path_1); - assert_eq!(2, inner_store.total_get_request_count()); - assert_eq!(1, inner_store.get_request_count(&path_1)); + assert_eq!(1, inner_store.total_read_request_count(&path_1)); + assert_eq!(1, inner_store.total_read_request_count(&path_2)); // PUT a third entry into the store: let path_3 = Path::from("2.parquet"); @@ -854,20 +843,18 @@ pub(crate) mod tests { oracle.register(cache_request); let _ = notifier_rx.await; // will now have another request for the third path to the inner store, by the oracle: - assert_eq!(3, inner_store.total_get_request_count()); - assert_eq!(1, inner_store.get_request_count(&path_1)); - assert_eq!(1, inner_store.get_request_count(&path_2)); - assert_eq!(1, inner_store.get_request_count(&path_3)); + assert_eq!(1, inner_store.total_read_request_count(&path_1)); + assert_eq!(1, inner_store.total_read_request_count(&path_2)); + assert_eq!(1, inner_store.total_read_request_count(&path_3)); // update time: time_provider.set(Time::from_timestamp_nanos(6)); // GET the new entry from the strore, and check that it was served by the cache: assert_payload_at_equals!(cached_store, payload_3, path_3); - assert_eq!(3, inner_store.total_get_request_count()); - assert_eq!(1, inner_store.get_request_count(&path_1)); - assert_eq!(1, inner_store.get_request_count(&path_2)); - assert_eq!(1, inner_store.get_request_count(&path_3)); + assert_eq!(1, inner_store.total_read_request_count(&path_1)); + assert_eq!(1, inner_store.total_read_request_count(&path_2)); + assert_eq!(1, inner_store.total_read_request_count(&path_3)); // allow some time for pruning: tokio::time::sleep(Duration::from_millis(500)).await; @@ -875,20 +862,21 @@ pub(crate) mod tests { // GET paris from the cached store, this will not be served by the cache, because paris was // evicted by neelix: assert_payload_at_equals!(cached_store, payload_2, path_2); - assert_eq!(4, inner_store.total_get_request_count()); - assert_eq!(1, inner_store.get_request_count(&path_1)); - assert_eq!(2, inner_store.get_request_count(&path_2)); - assert_eq!(1, inner_store.get_request_count(&path_3)); + assert_eq!(1, inner_store.total_read_request_count(&path_1)); + assert_eq!(2, inner_store.total_read_request_count(&path_2)); + assert_eq!(1, inner_store.total_read_request_count(&path_3)); } #[tokio::test] async fn cache_hit_while_fetching() { - // Create a test store with a barrier: + // Create the object store with the following layers: + // Synchronized -> RequestCounted -> Inner let to_store_notify = Arc::new(Notify::new()); let from_store_notify = Arc::new(Notify::new()); + let counter = Arc::new(RequestCountedObjectStore::new(Arc::new(InMemory::new()))); let inner_store = Arc::new( - TestObjectStore::new(Arc::new(InMemory::new())) - .with_notifies(Arc::clone(&to_store_notify), Arc::clone(&from_store_notify)), + SynchronizedObjectStore::new(Arc::clone(&counter) as _) + .with_get_notifies(Arc::clone(&to_store_notify), Arc::clone(&from_store_notify)), ); let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); let (cached_store, oracle) = test_cached_obj_store_and_oracle( @@ -928,193 +916,10 @@ pub(crate) mod tests { h.await.unwrap(); // there should only have been one request made, i.e., from the cache oracle: - assert_eq!(1, inner_store.total_get_request_count()); - assert_eq!(1, inner_store.get_request_count(&path)); + assert_eq!(1, counter.total_read_request_count(&path)); // make another request to the store, to be sure that it is in the cache: assert_payload_at_equals!(cached_store, payload, path); - assert_eq!(1, inner_store.total_get_request_count()); - assert_eq!(1, inner_store.get_request_count(&path)); - } - - type RequestCounter = RwLock>; - - #[derive(Debug)] - pub(crate) struct TestObjectStore { - inner: Arc, - get: RequestCounter, - get_opts: RequestCounter, - get_range: RequestCounter, - get_ranges: RequestCounter, - head: RequestCounter, - notifies: Option<(Arc, Arc)>, - } - - impl TestObjectStore { - pub(crate) fn new(inner: Arc) -> Self { - Self { - inner, - get: Default::default(), - get_opts: Default::default(), - get_range: Default::default(), - get_ranges: Default::default(), - head: Default::default(), - notifies: None, - } - } - - fn with_notifies(mut self, inbound: Arc, outbound: Arc) -> Self { - self.notifies = Some((inbound, outbound)); - self - } - - pub(crate) fn total_get_request_count(&self) -> usize { - self.get.read().iter().map(|(_, size)| size).sum() - } - - pub(crate) fn get_request_count(&self, path: &Path) -> usize { - self.get.read().get(path).copied().unwrap_or(0) - } - - pub(crate) fn get_opts_request_count(&self, path: &Path) -> usize { - self.get_opts.read().get(path).copied().unwrap_or(0) - } - - pub(crate) fn get_range_request_count(&self, path: &Path) -> usize { - self.get_range.read().get(path).copied().unwrap_or(0) - } - - pub(crate) fn get_ranges_request_count(&self, path: &Path) -> usize { - self.get_ranges.read().get(path).copied().unwrap_or(0) - } - - pub(crate) fn head_request_count(&self, path: &Path) -> usize { - self.head.read().get(path).copied().unwrap_or(0) - } - } - - impl std::fmt::Display for TestObjectStore { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "TestObjectStore({})", self.inner) - } - } - - #[async_trait] - impl ObjectStore for TestObjectStore { - async fn put(&self, location: &Path, bytes: PutPayload) -> object_store::Result { - self.inner.put(location, bytes).await - } - - async fn put_opts( - &self, - location: &Path, - bytes: PutPayload, - opts: PutOptions, - ) -> object_store::Result { - self.inner.put_opts(location, bytes, opts).await - } - - async fn put_multipart( - &self, - location: &Path, - ) -> object_store::Result> { - self.inner.put_multipart(location).await - } - - async fn put_multipart_opts( - &self, - location: &Path, - opts: PutMultipartOpts, - ) -> object_store::Result> { - self.inner.put_multipart_opts(location, opts).await - } - - async fn get(&self, location: &Path) -> object_store::Result { - *self.get.write().entry(location.clone()).or_insert(0) += 1; - if let Some((inbound, outbound)) = &self.notifies { - outbound.notify_one(); - inbound.notified().await; - } - self.inner.get(location).await - } - - async fn get_opts( - &self, - location: &Path, - options: GetOptions, - ) -> object_store::Result { - *self.get_opts.write().entry(location.clone()).or_insert(0) += 1; - self.inner.get_opts(location, options).await - } - - async fn get_range( - &self, - location: &Path, - range: Range, - ) -> object_store::Result { - *self.get_range.write().entry(location.clone()).or_insert(0) += 1; - self.inner.get_range(location, range).await - } - - async fn get_ranges( - &self, - location: &Path, - ranges: &[Range], - ) -> object_store::Result> { - *self.get_ranges.write().entry(location.clone()).or_insert(0) += 1; - self.inner.get_ranges(location, ranges).await - } - - async fn head(&self, location: &Path) -> object_store::Result { - *self.head.write().entry(location.clone()).or_insert(0) += 1; - self.inner.head(location).await - } - - /// Delete an object on object store, but also remove it from the cache. - async fn delete(&self, location: &Path) -> object_store::Result<()> { - self.inner.delete(location).await - } - - fn delete_stream<'a>( - &'a self, - locations: BoxStream<'a, object_store::Result>, - ) -> BoxStream<'a, object_store::Result> { - self.inner.delete_stream(locations) - } - - fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, object_store::Result> { - self.inner.list(prefix) - } - - fn list_with_offset( - &self, - prefix: Option<&Path>, - offset: &Path, - ) -> BoxStream<'_, object_store::Result> { - self.inner.list_with_offset(prefix, offset) - } - - async fn list_with_delimiter( - &self, - prefix: Option<&Path>, - ) -> object_store::Result { - self.inner.list_with_delimiter(prefix).await - } - - async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> { - self.inner.copy(from, to).await - } - - async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> { - self.inner.rename(from, to).await - } - - async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> { - self.inner.copy_if_not_exists(from, to).await - } - - async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> { - self.inner.rename_if_not_exists(from, to).await - } + assert_eq!(1, counter.total_read_request_count(&path)); } } diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index b046eb75ff7..d07e19abef1 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -526,7 +526,6 @@ impl WriteBuffer for WriteBufferImpl {} mod tests { use super::*; use crate::parquet_cache::test_cached_obj_store_and_oracle; - use crate::parquet_cache::tests::TestObjectStore; use crate::paths::{CatalogFilePath, SnapshotInfoFilePath}; use crate::persister::Persister; use crate::PersistedSnapshot; @@ -537,6 +536,7 @@ mod tests { use futures_util::StreamExt; use influxdb3_catalog::catalog::SequenceNumber; use influxdb3_id::DbId; + use influxdb3_test_helpers::object_store::RequestCountedObjectStore; use influxdb3_wal::{Gen1Duration, SnapshotSequenceNumber, WalFileSequenceNumber}; use iox_query::exec::IOxSessionContext; use iox_time::{MockProvider, Time}; @@ -1580,7 +1580,7 @@ mod tests { async fn test_parquet_cache() { // set up a write buffer using a TestObjectStore so we can spy on requests that get // through to the object store for parquet files: - let test_store = Arc::new(TestObjectStore::new(Arc::new(InMemory::new()))); + let test_store = Arc::new(RequestCountedObjectStore::new(Arc::new(InMemory::new()))); let obj_store: Arc = Arc::clone(&test_store) as _; let (wbuf, ctx) = setup_cache_optional( Time::from_timestamp_nanos(0), @@ -1684,7 +1684,7 @@ mod tests { async fn test_no_parquet_cache() { // set up a write buffer using a TestObjectStore so we can spy on requests that get // through to the object store for parquet files: - let test_store = Arc::new(TestObjectStore::new(Arc::new(InMemory::new()))); + let test_store = Arc::new(RequestCountedObjectStore::new(Arc::new(InMemory::new()))); let obj_store: Arc = Arc::clone(&test_store) as _; let (wbuf, ctx) = setup_cache_optional( Time::from_timestamp_nanos(0),