Skip to content

Commit

Permalink
chore(ci): flaky test (#16207)
Browse files Browse the repository at this point in the history
flaky test
  • Loading branch information
zhyass authored Aug 20, 2024
1 parent 7dc8a1a commit 28670ef
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 186 deletions.
29 changes: 8 additions & 21 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3814,11 +3814,9 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
);

if succ {
break;
return Ok(CreateLockRevReply { revision });
}
}

Ok(CreateLockRevReply { revision })
}

#[logcall::logcall]
Expand All @@ -3842,7 +3840,12 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
let (tb_meta_seq, _) = get_table_by_id_or_err(self, &tbid, ctx).await?;

let (lock_seq, lock_meta_opt): (_, Option<LockMeta>) = get_pb_value(self, &key).await?;
table_lock_has_to_exist(lock_seq, table_id, ctx)?;
if lock_seq == 0 || lock_meta_opt.is_none() {
return Err(KVAppError::AppError(AppError::TableLockExpired(
TableLockExpired::new(table_id, ctx),
)));
}

let mut lock_meta = lock_meta_opt.unwrap();
// Set `acquire_lock = true` to initialize `acquired_on` the first
// time this lock is acquired. Before the lock is
Expand Down Expand Up @@ -3879,10 +3882,9 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
);

if succ {
break;
return Ok(());
}
}
Ok(())
}

#[logcall::logcall]
Expand Down Expand Up @@ -5904,21 +5906,6 @@ async fn update_mask_policy(
Ok(())
}

/// Check that a table lock exists, judged only by its `seq`.
///
/// A non-zero `seq` yields `Ok(())`. A `seq` of zero is reported as a missing
/// lock: a debug record is logged and a `TableLockExpired` application error
/// is returned, whose message is `msg` followed by the table id.
fn table_lock_has_to_exist(seq: u64, table_id: u64, msg: impl Display) -> Result<(), KVAppError> {
    // Fast path: any non-zero seq means the lock record is present.
    if seq != 0 {
        return Ok(());
    }

    debug!(seq = seq, table_id = table_id; "table lock does not exist");

    let expired = TableLockExpired::new(table_id, format!("{}: {}", msg, table_id));
    Err(KVAppError::AppError(AppError::TableLockExpired(expired)))
}

#[tonic::async_trait]
pub(crate) trait UndropTableStrategy {
fn table_name_ident(&self) -> &TableNameIdent;
Expand Down
6 changes: 6 additions & 0 deletions src/meta/app/src/schema/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ impl LockKey {
}
}

/// Borrow the tenant carried by this lock key.
pub fn get_tenant(&self) -> &Tenant {
    // `Table` is the only variant handled here, so the pattern is irrefutable;
    // adding a new variant will make this fail to compile and force an update.
    let LockKey::Table { tenant, .. } = self;
    tenant
}

