Skip to content

Commit

Permalink
refactor: cleanup logic to get tables to vacuum (#16450)
Browse files Browse the repository at this point in the history
- Replace `None` limit with `usize::MAX` to simplify capacity checking
  logic.

- Replace `None` retention time with `DateTime::MAX_UTC`.

- Rename `TableInfoFilter` variants to reflect its purpose more
  precisely.
  • Loading branch information
drmingdrmer authored Sep 13, 2024
1 parent be0597b commit ab5af93
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 106 deletions.
143 changes: 56 additions & 87 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::any::type_name;
use std::cmp::min;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::HashMap;
Expand Down Expand Up @@ -2812,7 +2811,9 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
) -> Result<ListDroppedTableResp, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

if let TableInfoFilter::AllDroppedTables(filter_drop_on) = &req.filter {
let the_limit = req.limit.unwrap_or(usize::MAX);

if let TableInfoFilter::DroppedTableOrDroppedDatabase(retention_boundary) = &req.filter {
let db_infos = self
.get_database_history(ListDatabaseReq {
tenant: req.inner.tenant().clone(),
Expand All @@ -2821,53 +2822,42 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
})
.await?;

let mut drop_table_infos = vec![];
let mut drop_ids = vec![];
let mut vacuum_table_infos = vec![];
let mut vacuum_ids = vec![];

for db_info in db_infos {
let mut drop_db = false;
let filter = match db_info.meta.drop_on {
Some(db_drop_on) => {
if let Some(filter_drop_on) = filter_drop_on {
if db_drop_on.timestamp() <= filter_drop_on.timestamp() {
// if db drop on before filter time, then get all the db tables.
drop_db = true;
TableInfoFilter::All
} else {
// else get all the db tables drop on before filter time.
TableInfoFilter::Dropped(Some(*filter_drop_on))
}
} else {
// while filter_drop_on is None, then get all the drop db tables
drop_db = true;
TableInfoFilter::All
}
}
None => {
// not drop db, only filter drop tables with filter drop on
TableInfoFilter::Dropped(*filter_drop_on)
}
};
if vacuum_table_infos.len() >= the_limit {
return Ok(ListDroppedTableResp {
drop_table_infos: vacuum_table_infos,
drop_ids: vacuum_ids,
});
}

let db_filter = (filter, db_info.clone());
// If boundary is None, it means choose all tables.
// Thus, we just choose a very large time.
let boundary = retention_boundary.unwrap_or(DateTime::<Utc>::MAX_UTC);

let left_num = if let Some(limit) = req.limit {
if drop_table_infos.len() >= limit {
return Ok(ListDroppedTableResp {
drop_table_infos,
drop_ids,
});
}
Some(limit - drop_table_infos.len())
let vacuum_db = {
let drop_on = db_info.meta.drop_on;
drop_on.is_some() && drop_on <= Some(boundary)
};

// If to vacuum a db, just vacuum all tables.
// Otherwise, choose only dropped tables(before retention time).
let filter = if vacuum_db {
TableInfoFilter::All
} else {
None
TableInfoFilter::DroppedTables(*retention_boundary)
};

let table_infos = do_get_table_history(self, db_filter, left_num).await?;
let take_num = left_num.unwrap_or(usize::MAX);
let db_filter = (filter, db_info.clone());

let capacity = the_limit - vacuum_table_infos.len();
let table_infos = do_get_table_history(self, db_filter, capacity).await?;

// A DB can be removed only when all its tables are removed.
if drop_db && take_num > table_infos.len() {
drop_ids.push(DroppedId::Db {
if vacuum_db && capacity >= table_infos.len() {
vacuum_ids.push(DroppedId::Db {
db_id: db_info.database_id.db_id,
db_name: db_info.name_ident.database_name().to_string(),
tables: table_infos
Expand All @@ -2878,25 +2868,26 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
.collect(),
});
} else {
for (table_info, db_id) in table_infos.iter().take(take_num) {
drop_ids.push(DroppedId::Table(
for (table_info, db_id) in table_infos.iter().take(capacity) {
vacuum_ids.push(DroppedId::Table(
*db_id,
table_info.ident.table_id,
table_info.name.clone(),
));
}
}
drop_table_infos.extend(

vacuum_table_infos.extend(
table_infos
.iter()
.take(take_num)
.take(capacity)
.map(|(table_info, _)| table_info.clone()),
);
}

return Ok(ListDroppedTableResp {
drop_table_infos,
drop_ids,
drop_table_infos: vacuum_table_infos,
drop_ids: vacuum_ids,
});
}

Expand All @@ -2923,16 +2914,11 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
meta: db_meta,
});
let db_filter = (req.filter, db_info);
let table_infos = do_get_table_history(self, db_filter, req.limit).await?;
let table_infos = do_get_table_history(self, db_filter, the_limit).await?;
let mut drop_ids = vec![];
let mut drop_table_infos = vec![];
let num = if let Some(limit) = req.limit {
min(limit, table_infos.len())
} else {
table_infos.len()
};
for table_info in table_infos.iter().take(num) {
let (table_info, db_id) = table_info;

for (table_info, db_id) in table_infos.iter().take(the_limit) {
drop_ids.push(DroppedId::Table(
*db_id,
table_info.ident.table_id,
Expand Down Expand Up @@ -3773,21 +3759,13 @@ async fn batch_filter_table_info(
continue;
};

#[allow(clippy::collapsible_else_if)]
if let TableInfoFilter::Dropped(drop_on) = filter {
if let Some(drop_on) = drop_on {
if let Some(meta_drop_on) = &seq_meta.drop_on {
if meta_drop_on.timestamp_millis() >= drop_on.timestamp_millis() {
continue;
}
} else {
continue;
}
} else {
//
if seq_meta.drop_on.is_none() {
continue;
}
if let TableInfoFilter::DroppedTables(retention_boundary) = filter {
let Some(meta_drop_on) = seq_meta.drop_on else {
continue;
};

if meta_drop_on > retention_boundary.unwrap_or(DateTime::<Utc>::MAX_UTC) {
continue;
}
}

Expand Down Expand Up @@ -3815,7 +3793,7 @@ type TableFilterInfoList<'a> = Vec<(&'a TableInfoFilter, &'a Arc<DatabaseInfo>,
#[fastrace::trace]
async fn get_gc_table_info(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
limit: Option<usize>,
limit: usize,
table_id_list: &TableFilterInfoList<'_>,
) -> Result<Vec<(Arc<TableInfo>, u64)>, KVAppError> {
let mut filter_tb_infos = vec![];
Expand All @@ -3842,11 +3820,8 @@ async fn get_gc_table_info(

filter_db_info_with_table_name_list.clear();

// check if reach the limit
if let Some(limit) = limit {
if filter_tb_infos.len() >= limit {
return Ok(filter_tb_infos);
}
if filter_tb_infos.len() >= limit {
return Ok(filter_tb_infos);
}
}

Expand All @@ -3867,7 +3842,7 @@ async fn get_gc_table_info(
async fn do_get_table_history(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
db_filter: (TableInfoFilter, Arc<DatabaseInfo>),
limit: Option<usize>,
limit: usize,
) -> Result<Vec<(Arc<TableInfo>, u64)>, KVAppError> {
let mut filter_tb_infos = vec![];

Expand Down Expand Up @@ -3934,11 +3909,8 @@ async fn do_get_table_history(
filter_tb_infos.extend(ret);
filter_db_info_with_table_id_list.clear();

// check if reach the limit
if let Some(limit) = limit {
if filter_tb_infos.len() >= limit {
return Ok(filter_tb_infos);
}
if filter_tb_infos.len() >= limit {
return Ok(filter_tb_infos);
}
}

Expand All @@ -3947,11 +3919,8 @@ async fn do_get_table_history(
filter_tb_infos.extend(ret);
filter_db_info_with_table_id_list.clear();

// check if reach the limit
if let Some(limit) = limit {
if filter_tb_infos.len() >= limit {
return Ok(filter_tb_infos);
}
if filter_tb_infos.len() >= limit {
return Ok(filter_tb_infos);
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3396,7 +3396,7 @@ impl SchemaApiTestSuite {
{
let req = ListDroppedTableReq {
inner: DatabaseNameIdent::new(&tenant, ""),
filter: TableInfoFilter::AllDroppedTables(None),
filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None),
limit: None,
};
let resp = mt.get_drop_table_infos(req).await?;
Expand Down Expand Up @@ -3606,7 +3606,7 @@ impl SchemaApiTestSuite {
{
let req = ListDroppedTableReq {
inner: DatabaseNameIdent::new(&tenant, ""),
filter: TableInfoFilter::AllDroppedTables(None),
filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None),
limit: None,
};
let resp = mt.get_drop_table_infos(req).await?;
Expand Down Expand Up @@ -3803,7 +3803,7 @@ impl SchemaApiTestSuite {
{
let req = ListDroppedTableReq {
inner: DatabaseNameIdent::new(&tenant, ""),
filter: TableInfoFilter::AllDroppedTables(None),
filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None),
limit: None,
};
let resp = mt.get_drop_table_infos(req).await?;
Expand Down Expand Up @@ -4316,7 +4316,7 @@ impl SchemaApiTestSuite {
let now = Utc::now();
let req = ListDroppedTableReq {
inner: DatabaseNameIdent::new(&tenant, ""),
filter: TableInfoFilter::AllDroppedTables(Some(now)),
filter: TableInfoFilter::DroppedTableOrDroppedDatabase(Some(now)),
limit: None,
};
let resp = mt.get_drop_table_infos(req).await?;
Expand Down Expand Up @@ -4348,7 +4348,7 @@ impl SchemaApiTestSuite {
{
let req = ListDroppedTableReq {
inner: DatabaseNameIdent::new(&tenant, ""),
filter: TableInfoFilter::AllDroppedTables(None),
filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None),
limit: None,
};
let resp = mt.get_drop_table_infos(req).await?;
Expand Down Expand Up @@ -4557,7 +4557,7 @@ impl SchemaApiTestSuite {
for (limit, number, drop_ids) in limit_and_drop_ids {
let req = ListDroppedTableReq {
inner: DatabaseNameIdent::new(&tenant, ""),
filter: TableInfoFilter::AllDroppedTables(None),
filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None),
limit,
};
let resp = mt.get_drop_table_infos(req).await?;
Expand Down Expand Up @@ -5234,7 +5234,7 @@ impl SchemaApiTestSuite {
// vacuum drop table
let req = ListDroppedTableReq {
inner: DatabaseNameIdent::new(&tenant, ""),
filter: TableInfoFilter::AllDroppedTables(None),
filter: TableInfoFilter::DroppedTableOrDroppedDatabase(None),
limit: None,
};
let resp = mt.get_drop_table_infos(req).await?;
Expand Down
24 changes: 14 additions & 10 deletions src/meta/app/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -897,16 +897,20 @@ impl ListTableReq {

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TableInfoFilter {
// if datatime is some, filter only dropped tables which drop time before that,
// else filter all dropped tables
Dropped(Option<DateTime<Utc>>),
// filter all dropped tables, including all tables in dropped database and dropped tables in exist dbs,
// in this case, `ListTableReq`.db_name will be ignored
// return Tables in two cases:
// 1) if database drop before date time, then all table in this db will be return;
// 2) else, return all the tables drop before data time.
AllDroppedTables(Option<DateTime<Utc>>),
// return all tables, ignore drop on time.
/// Choose only dropped tables.
///
/// If the arg `retention_boundary` time is Some, choose only tables dropped before this boundary time.
DroppedTables(Option<DateTime<Utc>>),
/// Choose dropped table or all table in dropped databases.
///
/// In this case, `ListTableReq`.db_name will be ignored.
///
/// If the `retention_boundary` time is Some,
/// choose the table dropped before this time
/// or choose the database before this time.
DroppedTableOrDroppedDatabase(Option<DateTime<Utc>>),

/// return all tables, ignore drop on time.
All,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ impl Interpreter for VacuumDropTablesInterpreter {
);
// if database if empty, vacuum all tables
let filter = if self.plan.database.is_empty() {
TableInfoFilter::AllDroppedTables(Some(retention_time))
TableInfoFilter::DroppedTableOrDroppedDatabase(Some(retention_time))
} else {
TableInfoFilter::Dropped(Some(retention_time))
TableInfoFilter::DroppedTables(Some(retention_time))
};

let tenant = self.ctx.get_tenant();
Expand Down

0 comments on commit ab5af93

Please sign in to comment.