Skip to content

Commit

Permalink
[Enhancement] improve cloud native pk table memory cost when handle l…
Browse files Browse the repository at this point in the history
…arge ingestion (backport #45685) (#46548)

Signed-off-by: luohaha <[email protected]>
  • Loading branch information
luohaha authored Jun 3, 2024
1 parent 6b24464 commit 759cc78
Show file tree
Hide file tree
Showing 13 changed files with 755 additions and 560 deletions.
2 changes: 1 addition & 1 deletion be/src/storage/lake/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ Status DeltaWriterImpl::fill_auto_increment_id(const Chunk& chunk) {
auto metadata = _tablet_manager->get_latest_cached_tablet_metadata(_tablet_id);
Status st;
if (metadata != nullptr) {
st = tablet.update_mgr()->get_rowids_from_pkindex(&tablet, metadata->version(), upserts, &rss_rowids, true);
st = tablet.update_mgr()->get_rowids_from_pkindex(tablet.id(), metadata->version(), upserts, &rss_rowids, true);
}

std::vector<uint8_t> filter;
Expand Down
24 changes: 0 additions & 24 deletions be/src/storage/lake/meta_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,28 +324,4 @@ bool is_primary_key(const TabletMetadata& metadata) {
return metadata.schema().keys_type() == KeysType::PRIMARY_KEYS;
}

// Fills `rssid_to_file_info` with one entry per segment file, keyed by
// "rowset id + segment index within the rowset".
//
// @param metadata            Tablet metadata; every rowset in `metadata.rowsets()` is added,
//                            numbered from that rowset's own `id()`.
// @param op_write            Optional write operation. When non-null, the segments of
//                            `op_write->rowset()` are added as well, numbered from
//                            `metadata.next_rowset_id()`.
// @param rssid_to_file_info  Output map. Entries are inserted or overwritten; nothing is erased.
void rowset_rssid_to_path(const TabletMetadata& metadata, const TxnLogPB_OpWrite* op_write,
                          std::unordered_map<uint32_t, FileInfo>& rssid_to_file_info) {
    auto get_file_info_from_rowset = [&](const RowsetMetadataPB& meta, const uint32_t rowset_id) -> void {
        // A per-segment size is only used when exactly one size is recorded per segment;
        // otherwise `FileInfo::size` keeps its default value.
        const bool has_segment_size = (meta.segments_size() == meta.segment_size_size());
        for (int i = 0; i < meta.segments_size(); i++) {
            FileInfo segment_info{.path = meta.segments(i)};
            if (LIKELY(has_segment_size)) {
                segment_info.size = meta.segment_size(i);
            }
            rssid_to_file_info[rowset_id + i] = segment_info;
        }
    };

    for (const auto& rs : metadata.rowsets()) {
        get_file_info_from_rowset(rs, rs.id());
    }
    if (op_write != nullptr) {
        // The helper already walks every segment of the rowset, so a single call is enough.
        // Calling it once per segment would rewrite the same entries repeatedly.
        get_file_info_from_rowset(op_write->rowset(), metadata.next_rowset_id());
    }
}

} // namespace starrocks::lake
4 changes: 0 additions & 4 deletions be/src/storage/lake/meta_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,5 @@ Status get_del_vec(TabletManager* tablet_mgr, const TabletMetadata& metadata, ui
bool is_primary_key(TabletMetadata* metadata);
bool is_primary_key(const TabletMetadata& metadata);

// Fills `rssid_to_path` with the file info of every segment, keyed by
// "rowset id + segment index". Covers all rowsets in `metadata` and, when
// `op_write` is non-null, the rowset carried by `op_write` (numbered from
// `metadata.next_rowset_id()`).
// TODO(yixin): cache rowset_rssid_to_path
void rowset_rssid_to_path(const TabletMetadata& metadata, const TxnLogPB_OpWrite* op_write,
std::unordered_map<uint32_t, FileInfo>& rssid_to_path);

} // namespace lake
} // namespace starrocks
712 changes: 337 additions & 375 deletions be/src/storage/lake/rowset_update_state.cpp

Large diffs are not rendered by default.

124 changes: 90 additions & 34 deletions be/src/storage/lake/rowset_update_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,24 @@

namespace starrocks::lake {

class MetaFileBuilder;
class RssidFileInfoContainer;

// State kept for a partial update: the source row ids plus the columns to write.
struct PartialUpdateState {
    std::vector<uint64_t> src_rss_rowids;
    std::vector<std::unique_ptr<Column>> write_columns;