pub fn get_extra_info(&self) -> BTreeMap<String, String> {
match self {
LockKey::Table { .. } => BTreeMap::new(),
Expand Down
6 changes: 0 additions & 6 deletions src/query/catalog/src/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,6 @@ pub enum LockTableOption {
pub trait Lock: Sync + Send {
fn lock_type(&self) -> LockType;

fn get_catalog(&self) -> &str;

fn get_table_id(&self) -> u64;

fn tenant_name(&self) -> &str;

async fn try_lock(
&self,
ctx: Arc<dyn TableContext>,
Expand Down
68 changes: 0 additions & 68 deletions src/query/service/src/locks/lock_ext.rs

This file was deleted.

36 changes: 13 additions & 23 deletions src/query/service/src/locks/lock_holder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@ use databend_common_base::base::tokio::time::sleep;
use databend_common_base::runtime::GlobalIORuntime;
use databend_common_base::runtime::TrySpawn;
use databend_common_catalog::catalog::Catalog;
use databend_common_catalog::lock::Lock;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::CreateLockRevReq;
use databend_common_meta_app::schema::DeleteLockRevReq;
use databend_common_meta_app::schema::ExtendLockRevReq;
use databend_common_meta_app::schema::LockKey;
use databend_common_meta_app::tenant::Tenant;
use databend_common_metrics::lock::record_created_lock_nums;
use databend_common_storages_fuse::operations::set_backoff;
use fastrace::func_name;
use futures::future::select;
use futures::future::Either;
use rand::thread_rng;
Expand All @@ -48,34 +46,26 @@ pub struct LockHolder {

impl LockHolder {
#[async_backtrace::framed]
pub async fn start<T: Lock + ?Sized>(
pub async fn start(
self: &Arc<Self>,
query_id: String,
catalog: Arc<dyn Catalog>,
lock: &T,
revision: u64,
expire_secs: u64,
) -> Result<()> {
req: CreateLockRevReq,
) -> Result<u64> {
let lock_key = req.lock_key.clone();
let expire_secs = req.expire_secs;
let sleep_range = (expire_secs * 1000 / 3)..=(expire_secs * 1000 * 2 / 3);

let tenant_name = lock.tenant_name();
let tenant = Tenant::new_or_err(tenant_name, func_name!())?;
let lock_key = LockKey::Table {
tenant: tenant.clone(),
table_id: lock.get_table_id(),
};
// get a new table lock revision.
let res = catalog.create_lock_revision(req).await?;
let revision = res.revision;
// metrics.
record_created_lock_nums(lock_key.lock_type().to_string(), lock_key.get_table_id(), 1);

let delete_table_lock_req = DeleteLockRevReq::new(lock_key.clone(), revision);
let extend_table_lock_req =
ExtendLockRevReq::new(lock_key.clone(), revision, expire_secs, false);

self.try_extend_lock(
catalog.clone(),
extend_table_lock_req.clone(),
Some(Duration::from_millis(expire_secs * 1000)),
)
.await?;

GlobalIORuntime::instance().spawn({
let self_clone = self.clone();
async move {
Expand Down Expand Up @@ -122,7 +112,7 @@ impl LockHolder {
}
});

Ok(())
Ok(revision)
}

pub fn shutdown(&self) {
Expand Down
76 changes: 37 additions & 39 deletions src/query/service/src/locks/lock_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use databend_common_base::base::tokio::sync::mpsc;
use databend_common_base::base::tokio::time::timeout;
Expand All @@ -32,18 +33,15 @@ use databend_common_meta_app::schema::ListLockRevReq;
use databend_common_meta_app::schema::LockKey;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableLockIdent;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_types::protobuf::watch_request::FilterType;
use databend_common_meta_types::protobuf::WatchRequest;
use databend_common_metrics::lock::metrics_inc_shutdown_lock_holder_nums;
use databend_common_metrics::lock::metrics_inc_start_lock_holder_nums;
use databend_common_metrics::lock::record_acquired_lock_nums;
use databend_common_metrics::lock::record_created_lock_nums;
use databend_common_pipeline_core::LockGuard;
use databend_common_pipeline_core::UnlockApi;
use databend_common_users::UserApiProvider;
use fastrace::func_name;
use futures_util::StreamExt;
use parking_lot::RwLock;

Expand Down Expand Up @@ -91,39 +89,32 @@ impl LockManager {
/// NOTICE: the lock holder is not 100% reliable.
/// E.g., there is a very small probability of failure in extending or deleting the lock.
#[async_backtrace::framed]
pub async fn try_lock<T: Lock + ?Sized>(
pub async fn try_lock(
self: &Arc<Self>,
ctx: Arc<dyn TableContext>,
lock: &T,
lock_key: LockKey,
catalog_name: &str,
should_retry: bool,
) -> Result<Option<Arc<LockGuard>>> {
let user = ctx.get_current_user()?.name;
let node = ctx.get_cluster().local_id.clone();
let query_id = ctx.get_current_session_id();
let expire_secs = ctx.get_settings().get_table_lock_expire_secs()?;

let catalog = ctx.get_catalog(lock.get_catalog()).await?;

let tenant_name = lock.tenant_name();
let tenant = Tenant::new_or_err(tenant_name, func_name!())?;
let table_id = lock.get_table_id();
let lock_key = LockKey::Table {
tenant: tenant.clone(),
table_id,
};
let start = Instant::now();

let req = CreateLockRevReq::new(lock_key.clone(), user, node, query_id, expire_secs);
let lock_type = lock_key.lock_type().to_string();
let table_id = lock_key.get_table_id();
let tenant = lock_key.get_tenant();
let expire_secs = ctx.get_settings().get_table_lock_expire_secs()?;
let query_id = ctx.get_id();
let req = CreateLockRevReq::new(
lock_key.clone(),
ctx.get_current_user()?.name, // user
ctx.get_cluster().local_id.clone(), // node
query_id.clone(), // query_id
expire_secs,
);

// get a new table lock revision.
let res = catalog.create_lock_revision(req).await?;
let revision = res.revision;
// metrics.
record_created_lock_nums(lock.lock_type().to_string(), table_id, 1);
let catalog = ctx.get_catalog(catalog_name).await?;

let lock_holder = Arc::new(LockHolder::default());
lock_holder
.start(ctx.get_id(), catalog.clone(), lock, revision, expire_secs)
.await?;
let revision = lock_holder.start(query_id, catalog.clone(), req).await?;

self.insert_lock(revision, lock_holder);
let guard = LockGuard::new(self.clone(), revision);
Expand All @@ -141,10 +132,15 @@ impl LockManager {
let reply = catalog
.list_lock_revisions(list_table_lock_req.clone())
.await?;
let position = reply.iter().position(|(x, _)| *x == revision).ok_or_else(||
let rev_list = reply.into_iter().map(|(x, _)| x).collect::<Vec<_>>();
let position = rev_list.iter().position(|x| *x == revision).ok_or_else(||
// If the current revision is not found in the list, the lock has already expired.
ErrorCode::TableLockExpired("the acquired table lock has been expired".to_string()),
)?;
ErrorCode::TableLockExpired(format!(
"the acquired table lock with revision '{}' is not in {:?}, maybe expired(elapsed: {:?})",
revision,
rev_list,
start.elapsed(),
)))?;

if position == 0 {
// The lock is acquired by current session.
Expand All @@ -153,7 +149,7 @@ impl LockManager {

catalog.extend_lock_revision(extend_table_lock_req).await?;
// metrics.
record_acquired_lock_nums(lock.lock_type().to_string(), table_id, 1);
record_acquired_lock_nums(lock_type, table_id, 1);
break;
}

Expand All @@ -162,12 +158,13 @@ impl LockManager {
catalog
.delete_lock_revision(delete_table_lock_req.clone())
.await?;
return Err(ErrorCode::TableAlreadyLocked(
"table is locked by other session, please retry later".to_string(),
));
return Err(ErrorCode::TableAlreadyLocked(format!(
"table is locked by other session, please retry later(elapsed: {:?})",
start.elapsed()
)));
}

let watch_delete_ident = TableLockIdent::new(&tenant, table_id, reply[position - 1].0);
let watch_delete_ident = TableLockIdent::new(tenant, table_id, rev_list[position - 1]);

// Get the previous revision, watch the delete event.
let req = WatchRequest {
Expand All @@ -193,9 +190,10 @@ impl LockManager {
catalog
.delete_lock_revision(delete_table_lock_req.clone())
.await?;
Err(ErrorCode::TableAlreadyLocked(
"table is locked by other session, please retry later".to_string(),
))
Err(ErrorCode::TableAlreadyLocked(format!(
"table is locked by other session, please retry later(elapsed: {:?})",
start.elapsed()
)))
}
}?;
}
Expand Down
2 changes: 0 additions & 2 deletions src/query/service/src/locks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod lock_ext;
mod lock_holder;
mod lock_manager;
mod table_lock;

pub use lock_ext::LockExt;
pub use lock_manager::LockManager;
Loading

0 comments on commit 28670ef

Please sign in to comment.