Skip to content

Commit

Permalink
fix: separate sessions for temporary tables with memory engine (#16374)
Browse files Browse the repository at this point in the history
* feat: support vacuum temporary table

* fix slt

* add system.temporary_tables

* fix ut

* adjust log

* remove debug level log

* make lint

* fix: separate sessions for temporary tables with memory engine

* fix
  • Loading branch information
SkyFan2002 authored Sep 3, 2024
1 parent b5ce072 commit 42a5f8a
Show file tree
Hide file tree
Showing 11 changed files with 62 additions and 7 deletions.
14 changes: 14 additions & 0 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1507,6 +1507,7 @@ impl SchemaApiTestSuite {
table_name: table_name.to_string(),
tb_id: table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
})
.await?;

Expand Down Expand Up @@ -1838,6 +1839,7 @@ impl SchemaApiTestSuite {
table_name: tbl_name.to_string(),
tb_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
};
mt.drop_table_by_id(plan.clone()).await?;

Expand Down Expand Up @@ -1869,6 +1871,7 @@ impl SchemaApiTestSuite {
table_name: tbl_name.to_string(),
tb_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
};
let res = mt.drop_table_by_id(plan).await;
let err = res.unwrap_err();
Expand All @@ -1889,6 +1892,7 @@ impl SchemaApiTestSuite {
table_name: tbl_name.to_string(),
tb_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
};
mt.drop_table_by_id(plan.clone()).await?;
}
Expand Down Expand Up @@ -4154,6 +4158,7 @@ impl SchemaApiTestSuite {
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
})
.await?;
}
Expand All @@ -4180,6 +4185,7 @@ impl SchemaApiTestSuite {
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
})
.await?;
let table_id = resp.table_id;
Expand Down Expand Up @@ -4255,6 +4261,7 @@ impl SchemaApiTestSuite {
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
})
.await?;
}
Expand Down Expand Up @@ -4282,6 +4289,7 @@ impl SchemaApiTestSuite {
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
})
.await?;
let table_id = resp.table_id;
Expand Down Expand Up @@ -4457,6 +4465,7 @@ impl SchemaApiTestSuite {
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
})
.await?;
}
Expand Down Expand Up @@ -4698,6 +4707,7 @@ impl SchemaApiTestSuite {
table_name: tbl_name_ident.table_name.clone(),
tb_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
})
.await?;
let cur_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;
Expand Down Expand Up @@ -4750,6 +4760,7 @@ impl SchemaApiTestSuite {
table_name: tbl_name.to_string(),
tb_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
})
.await?;
let cur_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;
Expand Down Expand Up @@ -4808,6 +4819,7 @@ impl SchemaApiTestSuite {
table_name: tbl_name.to_string(),
tb_id: tb_info.ident.table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
})
.await?;
let cur_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;
Expand Down Expand Up @@ -4910,6 +4922,7 @@ impl SchemaApiTestSuite {
table_name: tbl_name.to_string(),
tb_id: new_tb_info.ident.table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
};

let old_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;
Expand Down Expand Up @@ -7677,6 +7690,7 @@ where MT: SchemaApi + kvapi::AsKVApi<Error = MetaError>
db_id: self.db_id,
tb_id: self.table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
};
self.mt.drop_table_by_id(req.clone()).await?;

Expand Down
2 changes: 2 additions & 0 deletions src/meta/api/src/share_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ impl ShareApiTestSuite {
tb_id: table_id,
db_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
};
let res = mt.drop_table_by_id(plan).await?;
let (share_db_id, share_specs) = res.spec_vec.unwrap();
Expand Down Expand Up @@ -2468,6 +2469,7 @@ impl ShareApiTestSuite {
tb_id: table_id,
db_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
};
let _res = mt.drop_table_by_id(plan).await;

Expand Down
2 changes: 2 additions & 0 deletions src/meta/app/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,8 @@ pub struct DropTableByIdReq {
pub db_id: MetaId,

pub engine: String,

pub session_id: String,
}

impl DropTableByIdReq {
Expand Down
1 change: 1 addition & 0 deletions src/meta/binaries/metabench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ async fn benchmark_table(client: &Arc<ClientHandle>, prefix: u64, client_num: u6
table_name: table_name(),
tb_id: t.ident.table_id,
engine: "FUSE".to_string(),
session_id: "".to_string(),
})
.await;

Expand Down
1 change: 1 addition & 0 deletions src/query/ee/src/stream/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ impl StreamHandler for RealStreamHandler {
tb_id: table.get_id(),
db_id: db.get_db_info().database_id.db_id,
engine: engine.to_string(),
session_id: "".to_string(),
})
.await
} else if plan.if_exists {
Expand Down
6 changes: 6 additions & 0 deletions src/query/service/src/interpreters/interpreter_table_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use databend_common_storages_stream::stream_table::STREAM_ENGINE;
use databend_common_storages_view::view_table::VIEW_ENGINE;
use databend_common_users::RoleCacheManager;
use databend_common_users::UserApiProvider;
use databend_storages_common_table_meta::table::OPT_KEY_TEMP_PREFIX;

use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
Expand Down Expand Up @@ -126,6 +127,11 @@ impl Interpreter for DropTableInterpreter {
tb_id: tbl.get_table_info().ident.table_id,
db_id: db.get_db_info().database_id.db_id,
engine: tbl.engine().to_string(),
session_id: tbl
.options()
.get(OPT_KEY_TEMP_PREFIX)
.cloned()
.unwrap_or_default(),
})
.await?;

