Skip to content

Commit

Permalink
chore(query): revert #16192 (#16271)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dousir9 authored Aug 18, 2024
1 parent b32117a commit a8da519
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 173 deletions.
2 changes: 0 additions & 2 deletions src/query/storages/common/table_meta/src/meta/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ pub mod statistics;
mod table_snapshot_statistics;

pub use segment::BlockMeta;
pub use segment::BlockMetaMessagePack;
pub use segment::ColumnMeta;
pub use segment::SegmentInfo;
pub use snapshot::TableSnapshot;
pub use statistics::ClusterStatistics;
pub use statistics::ColumnStatistics;
pub use statistics::Statistics;
pub use statistics::StatisticsMessagePack;
pub use table_snapshot_statistics::MetaHLL;
pub use table_snapshot_statistics::TableSnapshotStatistics;
44 changes: 0 additions & 44 deletions src/query/storages/common/table_meta/src/meta/v2/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,50 +84,6 @@ pub struct BlockMeta {
pub create_on: Option<DateTime<Utc>>,
}

/// An exact copy of `BlockMeta` with specific `deserialize_with` implementation that
/// can correctly deserialize legacy MessagePack format.
#[derive(Clone, Deserialize)]
pub struct BlockMetaMessagePack {
row_count: u64,
block_size: u64,
file_size: u64,
#[serde(deserialize_with = "crate::meta::v2::statistics::default_on_error")]
col_stats: HashMap<ColumnId, ColumnStatistics>,
col_metas: HashMap<ColumnId, ColumnMeta>,
cluster_stats: Option<ClusterStatistics>,
/// location of data block
location: Location,
/// location of bloom filter index
bloom_filter_index_location: Option<Location>,

#[serde(default)]
bloom_filter_index_size: u64,
inverted_index_size: Option<u64>,
compression: Compression,

// block create_on
create_on: Option<DateTime<Utc>>,
}

impl From<BlockMetaMessagePack> for BlockMeta {
fn from(b: BlockMetaMessagePack) -> Self {
Self {
row_count: b.row_count,
block_size: b.block_size,
file_size: b.file_size,
col_stats: b.col_stats,
col_metas: b.col_metas,
cluster_stats: b.cluster_stats,
location: b.location,
bloom_filter_index_location: b.bloom_filter_index_location,
bloom_filter_index_size: b.bloom_filter_index_size,
inverted_index_size: b.inverted_index_size,
compression: b.compression,
create_on: b.create_on,
}
}
}

impl BlockMeta {
#[allow(clippy::too_many_arguments)]
pub fn new(
Expand Down
57 changes: 0 additions & 57 deletions src/query/storages/common/table_meta/src/meta/v2/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use databend_common_expression::Scalar;
use databend_common_expression::TableDataType;
use databend_common_expression::TableField;
use serde::de::Error;
use serde::Deserialize;

use crate::meta::v0;

Expand Down Expand Up @@ -84,38 +83,6 @@ pub struct Statistics {
pub cluster_stats: Option<ClusterStatistics>,
}

/// An exact copy of `Statistics` with specific `deserialize_with` implementation that can
/// correctly deserialize legacy MessagePack format.
#[derive(serde::Deserialize)]
pub struct StatisticsMessagePack {
row_count: u64,
block_count: u64,
perfect_block_count: u64,

uncompressed_byte_size: u64,
compressed_byte_size: u64,
index_size: u64,

#[serde(deserialize_with = "crate::meta::v2::statistics::default_on_error")]
col_stats: HashMap<ColumnId, ColumnStatistics>,
cluster_stats: Option<ClusterStatistics>,
}

impl From<StatisticsMessagePack> for Statistics {
fn from(v: StatisticsMessagePack) -> Self {
Self {
row_count: v.row_count,
block_count: v.block_count,
perfect_block_count: v.perfect_block_count,
uncompressed_byte_size: v.uncompressed_byte_size,
compressed_byte_size: v.compressed_byte_size,
index_size: v.index_size,
col_stats: v.col_stats,
cluster_stats: v.cluster_stats,
}
}
}

// conversions from old meta data
// ----------------------------------------------------------------
// ----------------------------------------------------------------
Expand Down Expand Up @@ -437,27 +404,3 @@ where
Ok(map)
}
}

/// Deserializes `T`, falling back to `Default::default()` on error.
///
/// This function is designed to handle legacy `ColumnStatistics` items that incorrectly
/// include unsupported `min` and `max` index types. In the new `IndexScalar` type, these
/// unsupported index types cannot be deserialized correctly.
pub fn default_on_error<'de, T, D>(deserializer: D) -> Result<T, D::Error>
where
T: Default + serde::Deserialize<'de>,
D: serde::Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum DefaultOnError<T> {
Success(T),
Error(serde::de::IgnoredAny),
}

