From dcf1eab5dd51346f9a9aad1273f6ed539ec7ebb8 Mon Sep 17 00:00:00 2001
From: Sky Fan <3374614481@qq.com>
Date: Wed, 11 Sep 2024 18:37:58 +0800
Subject: [PATCH] feat: continue vacuum drop table on per-table cleanup
 failures (#16424)

* feat: continue vacuum drop table on per-table cleanup failures

* modify ut

* fix ut

* fix ut

* fix db name, gc_drop_tables()

* fix ut

* add test

* fix ut

* fix test name

* fix ut

* refactor DroppedId

* unify code

* make lint

* fix

* add comment
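Why: previously, the first table whose data could not be removed aborted the
whole `VACUUM DROP TABLE` run, so the dropped-table meta data of every other
table was never GC'ed either. The storage-level cleanup now runs to
completion, records which tables (and therefore which databases) failed, and
hands only the successfully cleaned ids to `gc_drop_tables`. To support the
partial case, `DroppedId::Db` now carries the id/name pairs of its tables, so
a database with one failed table can still GC the tables that were cleaned.

The vacuum result type introduced in vacuum_handler.rs below (annotated copy):

    pub type VacuumDropTablesResult = Result<(
        Option<Vec<VacuumDropFileInfo>>, // Some(files) only for dry runs
        HashSet<String>,                 // names of dbs with a failed table
        HashSet<u64>,                    // ids of tables that failed cleanup
    )>;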
---
 src/meta/api/src/schema_api_impl.rs           |  71 +++----
 src/meta/api/src/schema_api_test_suite.rs     |  84 +++++++--
 src/meta/app/src/schema/table.rs              |  24 ++-
 .../src/storages/fuse/operations/handler.rs   |   5 +-
 .../fuse/operations/vacuum_drop_tables.rs     | 174 ++++++++++--------
 .../it/storages/fuse/operations/vacuum.rs     |  25 ++-
 .../vacuum_handler/src/vacuum_handler.rs      |  13 +-
 .../interpreter_vacuum_drop_tables.rs         |  47 ++++-
 .../01_003_vacuum_drop_table_continue.result  |  18 ++
 .../01_003_vacuum_drop_table_continue.sh      |  42 +++++
 10 files changed, 337 insertions(+), 166 deletions(-)
 create mode 100644 tests/suites/5_ee/01_vacuum/01_003_vacuum_drop_table_continue.result
 create mode 100755 tests/suites/5_ee/01_vacuum/01_003_vacuum_drop_table_continue.sh

diff --git a/src/meta/api/src/schema_api_impl.rs b/src/meta/api/src/schema_api_impl.rs
index 24f7139f1fd2..43d48471bc7a 100644
--- a/src/meta/api/src/schema_api_impl.rs
+++ b/src/meta/api/src/schema_api_impl.rs
@@ -2865,56 +2865,35 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
             };
 
             let table_infos = do_get_table_history(self, db_filter, left_num).await?;
+            let take_num = left_num.unwrap_or(usize::MAX);
 
-            // check if reach the limit
-            if let Some(left_num) = left_num {
-                let num = min(left_num, table_infos.len());
-                for table_info in table_infos.iter().take(num) {
-                    let (table_info, db_id) = table_info;
+            // A DB can be removed only when all its tables are removed.
+            if drop_db && take_num > table_infos.len() {
+                drop_ids.push(DroppedId::Db {
+                    db_id: db_info.database_id.db_id,
+                    db_name: db_info.name_ident.database_name().to_string(),
+                    tables: table_infos
+                        .iter()
+                        .map(|(table_info, _)| {
+                            (table_info.ident.table_id, table_info.name.clone())
+                        })
+                        .collect(),
+                });
+            } else {
+                for (table_info, db_id) in table_infos.iter().take(take_num) {
                     drop_ids.push(DroppedId::Table(
                         *db_id,
                         table_info.ident.table_id,
                         table_info.name.clone(),
                     ));
-                    drop_table_infos.push(table_info.clone());
-                }
-
-                // if limit is Some, append DroppedId::Db only when table_infos is empty
-                if drop_db && table_infos.is_empty() {
-                    drop_ids.push(DroppedId::Db(
-                        db_info.database_id.db_id,
-                        db_info.name_ident.database_name().to_string(),
-                    ));
-                }
-                if num == left_num {
-                    return Ok(ListDroppedTableResp {
-                        drop_table_infos,
-                        drop_ids,
-                    });
-                }
-            } else {
-                table_infos.iter().for_each(|(table_info, db_id)| {
-                    if !drop_db {
-                        drop_ids.push(DroppedId::Table(
-                            *db_id,
-                            table_info.ident.table_id,
-                            table_info.name.clone(),
-                        ))
-                    }
-                });
-                drop_table_infos.extend(
-                    table_infos
-                        .into_iter()
-                        .map(|(table_info, _)| table_info)
-                        .collect::<Vec<_>>(),
-                );
-                if drop_db {
-                    drop_ids.push(DroppedId::Db(
-                        db_info.database_id.db_id,
-                        db_info.name_ident.database_name().to_string(),
-                    ));
-                }
                 }
             }
+            drop_table_infos.extend(
+                table_infos
+                    .iter()
+                    .take(take_num)
+                    .map(|(table_info, _)| table_info.clone()),
+            );
         }
 
         return Ok(ListDroppedTableResp {
@@ -2974,9 +2953,11 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
     async fn gc_drop_tables(&self, req: GcDroppedTableReq) -> Result<(), KVAppError> {
         for drop_id in req.drop_ids {
             match drop_id {
-                DroppedId::Db(db_id, db_name) => {
-                    gc_dropped_db_by_id(self, db_id, &req.tenant, db_name).await?
-                }
+                DroppedId::Db {
+                    db_id,
+                    db_name,
+                    tables: _,
+                } => gc_dropped_db_by_id(self, db_id, &req.tenant, db_name).await?,
                 DroppedId::Table(db_id, table_id, table_name) => {
                     gc_dropped_table_by_id(self, &req.tenant, db_id, table_id, table_name).await?
                 }
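The first hunk above is the subtle one: with no limit, `take_num` is
`usize::MAX`, so a database being dropped is emitted as a single
`DroppedId::Db` that remembers its tables; under a limit the table list does
not fit, only per-table ids are emitted and the database entry is withheld,
because the DB row may be removed only after all of its tables are. A minimal
stand-alone sketch of that rule, using simplified stand-in types:

    #[derive(Debug)]
    enum DroppedIdSketch {
        // db_id, db_name, (table_id, table_name) pairs
        Db(u64, String, Vec<(u64, String)>),
        // db_id, table_id, table_name
        Table(u64, u64, String),
    }

    // Sketch of the selection rule; `tables` are the dropped tables of one DB.
    fn select_drop_ids(
        drop_db: bool,
        limit: Option<usize>,
        db_id: u64,
        db_name: &str,
        tables: &[(u64, String)],
    ) -> Vec<DroppedIdSketch> {
        let take_num = limit.unwrap_or(usize::MAX);
        if drop_db && take_num > tables.len() {
            // Whole DB fits: one Db entry that still lists its tables, so GC
            // can fall back to per-table removal if some cleanup fails later.
            vec![DroppedIdSketch::Db(db_id, db_name.to_string(), tables.to_vec())]
        } else {
            // Limited, or not a DB drop: emit at most `take_num` table entries.
            tables
                .iter()
                .take(take_num)
                .map(|(id, name)| DroppedIdSketch::Table(db_id, *id, name.clone()))
                .collect()
        }
    }

For example, three dropped tables under a limit of 2 yield two `Table`
entries and no `Db` entry; note the strict `>`, so a limit exactly equal to
the table count also keeps the database row.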
diff --git a/src/meta/api/src/schema_api_test_suite.rs b/src/meta/api/src/schema_api_test_suite.rs
index 9d7b6d9a8d34..4f3e2483c8f3 100644
--- a/src/meta/api/src/schema_api_test_suite.rs
+++ b/src/meta/api/src/schema_api_test_suite.rs
@@ -4020,14 +4020,16 @@ impl SchemaApiTestSuite {
             };
 
             let res = mt.create_database(req).await?;
-            drop_ids_1.push(DroppedId::Db(
-                *res.db_id,
-                db_name.database_name().to_string(),
-            ));
-            drop_ids_2.push(DroppedId::Db(
-                *res.db_id,
-                db_name.database_name().to_string(),
-            ));
+            drop_ids_1.push(DroppedId::Db {
+                db_id: *res.db_id,
+                db_name: db_name.database_name().to_string(),
+                tables: vec![],
+            });
+            drop_ids_2.push(DroppedId::Db {
+                db_id: *res.db_id,
+                db_name: db_name.database_name().to_string(),
+                tables: vec![],
+            });
 
             let req = CreateTableReq {
                 create_option: CreateOption::Create,
@@ -4063,7 +4065,11 @@ impl SchemaApiTestSuite {
 
             let res = mt.create_database(create_db_req.clone()).await?;
             let db_id = res.db_id;
-            drop_ids_2.push(DroppedId::Db(*db_id, "db2".to_string()));
+            drop_ids_2.push(DroppedId::Db {
+                db_id: *db_id,
+                db_name: "db2".to_string(),
+                tables: vec![],
+            });
 
             info!("--- create and drop db2.tb1");
             {
@@ -4262,15 +4268,47 @@ impl SchemaApiTestSuite {
                     left_table_id.cmp(right_table_id)
                 }
             }
-            (DroppedId::Db(left_db_id, _), DroppedId::Db(right_db_id, _)) => {
-                left_db_id.cmp(right_db_id)
-            }
-            (DroppedId::Db(left_db_id, _), DroppedId::Table(right_db_id, _, _)) => {
-                left_db_id.cmp(right_db_id)
-            }
-            (DroppedId::Table(left_db_id, _, _), DroppedId::Db(right_db_id, _)) => {
-                left_db_id.cmp(right_db_id)
-            }
+            (
+                DroppedId::Db {
+                    db_id: left_db_id, ..
+                },
+                DroppedId::Db {
+                    db_id: right_db_id, ..
+                },
+            ) => left_db_id.cmp(right_db_id),
+            (
+                DroppedId::Db {
+                    db_id: left_db_id,
+                    db_name: _,
+                    tables: _,
+                },
+                DroppedId::Table(right_db_id, _, _),
+            ) => left_db_id.cmp(right_db_id),
+            (
+                DroppedId::Table(left_db_id, _, _),
+                DroppedId::Db {
+                    db_id: right_db_id,
+                    db_name: _,
+                    tables: _,
+                },
+            ) => left_db_id.cmp(right_db_id),
+        }
+    }
+    fn is_dropped_id_eq(l: &DroppedId, r: &DroppedId) -> bool {
+        match (l, r) {
+            (
+                DroppedId::Db {
+                    db_id: left_db_id,
+                    db_name: left_db_name,
+                    tables: _,
+                },
+                DroppedId::Db {
+                    db_id: right_db_id,
+                    db_name: right_db_name,
+                    tables: _,
+                },
+            ) => left_db_id == right_db_id && left_db_name == right_db_name,
+            _ => l == r,
         }
     }
     // case 1: test AllDroppedTables with filter time
@@ -4285,7 +4323,10 @@ impl SchemaApiTestSuite {
             // sort drop id by table id
             let mut sort_drop_ids = resp.drop_ids;
             sort_drop_ids.sort_by(cmp_dropped_id);
-            assert_eq!(sort_drop_ids, drop_ids_1);
+            assert_eq!(sort_drop_ids.len(), drop_ids_1.len());
+            for (id1, id2) in sort_drop_ids.iter().zip(drop_ids_1.iter()) {
+                assert!(is_dropped_id_eq(id1, id2));
+            }
 
             let expected: BTreeSet<String> = [
                 "'db1'.'tb1'".to_string(),
@@ -4314,7 +4355,10 @@ impl SchemaApiTestSuite {
             // sort drop id by table id
             let mut sort_drop_ids = resp.drop_ids;
             sort_drop_ids.sort_by(cmp_dropped_id);
-            assert_eq!(sort_drop_ids, drop_ids_2);
+            assert_eq!(sort_drop_ids.len(), drop_ids_2.len());
+            for (id1, id2) in sort_drop_ids.iter().zip(drop_ids_2.iter()) {
+                assert!(is_dropped_id_eq(id1, id2));
+            }
 
             let expected: BTreeSet<String> = [
                 "'db1'.'tb1'".to_string(),
diff --git a/src/meta/app/src/schema/table.rs b/src/meta/app/src/schema/table.rs
index e1de3eacae85..ab1fe33dbd79 100644
--- a/src/meta/app/src/schema/table.rs
+++ b/src/meta/app/src/schema/table.rs
@@ -25,6 +25,7 @@ use std::time::Duration;
 use anyerror::func_name;
 use chrono::DateTime;
 use chrono::Utc;
+use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::FieldIndex;
 use databend_common_expression::TableField;
@@ -201,6 +202,20 @@ pub struct TableInfo {
     pub db_type: DatabaseType,
 }
 
+impl TableInfo {
+    pub fn database_name(&self) -> Result<&str> {
+        if self.engine() != "FUSE" {
+            return Err(ErrorCode::Internal(format!(
+                "Invalid engine: {}",
+                self.engine()
+            )));
+        }
+        let database_name = self.desc.split('.').next().unwrap();
+        let database_name = &database_name[1..database_name.len() - 1];
+        Ok(database_name)
+    }
+}
+
 #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Default)]
 pub struct TableStatistics {
     /// Number of rows
@@ -360,7 +375,7 @@ impl Default for TableMeta {
     fn default() -> Self {
         TableMeta {
             schema: Arc::new(TableSchema::empty()),
-            engine: "".to_string(),
+            engine: "FUSE".to_string(),
             engine_options: BTreeMap::new(),
             storage_params: None,
             part_prefix: "".to_string(),
@@ -907,8 +922,11 @@ pub struct ListDroppedTableReq {
 
 #[derive(Clone, Debug, PartialEq, Eq)]
 pub enum DroppedId {
-    // db id, db name
-    Db(u64, String),
+    Db {
+        db_id: u64,
+        db_name: String,
+        tables: Vec<(u64, String)>,
+    },
     // db id, table id, table name
     Table(u64, u64, String),
 }
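The new `database_name()` helper recovers the database from `TableInfo.desc`,
which for FUSE tables has the shape `db`.`table`; the slice strips the
surrounding backticks. It refuses non-FUSE engines because only that desc
format is guaranteed, which is also why `TableMeta::default()` now defaults
the engine to "FUSE": the vacuum tests build `TableInfo` by hand and rely on
this helper. A stand-alone restatement of the parsing, assuming that desc
shape (a database name containing '.' would mis-parse):

    // Re-statement of TableInfo::database_name() for a desc like "`default`.`t`".
    fn database_name_of(desc: &str) -> &str {
        let quoted = desc.split('.').next().unwrap(); // -> "`default`"
        &quoted[1..quoted.len() - 1] // strip the surrounding backticks
    }

    fn main() {
        assert_eq!(database_name_of("`default`.`t`"), "default");
    }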
diff --git a/src/query/ee/src/storages/fuse/operations/handler.rs b/src/query/ee/src/storages/fuse/operations/handler.rs
index 7544ad528b3e..b4e19a297724 100644
--- a/src/query/ee/src/storages/fuse/operations/handler.rs
+++ b/src/query/ee/src/storages/fuse/operations/handler.rs
@@ -22,14 +22,13 @@ use databend_common_catalog::table::Table;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
 use databend_common_storages_fuse::FuseTable;
-use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDropFileInfo;
+use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDropTablesResult;
 use databend_enterprise_vacuum_handler::VacuumHandler;
 use databend_enterprise_vacuum_handler::VacuumHandlerWrapper;
 
 use crate::storages::fuse::do_vacuum;
 use crate::storages::fuse::operations::vacuum_temporary_files::do_vacuum_temporary_files;
 use crate::storages::fuse::vacuum_drop_tables;
-
 pub struct RealVacuumHandler {}
 
 #[async_trait::async_trait]
@@ -49,7 +48,7 @@ impl VacuumHandler for RealVacuumHandler {
         threads_nums: usize,
         tables: Vec<Arc<dyn Table>>,
         dry_run_limit: Option<usize>,
-    ) -> Result<Option<Vec<VacuumDropFileInfo>>> {
+    ) -> VacuumDropTablesResult {
         vacuum_drop_tables(threads_nums, tables, dry_run_limit).await
     }
diff --git a/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs b/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs
index aa62b1c3371f..1167a330c0c7 100644
--- a/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs
+++ b/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashSet;
 use std::sync::Arc;
 use std::time::Instant;
 
@@ -21,92 +22,111 @@ use databend_common_exception::Result;
 use databend_common_meta_app::schema::TableInfo;
 use databend_common_storages_fuse::FuseTable;
 use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDropFileInfo;
+use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDropTablesResult;
 use futures_util::TryStreamExt;
 use log::error;
 use log::info;
 use opendal::EntryMode;
 use opendal::Metakey;
 use opendal::Operator;
-
 #[async_backtrace::framed]
 pub async fn do_vacuum_drop_table(
     tables: Vec<(TableInfo, Operator)>,
     dry_run_limit: Option<usize>,
-) -> Result<Option<Vec<VacuumDropFileInfo>>> {
+) -> VacuumDropTablesResult {
     let mut list_files = vec![];
+    let mut failed_dbs = HashSet::new();
+    let mut failed_tables = HashSet::new();
     for (table_info, operator) in tables {
-        let dir = format!(
-            "{}/",
-            FuseTable::parse_storage_prefix_from_table_info(&table_info)?
-        );
-
-        info!(
-            "vacuum drop table {:?} dir {:?}, is_external_table:{:?}",
-            table_info.name,
-            dir,
-            table_info.meta.storage_params.is_some()
-        );
-
-        let start = Instant::now();
-
-        match dry_run_limit {
-            None => {
-                let result = operator.remove_all(&dir).await;
-                if let Err(ref err) = result {
-                    error!("failed to remove all in directory {}: {}", dir, err);
-                }
-                result?;
+        let result =
+            vacuum_drop_single_table(&table_info, operator, dry_run_limit, &mut list_files).await;
+        if result.is_err() {
+            let db_name = table_info.database_name()?;
+            let table_id = table_info.ident.table_id;
+            failed_dbs.insert(db_name.to_string());
+            failed_tables.insert(table_id);
+        }
+    }
+    Ok(if dry_run_limit.is_some() {
+        (Some(list_files), failed_dbs, failed_tables)
+    } else {
+        (None, failed_dbs, failed_tables)
+    })
+}
+
+async fn vacuum_drop_single_table(
+    table_info: &TableInfo,
+    operator: Operator,
+    dry_run_limit: Option<usize>,
+    list_files: &mut Vec<VacuumDropFileInfo>,
+) -> Result<()> {
+    let dir = format!(
+        "{}/",
+        FuseTable::parse_storage_prefix_from_table_info(table_info)?
+    );
+
+    info!(
+        "vacuum drop table {:?} dir {:?}, is_external_table:{:?}",
+        table_info.name,
+        dir,
+        table_info.meta.storage_params.is_some()
+    );
+
+    let start = Instant::now();
+
+    match dry_run_limit {
+        None => {
+            let result = operator.remove_all(&dir).await;
+            if let Err(ref err) = result {
+                error!("failed to remove all in directory {}: {}", dir, err);
             }
-            Some(dry_run_limit) => {
-                let mut ds = operator
-                    .lister_with(&dir)
-                    .recursive(true)
-                    .metakey(Metakey::Mode)
-                    .metakey(Metakey::ContentLength)
-                    .await?;
-
-                loop {
-                    let entry = ds.try_next().await;
-                    match entry {
-                        Ok(Some(de)) => {
-                            let meta = de.metadata();
-                            if EntryMode::FILE == meta.mode() {
-                                list_files.push((
-                                    table_info.name.clone(),
-                                    de.name().to_string(),
-                                    meta.content_length(),
-                                ));
-                                if list_files.len() >= dry_run_limit {
-                                    break;
+            result?;
+        }
+        Some(dry_run_limit) => {
+            let mut ds = operator
+                .lister_with(&dir)
+                .recursive(true)
+                .metakey(Metakey::Mode)
+                .metakey(Metakey::ContentLength)
+                .await?;
+
+            loop {
+                let entry = ds.try_next().await;
+                match entry {
+                    Ok(Some(de)) => {
+                        let meta = de.metadata();
+                        if EntryMode::FILE == meta.mode() {
+                            list_files.push((
+                                table_info.name.clone(),
+                                de.name().to_string(),
+                                meta.content_length(),
+                            ));
+                            if list_files.len() >= dry_run_limit {
+                                break;
                             }
                         }
-                        Ok(None) => break,
-                        Err(e) => {
-                            if e.kind() == opendal::ErrorKind::NotFound {
-                                info!("target not found, ignored. {}", e);
-                                continue;
-                            } else {
-                                return Err(e.into());
-                            }
+                    }
+                    Ok(None) => break,
+                    Err(e) => {
+                        if e.kind() == opendal::ErrorKind::NotFound {
+                            info!("target not found, ignored. {}", e);
+                            continue;
+                        } else {
+                            return Err(e.into());
                         }
                     }
                 }
             }
-        };
-
-        info!(
-            "vacuum drop table {:?} dir {:?}, cost:{:?}",
-            table_info.name,
-            dir,
-            start.elapsed()
-        );
-    }
-    Ok(if dry_run_limit.is_some() {
-        Some(list_files)
-    } else {
-        None
-    })
+        }
+    };
+
+    info!(
+        "vacuum drop table {:?} dir {:?}, cost:{:?}",
+        table_info.name,
+        dir,
+        start.elapsed()
+    );
+    Ok(())
 }
 
 #[async_backtrace::framed]
@@ -114,7 +134,7 @@ pub async fn vacuum_drop_tables_by_table_info(
     num_threads: usize,
     table_infos: Vec<(TableInfo, Operator)>,
     dry_run_limit: Option<usize>,
-) -> Result<Option<Vec<VacuumDropFileInfo>>> {
+) -> VacuumDropTablesResult {
     let start = Instant::now();
 
     let num_tables = table_infos.len();
@@ -157,17 +177,21 @@ pub async fn vacuum_drop_tables_by_table_info(
         // longer be roll-forward.
         if dry_run_limit.is_some() {
             let mut ret_files = vec![];
-            for file in result {
-                if let Some(files) = file? {
+            for res in result {
+                if let Some(files) = res?.0 {
                     ret_files.extend(files);
                 }
             }
-            Some(ret_files)
+            (Some(ret_files), HashSet::new(), HashSet::new())
         } else {
-            for file in result {
-                let _ = file?;
+            let mut failed_dbs = HashSet::new();
+            let mut failed_tables = HashSet::new();
+            for res in result {
+                let (_, db, tbl) = res?;
+                failed_dbs.extend(db);
+                failed_tables.extend(tbl);
             }
-            None
+            (None, failed_dbs, failed_tables)
         }
     };
@@ -185,7 +209,7 @@ pub async fn vacuum_drop_tables(
     threads_nums: usize,
     tables: Vec<Arc<dyn Table>>,
     dry_run_limit: Option<usize>,
-) -> Result<Option<Vec<VacuumDropFileInfo>>> {
+) -> VacuumDropTablesResult {
     let num_tables = tables.len();
     info!("vacuum_drop_tables {} tables", num_tables);
diff --git a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
index 333c370f431b..f0d01c1aa69b 100644
--- a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
+++ b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
@@ -294,6 +294,7 @@ async fn test_fuse_do_vacuum_drop_table_deletion_error() -> Result<()> {
         .meta
         .options
         .insert(OPT_KEY_DATABASE_ID.to_owned(), "1".to_owned());
+    table_info.desc = "`default`.`t`".to_string();
 
     use test_accessor::AccessorFaultyDeletion;
     // Operator with mocked accessor that will fail on `remove_all`
@@ -305,9 +306,9 @@ async fn test_fuse_do_vacuum_drop_table_deletion_error() -> Result<()> {
     let operator = OperatorBuilder::new(faulty_accessor.clone()).finish();
 
     let tables = vec![(table_info, operator)];
-    let result = do_vacuum_drop_table(tables, None).await;
-    assert!(result.is_err());
-
+    let result = do_vacuum_drop_table(tables, None).await?;
+    assert!(!result.1.is_empty());
+    assert!(!result.2.is_empty());
     // verify that accessor.delete() was called
     assert!(faulty_accessor.hit_delete_operation());
 
@@ -321,7 +322,7 @@ async fn test_fuse_vacuum_drop_tables_in_parallel_with_deletion_error() -> Resul
         .meta
         .options
         .insert(OPT_KEY_DATABASE_ID.to_owned(), "1".to_owned());
-
+    table_info.desc = "`default`.`t`".to_string();
     use test_accessor::AccessorFaultyDeletion;
 
     // Case 1: non-parallel vacuum dropped tables
@@ -334,12 +335,13 @@ async fn test_fuse_vacuum_drop_tables_in_parallel_with_deletion_error() -> Resul
         // with one table and one thread, `vacuum_drop_tables_by_table_info` will NOT run in parallel
         let tables = vec![table];
         let num_threads = 1;
-        let result = vacuum_drop_tables_by_table_info(num_threads, tables, None).await;
+        let result = vacuum_drop_tables_by_table_info(num_threads, tables, None).await?;
        // verify that accessor.delete() was called
        assert!(faulty_accessor.hit_delete_operation());

        // verify that errors of deletions are not swallowed
-        assert!(result.is_err());
+        assert!(!result.1.is_empty());
+        assert!(!result.2.is_empty());
     }
 
     // Case 2: parallel vacuum dropped tables
@@ -351,11 +353,12 @@ async fn test_fuse_vacuum_drop_tables_in_parallel_with_deletion_error() -> Resul
         // with 2 tables and 2 threads, `vacuum_drop_tables_by_table_info` will run in parallel (one table per thread)
         let tables = vec![table.clone(), table];
         let num_threads = 2;
-        let result = vacuum_drop_tables_by_table_info(num_threads, tables, None).await;
+        let result = vacuum_drop_tables_by_table_info(num_threads, tables, None).await?;
         // verify that accessor.delete() was called
         assert!(faulty_accessor.hit_delete_operation());
         // verify that errors of deletions are not swallowed
-        assert!(result.is_err());
+        assert!(!result.1.is_empty());
+        assert!(!result.2.is_empty());
     }
 
     Ok(())
@@ -416,6 +419,7 @@ async fn test_fuse_do_vacuum_drop_table_external_storage() -> Result<()> {
     };
 
     let table_info = TableInfo {
+        desc: "`default`.`t`".to_string(),
         meta,
         ..Default::default()
     };
@@ -427,8 +431,9 @@ async fn test_fuse_do_vacuum_drop_table_external_storage() -> Result<()> {
     let operator = OperatorBuilder::new(accessor.clone()).finish();
 
     let tables = vec![(table_info, operator)];
-    let result = do_vacuum_drop_table(tables, None).await;
-    assert!(result.is_err());
+    let result = do_vacuum_drop_table(tables, None).await?;
+    assert!(!result.1.is_empty());
+    assert!(!result.2.is_empty());
 
     // verify that accessor.delete() was called
     assert!(!accessor.hit_delete_operation());
diff --git a/src/query/ee_features/vacuum_handler/src/vacuum_handler.rs b/src/query/ee_features/vacuum_handler/src/vacuum_handler.rs
index 0971b32a44a6..b8437918c201 100644
--- a/src/query/ee_features/vacuum_handler/src/vacuum_handler.rs
+++ b/src/query/ee_features/vacuum_handler/src/vacuum_handler.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashSet;
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -22,10 +23,16 @@ use databend_common_catalog::table::Table;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
 use databend_common_storages_fuse::FuseTable;
-
 // (TableName, file, file size)
 pub type VacuumDropFileInfo = (String, String, u64);
 
+// (drop_files, failed_dbs, failed_tables)
+pub type VacuumDropTablesResult = Result<(
+    Option<Vec<VacuumDropFileInfo>>,
+    HashSet<String>,
+    HashSet<u64>,
+)>;
+
 #[async_trait::async_trait]
 pub trait VacuumHandler: Sync + Send {
     async fn do_vacuum(
@@ -41,7 +48,7 @@ pub trait VacuumHandler: Sync + Send {
         threads_nums: usize,
         tables: Vec<Arc<dyn Table>>,
         dry_run_limit: Option<usize>,
-    ) -> Result<Option<Vec<VacuumDropFileInfo>>>;
+    ) -> VacuumDropTablesResult;
 
     async fn do_vacuum_temporary_files(
         &self,
@@ -79,7 +86,7 @@ impl VacuumHandlerWrapper {
         threads_nums: usize,
         tables: Vec<Arc<dyn Table>>,
         dry_run_limit: Option<usize>,
-    ) -> Result<Option<Vec<VacuumDropFileInfo>>> {
+    ) -> VacuumDropTablesResult {
         self.handler
             .do_vacuum_drop_tables(threads_nums, tables, dry_run_limit)
             .await
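The interpreter change below consumes these failure sets: a `DroppedId::Db`
whose name appears in `failed_dbs` is demoted to per-table GC of just its
cleaned tables, which is exactly what the table list added to the variant is
for. Distilled into a sketch with simplified types (`DbDrop` and `Gc` are
illustrative stand-ins):

    use std::collections::HashSet;

    // Illustrative stand-ins for DroppedId::Db and the GC work items.
    struct DbDrop {
        db_id: u64,
        db_name: String,
        tables: Vec<(u64, String)>, // (table_id, table_name)
    }

    enum Gc {
        WholeDb(u64),
        Table(u64, u64), // (db_id, table_id)
    }

    fn plan_gc(db: DbDrop, failed_dbs: &HashSet<String>, failed_tables: &HashSet<u64>) -> Vec<Gc> {
        if !failed_dbs.contains(&db.db_name) {
            // Every table under the DB was cleaned: GC the DB in one go.
            vec![Gc::WholeDb(db.db_id)]
        } else {
            // Partial failure: GC only the tables whose cleanup succeeded.
            db.tables
                .into_iter()
                .filter(|(table_id, _)| !failed_tables.contains(table_id))
                .map(|(table_id, _)| Gc::Table(db.db_id, table_id))
                .collect()
        }
    }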
diff --git a/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs b/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs
index 1c6ced64c453..a7f1467a9e86 100644
--- a/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs
+++ b/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs
@@ -65,11 +65,11 @@ impl VacuumDropTablesInterpreter {
         let mut drop_db_table_ids = vec![];
         for drop_id in drop_ids {
             match drop_id {
-                DroppedId::Db(db_id, db_name) => {
-                    drop_db_ids.push(DroppedId::Db(db_id, db_name));
+                DroppedId::Db { .. } => {
+                    drop_db_ids.push(drop_id);
                 }
-                DroppedId::Table(db_id, table_id, table_name) => {
-                    drop_db_table_ids.push(DroppedId::Table(db_id, table_id, table_name));
+                DroppedId::Table(_, _, _) => {
+                    drop_db_table_ids.push(drop_id);
                 }
             }
         }
@@ -146,7 +146,7 @@ impl Interpreter for VacuumDropTablesInterpreter {
             "vacuum drop table from db {:?}, get_drop_table_infos return tables: {:?}, drop_ids: {:?}",
             self.plan.database,
             tables.len(),
-            drop_ids.len()
+            drop_ids
         );
 
         // TODO buggy, table as catalog obj should be allowed to drop
@@ -159,7 +159,7 @@ impl Interpreter for VacuumDropTablesInterpreter {
         let handler = get_vacuum_handler();
         let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
-        let files_opt = handler
+        let (files_opt, failed_dbs, failed_tables) = handler
             .do_vacuum_drop_tables(
                 threads_nums,
                 tables,
@@ -172,7 +172,40 @@ impl Interpreter for VacuumDropTablesInterpreter {
             .await?;
         // gc meta data only when not dry run
         if self.plan.option.dry_run.is_none() {
-            self.gc_drop_tables(catalog, drop_ids).await?;
+            let mut success_dropped_ids = vec![];
+            for drop_id in drop_ids {
+                match &drop_id {
+                    DroppedId::Db {
+                        db_id,
+                        db_name,
+                        tables,
+                    } => {
+                        if !failed_dbs.contains(db_name) {
+                            success_dropped_ids.push(drop_id);
+                        } else {
+                            for (table_id, table_name) in tables.iter() {
+                                if !failed_tables.contains(table_id) {
+                                    success_dropped_ids.push(DroppedId::Table(
+                                        *db_id,
+                                        *table_id,
+                                        table_name.clone(),
+                                    ));
+                                }
+                            }
+                        }
+                    }
+                    DroppedId::Table(_, table_id, _) => {
+                        if !failed_tables.contains(table_id) {
+                            success_dropped_ids.push(drop_id);
+                        }
+                    }
+                }
+            }
+            info!(
+                "failed dbs:{:?}, failed_tables:{:?}, success_drop_ids:{:?}",
+                failed_dbs, failed_tables, success_dropped_ids
+            );
+            self.gc_drop_tables(catalog, success_dropped_ids).await?;
         }
 
         match files_opt {
diff --git a/tests/suites/5_ee/01_vacuum/01_003_vacuum_drop_table_continue.result b/tests/suites/5_ee/01_vacuum/01_003_vacuum_drop_table_continue.result
new file mode 100644
index 000000000000..ab6c604174f9
--- /dev/null
+++ b/tests/suites/5_ee/01_vacuum/01_003_vacuum_drop_table_continue.result
@@ -0,0 +1,18 @@
+>>>> create or replace database test_vacuum_drop_table_continue
+>>>> create table test_vacuum_drop_table_continue.a(c int) 'fs:///tmp/test_vacuum_drop_table_continue/'
+>>>> create table test_vacuum_drop_table_continue.b(c int)
+>>>> create table test_vacuum_drop_table_continue.c(c int)
+>>>> create table test_vacuum_drop_table_continue.d(c int)
+>>>> insert into test_vacuum_drop_table_continue.a values (1)
+>>>> insert into test_vacuum_drop_table_continue.b values (1)
+>>>> insert into test_vacuum_drop_table_continue.c values (1)
+>>>> insert into test_vacuum_drop_table_continue.d values (1)
+>>>> drop database test_vacuum_drop_table_continue
+>>>> set data_retention_time_in_days=0; vacuum drop table
+>>>> undrop database test_vacuum_drop_table_continue
+>>>> use test_vacuum_drop_table_continue;show tables
+a
+<<<<
+>>>> select * from test_vacuum_drop_table_continue.a
+1
+<<<<
"$CURDIR"/../../../shell_env.sh + +stmt "create or replace database test_vacuum_drop_table_continue" + +mkdir -p /tmp/test_vacuum_drop_table_continue/ + +stmt "create table test_vacuum_drop_table_continue.a(c int) 'fs:///tmp/test_vacuum_drop_table_continue/'" + +stmt "create table test_vacuum_drop_table_continue.b(c int)" + +stmt "create table test_vacuum_drop_table_continue.c(c int)" + +stmt "create table test_vacuum_drop_table_continue.d(c int)" + + +stmt "insert into test_vacuum_drop_table_continue.a values (1)" + +stmt "insert into test_vacuum_drop_table_continue.b values (1)" + +stmt "insert into test_vacuum_drop_table_continue.c values (1)" + +stmt "insert into test_vacuum_drop_table_continue.d values (1)" + +chmod 444 /tmp/test_vacuum_drop_table_continue/ + +stmt "drop database test_vacuum_drop_table_continue" + +# can't vacuum files of table a, but can go on vacuum other tables +stmt "set data_retention_time_in_days=0; vacuum drop table" + +chmod 755 /tmp/test_vacuum_drop_table_continue/ +find /tmp/test_vacuum_drop_table_continue/ -type d -exec chmod 755 {} + +find /tmp/test_vacuum_drop_table_continue/ -type f -exec chmod 644 {} + + +stmt "undrop database test_vacuum_drop_table_continue" + +query "use test_vacuum_drop_table_continue;show tables" + +query "select * from test_vacuum_drop_table_continue.a"