Expand Down
6 changes: 6 additions & 0 deletions src/query/service/src/interpreters/interpreter_view_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use databend_common_meta_app::schema::DropTableByIdReq;
use databend_common_sql::plans::DropViewPlan;
use databend_common_storages_stream::stream_table::STREAM_ENGINE;
use databend_common_storages_view::view_table::VIEW_ENGINE;
use databend_storages_common_table_meta::table::OPT_KEY_TEMP_PREFIX;

use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
Expand Down Expand Up @@ -94,6 +95,11 @@ impl Interpreter for DropViewInterpreter {
tb_id: table.get_id(),
db_id: db.get_db_info().database_id.db_id,
engine: table.engine().to_string(),
session_id: table
.options()
.get(OPT_KEY_TEMP_PREFIX)
.cloned()
.unwrap_or_default(),
})
.await?;
};
Expand Down
1 change: 1 addition & 0 deletions src/query/service/tests/it/catalogs/database_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ async fn test_catalogs_table() -> Result<()> {
tb_id: tbl.get_table_info().ident.table_id,
db_id: db.get_db_info().database_id.db_id,
engine: tbl.engine().to_string(),
session_id: "".to_string(),
})
.await;
assert!(res.is_ok());
Expand Down
8 changes: 7 additions & 1 deletion src/query/storages/common/blocks/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,11 @@ use parking_lot::RwLock;
/// Indexed by table id etc.
pub type InMemoryData<K> = HashMap<K, Arc<RwLock<Vec<DataBlock>>>>;

pub static IN_MEMORY_DATA: LazyLock<Arc<RwLock<InMemoryData<u64>>>> =
pub static IN_MEMORY_DATA: LazyLock<Arc<RwLock<InMemoryData<InMemoryDataKey>>>> =
LazyLock::new(|| Arc::new(Default::default()));

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct InMemoryDataKey {
pub temp_prefix: Option<String>,
pub table_id: u64,
}
15 changes: 12 additions & 3 deletions src/query/storages/common/session/src/temp_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use databend_common_meta_app::schema::UpsertTableOptionReply;
use databend_common_meta_app::schema::UpsertTableOptionReq;
use databend_common_meta_types::SeqV;
use databend_common_storage::DataOperator;
use databend_storages_common_blocks::memory::InMemoryDataKey;
use databend_storages_common_blocks::memory::IN_MEMORY_DATA;
use databend_storages_common_table_meta::meta::parse_storage_prefix;
use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
Expand Down Expand Up @@ -349,8 +350,12 @@ pub async fn drop_table_by_id(
return Ok(None);
}
}
let key = InMemoryDataKey {
temp_prefix: Some(req.session_id.clone()),
table_id: *tb_id,
};
let mut in_mem_data = IN_MEMORY_DATA.write();
in_mem_data.remove(tb_id).ok_or_else(|| {
in_mem_data.remove(&key).ok_or_else(|| {
ErrorCode::Internal(format!(
"Table not found in memory data {:?}, drop table request: {:?}",
in_mem_data, req
Expand All @@ -362,7 +367,7 @@ pub async fn drop_table_by_id(
Ok(Some(DropTableReply { spec_vec: None }))
}

pub async fn drop_all_temp_tables(mgr: TempTblMgrRef) -> Result<()> {
pub async fn drop_all_temp_tables(session_id: &str, mgr: TempTblMgrRef) -> Result<()> {
let (fuse_dirs, mem_tbl_ids) = {
let mut guard = mgr.lock();
let mut fuse_dirs = Vec::new();
Expand All @@ -389,7 +394,11 @@ pub async fn drop_all_temp_tables(mgr: TempTblMgrRef) -> Result<()> {
if !mem_tbl_ids.is_empty() {
let mut in_mem_data = IN_MEMORY_DATA.write();
for id in mem_tbl_ids {
in_mem_data.remove(&id);
let key = InMemoryDataKey {
temp_prefix: Some(session_id.to_string()),
table_id: id,
};
in_mem_data.remove(&key);
}
}
Ok(())
Expand Down
13 changes: 10 additions & 3 deletions src/query/storages/memory/src/memory_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ use databend_common_pipeline_sinks::Sinker;
use databend_common_pipeline_sources::SyncSource;
use databend_common_pipeline_sources::SyncSourcer;
use databend_common_storage::StorageMetrics;
use databend_storages_common_blocks::memory::InMemoryDataKey;
use databend_storages_common_blocks::memory::IN_MEMORY_DATA;
use databend_storages_common_table_meta::meta::SnapshotId;
use databend_storages_common_table_meta::table::OPT_KEY_TEMP_PREFIX;
use parking_lot::Mutex;
use parking_lot::RwLock;

Expand All @@ -65,13 +67,18 @@ pub struct MemoryTable {

impl MemoryTable {
pub fn try_create(table_info: TableInfo) -> Result<Box<dyn Table>> {
let table_id = &table_info.ident.table_id;
let table_id = table_info.ident.table_id;
let temp_prefix = table_info.options().get(OPT_KEY_TEMP_PREFIX).cloned();
let blocks = {
let mut in_mem_data = IN_MEMORY_DATA.write();
let x = in_mem_data.get(table_id);
let key = InMemoryDataKey {
temp_prefix,
table_id,
};
let x = in_mem_data.get(&key);
x.cloned().unwrap_or_else(|| {
let blocks = Arc::new(RwLock::new(vec![]));
in_mem_data.insert(*table_id, blocks.clone());
in_mem_data.insert(key, blocks.clone());
blocks
})
};
Expand Down

0 comments on commit 42a5f8a

Please sign in to comment.