    // Drops every row id and every column held by this state.
    void reset() {
        write_columns.clear();
        src_rss_rowids.clear();
    }

    // Sum of `memory_usage()` over the non-null write columns.
    // `src_rss_rowids` is not included in the total.
    size_t memory_usage() const {
        size_t total_bytes = 0;
        for (const auto& column : write_columns) {
            if (column == nullptr) {
                continue;
            }
            total_bytes += column->memory_usage();
        }
        return total_bytes;
    }
};

struct AutoIncrementPartialUpdateState {
Expand All @@ -49,6 +62,21 @@ struct AutoIncrementPartialUpdateState {
this->id = id;
this->segment_id = segment_id;
}
void reset() {
src_rss_rowids.clear();
write_column.reset();
schema.reset();
rowids.clear();
}
size_t memory_usage() const { return write_column ? write_column->memory_usage() : 0; }
};

// Bundle of inputs handed to RowsetUpdateState (init / load_segment / rewrite_segment / load_delete).
// Every member is a reference or a raw pointer, so this struct owns none of the objects it refers to.
struct RowsetUpdateStateParams {
// Write operation being processed.
const TxnLogPB_OpWrite& op_write;
// Schema of the tablet.
const TabletSchemaPtr& tablet_schema;
// Tablet metadata used alongside `op_write`.
const TabletMetadata& metadata;
// Tablet being updated. NOTE(review): nullability is not visible here, confirm with callers.
const Tablet* tablet;
// Presumably maps rssid to segment file info (judging by the type name), TODO confirm.
const RssidFileInfoContainer& container;
};

class RowsetUpdateState {
Expand All @@ -58,76 +86,104 @@ class RowsetUpdateState {
RowsetUpdateState();
~RowsetUpdateState();

Status load(const TxnLogPB_OpWrite& op_write, const TabletMetadata& metadata, int64_t base_version, Tablet* tablet,
const MetaFileBuilder* builder, bool need_check_conflict, bool need_lock);

Status rewrite_segment(const TxnLogPB_OpWrite& op_write, const TabletMetadata& metadata, Tablet* tablet,
DISALLOW_COPY_AND_MOVE(RowsetUpdateState);

// How to use `RowsetUpdateState` when publish:
//
// init()
//
// for each segment:
// load_segment()
// rewrite_segment()
// ...
// release_segment()
//
// for each del file:
// load_delete()
// ...
// release_delete()

// init params in RowsetUpdateState.
void init(const RowsetUpdateStateParams& params);

// Load `segment_id`-th segment file's state.
Status load_segment(uint32_t segment_id, const RowsetUpdateStateParams& params, int64_t base_version,
bool need_resolve_conflict, bool need_lock);

// Handle `segment_id`-th segment file's partial update request.
Status rewrite_segment(uint32_t segment_id, const RowsetUpdateStateParams& params,
std::map<int, FileInfo>* replace_segments, std::vector<std::string>* orphan_files);

const std::vector<ColumnUniquePtr>& upserts() const { return _upserts; }
const std::vector<ColumnUniquePtr>& deletes() const { return _deletes; }
// Release `segment_id`-th segment file's state.
void release_segment(uint32_t segment_id);

// Load `del_id`-th delete file's state.
Status load_delete(uint32_t del_id, const RowsetUpdateStateParams& params);

// Release `del_id`-th delete file's state.
void release_delete(uint32_t del_id);

const ColumnUniquePtr& upserts(uint32_t segment_id) const { return _upserts[segment_id]; }
const ColumnUniquePtr& deletes(uint32_t segment_id) const { return _deletes[segment_id]; }

std::size_t memory_usage() const { return _memory_usage; }

std::string to_string() const;

const std::vector<PartialUpdateState>& parital_update_states() { return _partial_update_states; }
const PartialUpdateState& parital_update_states(uint32_t segment_id) { return _partial_update_states[segment_id]; }

static void plan_read_by_rssid(const std::vector<uint64_t>& rowids, size_t* num_default,
std::map<uint32_t, std::vector<uint32_t>>* rowids_by_rssid,
std::vector<uint32_t>* idxes);

const std::vector<std::unique_ptr<Column>>& auto_increment_deletes() const;
const ColumnUniquePtr& auto_increment_deletes(uint32_t segment_id) const;

static StatusOr<bool> file_exist(const std::string& full_path);

private:
Status _do_load(const TxnLogPB_OpWrite& op_write, const TabletMetadata& metadata, Tablet* tablet, bool need_lock);
// Load segment state
Status _do_load_upserts(uint32_t segment_id, const RowsetUpdateStateParams& params);

Status _do_load_upserts_deletes(const TxnLogPB_OpWrite& op_write, const TabletSchemaCSPtr& tablet_schema,
Tablet* tablet, Rowset* rowset_ptr);
Status _prepare_partial_update_states(uint32_t segment_id, const RowsetUpdateStateParams& params, bool need_lock);

Status _prepare_partial_update_states(const TxnLogPB_OpWrite& op_write, const TabletMetadata& metadata,
Tablet* tablet, const TabletSchemaCSPtr& tablet_schema, bool need_lock);
Status _prepare_auto_increment_partial_update_states(uint32_t segment_id, const RowsetUpdateStateParams& params,
bool need_lock);

Status _resolve_conflict(const TxnLogPB_OpWrite& op_write, const TabletMetadata& metadata, int64_t base_version,
Tablet* tablet, const MetaFileBuilder* builder);
// resolve conflict when publish transaction
Status _resolve_conflict(uint32_t segment_id, const RowsetUpdateStateParams& params, int64_t base_version);

Status _resolve_conflict_partial_update(const TxnLogPB_OpWrite& op_write, const TabletMetadata& metadata,
Tablet* tablet, const std::vector<uint64_t>& new_rss_rowids,
Status _resolve_conflict_partial_update(const RowsetUpdateStateParams& params,
const std::vector<uint64_t>& new_rss_rowids,
std::vector<uint32_t>& read_column_ids, uint32_t segment_id,
size_t& total_conflicts, const TabletSchemaCSPtr& tablet_schema);

Status _resolve_conflict_auto_increment(const TxnLogPB_OpWrite& op_write, const TabletMetadata& metadata,
Tablet* tablet, const std::vector<uint64_t>& new_rss_rowids,
uint32_t segment_id, size_t& total_conflicts,
const TabletSchemaCSPtr& tablet_schema);
size_t& total_conflicts);

Status _prepare_auto_increment_partial_update_states(const TxnLogPB_OpWrite& op_write,
const TabletMetadata& metadata, Tablet* tablet,
const TabletSchemaCSPtr& tablet_schema, bool need_lock);
Status _resolve_conflict_auto_increment(const RowsetUpdateStateParams& params,
const std::vector<uint64_t>& new_rss_rowids, uint32_t segment_id,
size_t& total_conflicts);

std::once_flag _load_once_flag;
Status _status;
// one for each segment file
std::vector<ColumnUniquePtr> _upserts;
// one for each delete file
std::vector<ColumnUniquePtr> _deletes;
size_t _memory_usage = 0;
int64_t _tablet_id = 0;
// Because segments may be loaded partially during preload, a vector is needed to track their versions.
std::vector<int64_t> _base_versions;

// TODO: dump to disk if memory usage is too large
std::vector<PartialUpdateState> _partial_update_states;

std::vector<AutoIncrementPartialUpdateState> _auto_increment_partial_update_states;

std::vector<std::unique_ptr<Column>> _auto_increment_delete_pks;
std::vector<ColumnUniquePtr> _auto_increment_delete_pks;

int64_t _base_version;
const MetaFileBuilder* _builder;
// `_rowset_meta_ptr` contains full life cycle rowset meta in `_rowset_ptr`.
RowsetMetadataUniquePtr _rowset_meta_ptr;
std::unique_ptr<Rowset> _rowset_ptr;

RowsetUpdateState(const RowsetUpdateState&) = delete;
const RowsetUpdateState& operator=(const RowsetUpdateState&) = delete;
// to be destructed after segment iters
OlapReaderStatistics _stats;
std::vector<ChunkIteratorPtr> _segment_iters;
};

inline std::ostream& operator<<(std::ostream& os, const RowsetUpdateState& o) {
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/lake/types_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class RowsetMetadataPB;
using ChunkIteratorPtr = std::shared_ptr<ChunkIterator>;
using RowsetMetadata = RowsetMetadataPB;
using RowsetMetadataPtr = std::shared_ptr<const RowsetMetadata>;
using RowsetMetadataUniquePtr = std::unique_ptr<const RowsetMetadata>;

namespace lake {

Expand Down
Loading

0 comments on commit 759cc78

Please sign in to comment.