Commit c304e3c

chore(storage): refactor compact source (#16527)

refactor compact source

zhyass authored Sep 30, 2024
1 parent abd8266 commit c304e3c
Showing 7 changed files with 289 additions and 279 deletions.
141 changes: 135 additions & 6 deletions src/query/service/src/pipelines/builders/builder_compact.rs
@@ -12,15 +12,35 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_base::runtime::Runtime;
use databend_common_catalog::plan::PartInfoType;
use databend_common_catalog::plan::Partitions;
use databend_common_catalog::plan::PartitionsShuffleKind;
use databend_common_catalog::plan::Projection;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_pipeline_sources::EmptySource;
use databend_common_sql::executor::physical_plans::CompactSource;
use databend_common_pipeline_sources::PrefetchAsyncSourcer;
use databend_common_pipeline_transforms::processors::TransformPipelineHelper;
use databend_common_sql::executor::physical_plans::CompactSource as PhysicalCompactSource;
use databend_common_sql::executor::physical_plans::MutationKind;
use databend_common_sql::StreamContext;
use databend_common_storages_fuse::operations::BlockCompactMutator;
use databend_common_storages_fuse::operations::CompactLazyPartInfo;
use databend_common_storages_fuse::operations::CompactSource;
use databend_common_storages_fuse::operations::CompactTransform;
use databend_common_storages_fuse::operations::TableMutationAggregator;
use databend_common_storages_fuse::operations::TransformSerializeBlock;
use databend_common_storages_fuse::FuseTable;

use crate::pipelines::PipelineBuilder;

impl PipelineBuilder {
    pub(crate) fn build_compact_source(&mut self, compact_block: &CompactSource) -> Result<()> {
    pub(crate) fn build_compact_source(
        &mut self,
        compact_block: &PhysicalCompactSource,
    ) -> Result<()> {
        let table = self
            .ctx
            .build_table_by_table_info(&compact_block.table_info, None)?;
@@ -30,11 +50,120 @@ impl PipelineBuilder {
            return self.main_pipeline.add_source(EmptySource::create, 1);
        }

        table.build_compact_source(
        let is_lazy = compact_block.parts.partitions_type() == PartInfoType::LazyLevel;
        let thresholds = table.get_block_thresholds();
        let cluster_key_id = table.cluster_key_id();
        let mut max_threads = self.ctx.get_settings().get_max_threads()? as usize;

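        // Lazy-level partitions only name candidate segments; the concrete compact
        // tasks are built asynchronously on a small worker runtime at pipeline init.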
        if is_lazy {
            let query_ctx = self.ctx.clone();

            let lazy_parts = compact_block
                .parts
                .partitions
                .iter()
                .map(|v| {
                    v.as_any()
                        .downcast_ref::<CompactLazyPartInfo>()
                        .unwrap()
                        .clone()
                })
                .collect::<Vec<_>>();

            let column_ids = compact_block.column_ids.clone();
            self.main_pipeline.set_on_init(move || {
                let ctx = query_ctx.clone();
                let partitions = Runtime::with_worker_threads(2, None)?.block_on(async move {
                    let partitions = BlockCompactMutator::build_compact_tasks(
                        ctx.clone(),
                        column_ids.clone(),
                        cluster_key_id,
                        thresholds,
                        lazy_parts,
                    )
                    .await?;

                    Result::<_>::Ok(partitions)
                })?;

                let partitions = Partitions::create(PartitionsShuffleKind::Mod, partitions);
                query_ctx.set_partitions(partitions)?;
                Ok(())
            });
        } else {
            max_threads = max_threads.min(compact_block.parts.len()).max(1);
            self.ctx.set_partitions(compact_block.parts.clone())?;
        }

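        // Read back every column of the blocks being compacted; stream columns
        // are included when change tracking is enabled.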
        let block_reader = table.create_block_reader(
            self.ctx.clone(),
            Projection::Columns(table.all_column_indices()),
            false,
            table.change_tracking_enabled(),
            false,
        )?;
        let stream_ctx = if table.change_tracking_enabled() {
            Some(StreamContext::try_create(
                self.ctx.get_function_context()?,
                table.schema_with_stream(),
                table.get_table_info().ident.seq,
                false,
                false,
            )?)
        } else {
            None
        };
        // Add source pipe.
        self.main_pipeline.add_source(
            |output| {
                let source = CompactSource::create(self.ctx.clone(), block_reader.clone(), 1);
                PrefetchAsyncSourcer::create(self.ctx.clone(), output, source)
            },
            max_threads,
        )?;
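        // Merge the fetched blocks into fewer, larger blocks.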
        let storage_format = table.get_storage_format();
        self.main_pipeline.add_block_meta_transformer(|| {
            CompactTransform::create(
                self.ctx.clone(),
                block_reader.clone(),
                storage_format,
                stream_ctx.clone(),
            )
        });

        // Sort by cluster key (if defined) and prepare the cluster statistics generator.
        let cluster_stats_gen = table.cluster_gen_for_append(
            self.ctx.clone(),
            &mut self.main_pipeline,
            thresholds,
            None,
        )?;
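        // Serialize the compacted blocks back into the table's storage format.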
        self.main_pipeline.add_transform(|input, output| {
            let proc = TransformSerializeBlock::try_create(
                self.ctx.clone(),
                input,
                output,
                table,
                cluster_stats_gen.clone(),
                MutationKind::Compact,
            )?;
            proc.into_processor()
        })?;

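        // For lazy compaction, funnel all mutation logs into a single aggregator,
        // which accumulates them into the commit metadata.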
        if is_lazy {
            self.main_pipeline.try_resize(1)?;
            self.main_pipeline.add_async_accumulating_transformer(|| {
                TableMutationAggregator::create(
                    table,
                    self.ctx.clone(),
                    vec![],
                    vec![],
                    vec![],
                    Default::default(),
                    MutationKind::Compact,
                )
            });
        }
        Ok(())
    }
}
4 changes: 4 additions & 0 deletions src/query/storages/fuse/src/fuse_table.rs
@@ -461,6 +461,10 @@ impl FuseTable {
        };
        Ok(retention_period)
    }

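    /// Exposes the table's storage format so the compact pipeline builder can
    /// construct transforms outside of this crate.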
    pub fn get_storage_format(&self) -> FuseStorageFormat {
        self.storage_format
    }
}

#[async_trait::async_trait]
134 changes: 0 additions & 134 deletions src/query/storages/fuse/src/operations/compact.rs
@@ -12,31 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;

use databend_common_base::runtime::Runtime;
use databend_common_catalog::plan::PartInfoType;
use databend_common_catalog::plan::Partitions;
use databend_common_catalog::plan::PartitionsShuffleKind;
use databend_common_catalog::plan::Projection;
use databend_common_catalog::table::CompactionLimits;
use databend_common_exception::Result;
use databend_common_expression::ColumnId;
use databend_common_expression::ComputedExpr;
use databend_common_expression::FieldIndex;
use databend_common_pipeline_core::Pipeline;
use databend_common_pipeline_transforms::processors::TransformPipelineHelper;
use databend_common_sql::executor::physical_plans::MutationKind;
use databend_common_sql::StreamContext;
use databend_storages_common_table_meta::meta::Statistics;
use databend_storages_common_table_meta::meta::TableSnapshot;

use crate::operations::common::TableMutationAggregator;
use crate::operations::common::TransformSerializeBlock;
use crate::operations::mutation::BlockCompactMutator;
use crate::operations::mutation::CompactLazyPartInfo;
use crate::operations::mutation::CompactSource;
use crate::operations::mutation::SegmentCompactMutator;
use crate::FuseTable;
use crate::Table;
@@ -119,125 +104,6 @@ impl FuseTable {
        )))
    }

    pub fn build_compact_source(
        &self,
        ctx: Arc<dyn TableContext>,
        parts: Partitions,
        column_ids: HashSet<ColumnId>,
        pipeline: &mut Pipeline,
    ) -> Result<()> {
        let is_lazy = parts.partitions_type() == PartInfoType::LazyLevel;
        let thresholds = self.get_block_thresholds();
        let cluster_key_id = self.cluster_key_id();
        let mut max_threads = ctx.get_settings().get_max_threads()? as usize;

        if is_lazy {
            let query_ctx = ctx.clone();

            let lazy_parts = parts
                .partitions
                .into_iter()
                .map(|v| {
                    v.as_any()
                        .downcast_ref::<CompactLazyPartInfo>()
                        .unwrap()
                        .clone()
                })
                .collect::<Vec<_>>();

            pipeline.set_on_init(move || {
                let ctx = query_ctx.clone();
                let column_ids = column_ids.clone();
                let partitions = Runtime::with_worker_threads(2, None)?.block_on(async move {
                    let partitions = BlockCompactMutator::build_compact_tasks(
                        ctx.clone(),
                        column_ids,
                        cluster_key_id,
                        thresholds,
                        lazy_parts,
                    )
                    .await?;

                    Result::<_>::Ok(partitions)
                })?;

                let partitions = Partitions::create(PartitionsShuffleKind::Mod, partitions);
                query_ctx.set_partitions(partitions)?;
                Ok(())
            });
        } else {
            max_threads = max_threads.min(parts.len()).max(1);
            ctx.set_partitions(parts)?;
        }

        let all_column_indices = self.all_column_indices();
        let projection = Projection::Columns(all_column_indices);
        let block_reader = self.create_block_reader(
            ctx.clone(),
            projection,
            false,
            self.change_tracking_enabled(),
            false,
        )?;
        let stream_ctx = if self.change_tracking_enabled() {
            Some(StreamContext::try_create(
                ctx.get_function_context()?,
                self.schema_with_stream(),
                self.get_table_info().ident.seq,
                false,
                false,
            )?)
        } else {
            None
        };
        // Add source pipe.
        pipeline.add_source(
            |output| {
                CompactSource::try_create(
                    ctx.clone(),
                    self.storage_format,
                    block_reader.clone(),
                    stream_ctx.clone(),
                    output,
                )
            },
            max_threads,
        )?;

        // sort
        let cluster_stats_gen =
            self.cluster_gen_for_append(ctx.clone(), pipeline, thresholds, None)?;
        pipeline.add_transform(
            |input: Arc<databend_common_pipeline_core::processors::InputPort>, output| {
                let proc = TransformSerializeBlock::try_create(
                    ctx.clone(),
                    input,
                    output,
                    self,
                    cluster_stats_gen.clone(),
                    MutationKind::Compact,
                )?;
                proc.into_processor()
            },
        )?;

        if is_lazy {
            pipeline.try_resize(1)?;
            pipeline.add_async_accumulating_transformer(|| {
                TableMutationAggregator::create(
                    self,
                    ctx.clone(),
                    vec![],
                    vec![],
                    vec![],
                    Statistics::default(),
                    MutationKind::Compact,
                )
            });
        }
        Ok(())
    }

    async fn compact_options_with_segment_limit(
        &self,
        num_segment_limit: Option<usize>,
@@ -21,6 +21,7 @@
pub use compact_part::CompactExtraInfo;
pub use compact_part::CompactLazyPartInfo;
pub use compact_part::CompactTaskInfo;
pub use mutation_meta::ClusterStatsGenType;
pub use mutation_meta::CompactSourceMeta;
pub use mutation_meta::SerializeBlock;
pub use mutation_meta::SerializeDataMeta;
pub use mutation_part::DeletedSegmentInfo;