let v = DefaultOnError::<T>::deserialize(deserializer);
match v {
Ok(DefaultOnError::Success(v)) => Ok(v),
_ => Ok(T::default()),
}
}
28 changes: 4 additions & 24 deletions src/query/storages/common/table_meta/src/meta/v4/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ use crate::meta::format::MetaCompression;
use crate::meta::format::SegmentHeader;
use crate::meta::format::MAX_SEGMENT_BLOCK_NUMBER;
use crate::meta::v2::BlockMeta;
use crate::meta::v2::BlockMetaMessagePack;
use crate::meta::v2::StatisticsMessagePack;
use crate::meta::FormatVersion;
use crate::meta::MetaEncoding;
use crate::meta::Statistics;
Expand Down Expand Up @@ -188,28 +186,10 @@ impl SegmentInfo {
summary_size,
} = decode_segment_header(&mut cursor)?;

let (blocks, summary): (Vec<Arc<BlockMeta>>, Statistics) = match encoding {
MetaEncoding::MessagePack => {
let blocks: Vec<Arc<BlockMetaMessagePack>> =
read_and_deserialize(&mut cursor, blocks_size, &encoding, &compression)?;
let summary: StatisticsMessagePack =
read_and_deserialize(&mut cursor, summary_size, &encoding, &compression)?;
(
blocks
.into_iter()
.map(|v| Arc::new(v.as_ref().clone().into()))
.collect(),
summary.into(),
)
}
MetaEncoding::Bincode | MetaEncoding::Json => {
let blocks: Vec<Arc<BlockMeta>> =
read_and_deserialize(&mut cursor, blocks_size, &encoding, &compression)?;
let summary: Statistics =
read_and_deserialize(&mut cursor, summary_size, &encoding, &compression)?;
(blocks, summary)
}
};
let blocks: Vec<Arc<BlockMeta>> =
read_and_deserialize(&mut cursor, blocks_size, &encoding, &compression)?;
let summary: Statistics =
read_and_deserialize(&mut cursor, summary_size, &encoding, &compression)?;

let mut segment = Self::new(blocks, summary);

Expand Down
47 changes: 1 addition & 46 deletions src/query/storages/common/table_meta/src/meta/v4/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use crate::meta::format::MetaCompression;
use crate::meta::monotonically_increased_timestamp;
use crate::meta::trim_timestamp_to_micro_second;
use crate::meta::v2;
use crate::meta::v2::StatisticsMessagePack;
use crate::meta::v3;
use crate::meta::ClusterKey;
use crate::meta::FormatVersion;
Expand Down Expand Up @@ -90,39 +89,6 @@ pub struct TableSnapshot {
pub table_statistics_location: Option<String>,
}

/// An exact copy of `TableSnapshot` with specific `deserialize_with` implementation
/// in `summary` that can correctly deserialize legacy MessagePack format.
#[derive(Deserialize)]
pub struct TableSnapshotMessagePack {
format_version: FormatVersion,
snapshot_id: SnapshotId,
timestamp: Option<DateTime<Utc>>,
prev_table_seq: Option<u64>,
prev_snapshot_id: Option<(SnapshotId, FormatVersion)>,
schema: TableSchema,
summary: StatisticsMessagePack,
segments: Vec<Location>,
cluster_key_meta: Option<ClusterKey>,
table_statistics_location: Option<String>,
}

impl From<TableSnapshotMessagePack> for TableSnapshot {
fn from(v: TableSnapshotMessagePack) -> Self {
Self {
format_version: v.format_version,
snapshot_id: v.snapshot_id,
timestamp: v.timestamp,
prev_table_seq: v.prev_table_seq,
prev_snapshot_id: v.prev_snapshot_id,
schema: v.schema,
summary: v.summary.into(),
segments: v.segments,
cluster_key_meta: v.cluster_key_meta,
table_statistics_location: v.table_statistics_location,
}
}
}

impl TableSnapshot {
pub fn new(
snapshot_id: SnapshotId,
Expand Down Expand Up @@ -243,18 +209,7 @@ impl TableSnapshot {
let compression = MetaCompression::try_from(r.read_scalar::<u8>()?)?;
let snapshot_size: u64 = r.read_scalar::<u64>()?;

match encoding {
MetaEncoding::MessagePack => {
let snapshot: TableSnapshotMessagePack =
read_and_deserialize(&mut r, snapshot_size, &encoding, &compression)?;
Ok(snapshot.into())
}
MetaEncoding::Bincode | MetaEncoding::Json => {
let snapshot: TableSnapshot =
read_and_deserialize(&mut r, snapshot_size, &encoding, &compression)?;
Ok(snapshot)
}
}
read_and_deserialize(&mut r, snapshot_size, &encoding, &compression)
}

#[inline]
Expand Down

0 comments on commit a8da519

Please sign in to comment.