Skip to content

Commit

Permalink
[Enhancement] Skip tablet schema in rowset meta during ingestion. (ba…
Browse files Browse the repository at this point in the history
…ckport #50873) (#51843)

Signed-off-by: sevev <qiangzh95@gmail.com>
Co-authored-by: zhangqiang <qiangzh95@gmail.com>
  • Loading branch information
mergify[bot] and sevev authored Oct 24, 2024
1 parent 13d33f1 commit a6264c9
Show file tree
Hide file tree
Showing 19 changed files with 457 additions and 68 deletions.
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1303,4 +1303,8 @@ CONF_mBool(enable_lake_compaction_use_partial_segments, "false");
// chunk size used by lake compaction
CONF_mInt32(lake_compaction_chunk_size, "4096");

CONF_mBool(skip_schema_in_rowset_meta, "true");

CONF_mInt32(max_committed_without_schema_rowset, "1000");

} // namespace starrocks::config
1 change: 1 addition & 0 deletions be/src/storage/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
for (const RowsetMetaSharedPtr& rowset_meta : _tablet_meta->all_rs_metas()) {
if (!rowset_meta->has_tablet_schema_pb()) {
rowset_meta->set_tablet_schema(tablet_schema());
rowset_meta->set_skip_tablet_schema(true);
flag = true;
}
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/compaction_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ class CompactionTask : public BackgroundTask {
}
std::vector<RowsetSharedPtr> to_replace;
_tablet->modify_rowsets({_output_rowset}, _input_rowsets, &to_replace);
_tablet->save_meta();
_tablet->save_meta(config::skip_schema_in_rowset_meta);
Rowset::close_rowsets(_input_rowsets);
for (auto& rs : to_replace) {
StorageEngine::instance()->add_unused_rowset(rs);
Expand Down
20 changes: 2 additions & 18 deletions be/src/storage/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,15 +391,7 @@ Status DataDir::load() {
if (!rowset_meta->tablet_schema()) {
auto tablet_schema_ptr = tablet->tablet_schema();
rowset_meta->set_tablet_schema(tablet_schema_ptr);
RowsetMetaPB meta_pb;
rowset_meta->get_full_meta_pb(&meta_pb);
Status rs_meta_save_status = RowsetMetaManager::save(get_meta(), rowset_meta->tablet_uid(), meta_pb);
if (!rs_meta_save_status.ok()) {
LOG(WARNING) << "Failed to save rowset meta, rowset=" << rowset_meta->rowset_id()
<< " tablet=" << rowset_meta->tablet_id() << " txn_id: " << rowset_meta->txn_id();
error_rowset_count++;
return true;
}
rowset_meta->set_skip_tablet_schema(true);
}
Status commit_txn_status = _txn_manager->commit_txn(
_kv_store, rowset_meta->partition_id(), rowset_meta->txn_id(), rowset_meta->tablet_id(),
Expand All @@ -418,15 +410,7 @@ Status DataDir::load() {
Status publish_status = tablet->load_rowset(rowset);
if (!rowset_meta->tablet_schema()) {
rowset_meta->set_tablet_schema(tablet->tablet_schema());
RowsetMetaPB meta_pb;
rowset_meta->get_full_meta_pb(&meta_pb);
Status rs_meta_save_status = RowsetMetaManager::save(get_meta(), rowset_meta->tablet_uid(), meta_pb);
if (!rs_meta_save_status.ok()) {
LOG(WARNING) << "Failed to save rowset meta, rowset=" << rowset_meta->rowset_id()
<< " tablet=" << rowset_meta->tablet_id() << " txn_id: " << rowset_meta->txn_id();
error_rowset_count++;
return true;
}
rowset_meta->set_skip_tablet_schema(true);
}
if (!publish_status.ok() && !publish_status.is_already_exist()) {
LOG(WARNING) << "Fail to add visible rowset=" << rowset->rowset_id()
Expand Down
10 changes: 10 additions & 0 deletions be/src/storage/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,16 @@ struct RowsetId {
}
};

struct HashOfRowsetId {
size_t operator()(const RowsetId& rowset_id) const {
size_t seed = 0;
seed = HashUtil::hash64(&rowset_id.hi, sizeof(rowset_id.hi), seed);
seed = HashUtil::hash64(&rowset_id.mi, sizeof(rowset_id.mi), seed);
seed = HashUtil::hash64(&rowset_id.lo, sizeof(rowset_id.lo), seed);
return seed;
}
};

struct TabletSegmentId {
int64_t tablet_id = INT64_MAX;
uint32_t segment_id = UINT32_MAX;
Expand Down
31 changes: 22 additions & 9 deletions be/src/storage/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,16 @@ class RowsetMeta {
// new rowset.
// Before calling it, please confirm if you need a complete `rowset_meta` that includes `tablet_schema_pb`.
// If not, perhaps `get_meta_pb_without_schema()` is enough.
void get_full_meta_pb(RowsetMetaPB* rs_meta_pb, const TabletSchemaCSPtr& tablet_schema = nullptr) const {
void get_full_meta_pb(RowsetMetaPB* rs_meta_pb, bool skip_schema = false,
const TabletSchemaCSPtr& tablet_schema = nullptr) const {
*rs_meta_pb = *_rowset_meta_pb;
const TabletSchemaCSPtr& target_schema = (tablet_schema != nullptr) ? tablet_schema : _schema;

if (target_schema != nullptr) {
rs_meta_pb->clear_tablet_schema();
TabletSchemaPB* ts_pb = rs_meta_pb->mutable_tablet_schema();
target_schema->to_schema_pb(ts_pb);
if (!skip_schema) {
const TabletSchemaCSPtr& target_schema = (tablet_schema != nullptr) ? tablet_schema : _schema;
if (target_schema != nullptr) {
rs_meta_pb->clear_tablet_schema();
TabletSchemaPB* ts_pb = rs_meta_pb->mutable_tablet_schema();
target_schema->to_schema_pb(ts_pb);
}
}
}

Expand Down Expand Up @@ -274,6 +276,16 @@ class RowsetMeta {

bool has_tablet_schema_pb() { return _has_tablet_schema_pb; }

void set_skip_tablet_schema(bool skip_tablet_schema) { _skip_tablet_schema = skip_tablet_schema; }
bool skip_tablet_schema() { return _skip_tablet_schema; }
bool check_schema_id(int64_t latest_tablet_schema_id) {
if (_schema != nullptr && _schema->id() != TabletSchema::invalid_id() &&
_schema->id() == latest_tablet_schema_id) {
return true;
}
return false;
}

private:
bool _deserialize_from_pb(std::string_view value) {
return _rowset_meta_pb->ParseFromArray(value.data(), value.size());
Expand All @@ -294,8 +306,8 @@ class RowsetMeta {
_schema = TabletSchema::create(_rowset_meta_pb->tablet_schema());
}
}
_has_tablet_schema_pb = _rowset_meta_pb->has_tablet_schema();

_has_tablet_schema_pb = _rowset_meta_pb->has_tablet_schema();
// clear does not release memory but only set it to default value, so we need to copy a new _rowset_meta_pb
_rowset_meta_pb->clear_tablet_schema();
std::unique_ptr<RowsetMetaPB> ptr = std::make_unique<RowsetMetaPB>(*_rowset_meta_pb);
Expand Down Expand Up @@ -329,6 +341,7 @@ class RowsetMeta {
bool _is_removed_from_rowset_meta = false;
TabletSchemaCSPtr _schema = nullptr;
bool _has_tablet_schema_pb = false;
bool _skip_tablet_schema = false;
};

} // namespace starrocks
} // namespace starrocks
7 changes: 7 additions & 0 deletions be/src/storage/rowset/rowset_meta_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,11 @@ Status RowsetMetaManager::traverse_rowset_metas(
return meta->iterate(META_COLUMN_FAMILY_INDEX, ROWSET_PREFIX, traverse_rowset_meta_func);
}

Status RowsetMetaManager::get_rowset_meta_value(KVStore* meta, const TabletUid& tablet_uid, const RowsetId& rowset_id,
std::string* value) {
std::string key = get_rowset_meta_key(tablet_uid, rowset_id);
RETURN_IF_ERROR(meta->get(META_COLUMN_FAMILY_INDEX, key, value));
return Status::OK();
}

} // namespace starrocks
3 changes: 3 additions & 0 deletions be/src/storage/rowset/rowset_meta_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ class RowsetMetaManager {

static Status traverse_rowset_metas(
KVStore* meta, std::function<bool(const TabletUid&, const RowsetId&, std::string_view)> const& func);

static Status get_rowset_meta_value(KVStore* meta, const TabletUid& tablet_uid, const RowsetId& rowset_id,
std::string* value);
};

} // namespace starrocks
69 changes: 51 additions & 18 deletions be/src/storage/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,7 @@ Tablet::Tablet(const TabletMetaSharedPtr& tablet_meta, DataDir* data_dir)
_cumulative_point(kInvalidCumulativePoint) {
// change _rs_graph to _timestamped_version_tracker
_timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas());

// if !_tablet_meta->all_rs_metas()[0]->tablet_schema(),
// that mean the tablet_meta is still no upgrade to support-light-schema-change versions.
// Before support-light-schema-change version, rowset metas don't have tablet schema.
// And when upgrade to starrocks support-light-schema-change version,
// all rowset metas will be set the tablet schema from tablet meta.
if (_tablet_meta->all_rs_metas().empty() || !_tablet_meta->all_rs_metas()[0]->tablet_schema()) {
_max_version_schema = BaseTablet::tablet_schema();
} else {
_max_version_schema =
TabletMeta::rowset_meta_with_max_rowset_version(_tablet_meta->all_rs_metas())->tablet_schema();
}

_max_version_schema = BaseTablet::tablet_schema();
MEM_TRACKER_SAFE_CONSUME(GlobalEnv::GetInstance()->tablet_metadata_mem_tracker(), _mem_usage());
}

Expand Down Expand Up @@ -171,8 +159,8 @@ Status Tablet::init() {

// should save tablet meta to remote meta store
// if it's a primary replica
void Tablet::save_meta() {
auto st = _tablet_meta->save_meta(_data_dir);
void Tablet::save_meta(bool skip_tablet_schema) {
auto st = _tablet_meta->save_meta(_data_dir, skip_tablet_schema);
CHECK(st.ok()) << "fail to save tablet_meta: " << st;
}

Expand Down Expand Up @@ -572,7 +560,11 @@ Status Tablet::add_inc_rowset(const RowsetSharedPtr& rowset, int64_t version) {
}

RowsetMetaPB rowset_meta_pb;
rowset->rowset_meta()->get_full_meta_pb(&rowset_meta_pb);
if (rowset->rowset_meta()->skip_tablet_schema()) {
rowset_meta_pb = rowset->rowset_meta()->get_meta_pb_without_schema();
} else {
rowset->rowset_meta()->get_full_meta_pb(&rowset_meta_pb);
}
// No matter whether contains the version, the rowset meta should always be saved. TxnManager::publish_txn
// will remove the in-memory txn information if Status::AlreadlyExist, but not the committed rowset meta
// (RowsetStatePB = COMMITTED) saved in rocksdb. Here modify the rowset to visible, and save it again
Expand Down Expand Up @@ -621,6 +613,26 @@ Status Tablet::add_inc_rowset(const RowsetSharedPtr& rowset, int64_t version) {
return Status::OK();
}

bool Tablet::add_committed_rowset(const RowsetSharedPtr& rowset) {
if (_committed_rs_map.size() >= config::max_committed_without_schema_rowset) {
VLOG(1) << "tablet: " << tablet_id()
<< " too many committed without schema rowset : " << _committed_rs_map.size();
return false;
}

if (rowset->rowset_meta()->check_schema_id(_max_version_schema->id())) {
_committed_rs_map[rowset->rowset_id()] = rowset;
return true;
}
return false;
}

void Tablet::erase_committed_rowset(const RowsetSharedPtr& rowset) {
if (rowset != nullptr) {
_committed_rs_map.erase(rowset->rowset_id());
}
}

void Tablet::_delete_inc_rowset_by_version(const Version& version) {
// delete incremental rowset from map
_inc_rs_version_map.erase(version);
Expand Down Expand Up @@ -1367,7 +1379,7 @@ void Tablet::do_tablet_meta_checkpoint() {
return;
}
LOG(INFO) << "start to do tablet meta checkpoint, tablet=" << full_name();
save_meta();
save_meta(config::skip_schema_in_rowset_meta);
// if save meta successfully, then should remove the rowset meta existing in tablet
// meta from rowset meta store
for (auto& rs_meta : _tablet_meta->all_rs_metas()) {
Expand Down Expand Up @@ -1736,17 +1748,38 @@ const TabletSchemaCSPtr Tablet::thread_safe_get_tablet_schema() const {
return _max_version_schema;
}

// for non-pk tablet, all published rowset will be rewrite when save `tablet_meta`
// for pk tablet, we need to get the rowset which without `tablet_schema` and rewrite
// the rowsets in `_committed_rs_map` is committed success but not publish yet, so if we update the
// tablet schema, we need to rewrite.
void Tablet::_get_rewrite_meta_rs(std::vector<RowsetSharedPtr>& rewrite_meta_rs) {
for (auto& [_, rs] : _committed_rs_map) {
if (rs->rowset_meta()->skip_tablet_schema()) {
rewrite_meta_rs.emplace_back(rs);
}
}

if (_updates) {
_updates->rewrite_rs_meta();
}
}

void Tablet::update_max_version_schema(const TabletSchemaCSPtr& tablet_schema) {
std::lock_guard l0(_meta_lock);
std::lock_guard l1(_schema_lock);
DeferOp defer([&]() { _update_schema_running.store(false); });
_update_schema_running.store(true);
// Double Check for concurrent update
if (!_max_version_schema || tablet_schema->schema_version() > _max_version_schema->schema_version()) {
if (tablet_schema->id() == TabletSchema::invalid_id()) {
_max_version_schema = tablet_schema;
} else {
_max_version_schema = GlobalTabletSchemaMap::Instance()->emplace(tablet_schema).first;
}
_tablet_meta->save_tablet_schema(_max_version_schema, _data_dir);
std::vector<RowsetSharedPtr> rewrite_meta_rs;
_get_rewrite_meta_rs(rewrite_meta_rs);
_tablet_meta->save_tablet_schema(_max_version_schema, rewrite_meta_rs, _data_dir);
_committed_rs_map.clear();
}
}

Expand Down
18 changes: 17 additions & 1 deletion be/src/storage/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "storage/utils.h"
#include "storage/version_graph.h"
#include "util/once.h"
#include "util/phmap/phmap.h"

namespace starrocks {

Expand Down Expand Up @@ -99,7 +100,7 @@ class Tablet : public BaseTablet {
void register_tablet_into_dir();
void deregister_tablet_from_dir();

void save_meta();
void save_meta(bool skip_tablet_schema = false);
// Used in clone task, to update local meta when finishing a clone job
[[nodiscard]] Status revise_tablet_meta(const std::vector<RowsetMetaSharedPtr>& rowsets_to_clone,
const std::vector<Version>& versions_to_delete);
Expand Down Expand Up @@ -331,6 +332,13 @@ class Tablet : public BaseTablet {
// set true when start to drop tablet. only set in `TabletManager::drop_tablet` right now
void set_is_dropping(bool is_dropping) { _is_dropping = is_dropping; }

[[nodiscard]] bool is_update_schema_running() const { return _update_schema_running.load(); }
void set_update_schema_running(bool is_running) { _update_schema_running.store(is_running); }
std::shared_mutex& get_schema_lock() { return _schema_lock; }
bool add_committed_rowset(const RowsetSharedPtr& rowset);
void erase_committed_rowset(const RowsetSharedPtr& rowset);
int64_t committed_rowset_size() { return _committed_rs_map.size(); }

void on_shutdown() override;

private:
Expand Down Expand Up @@ -367,6 +375,7 @@ class Tablet : public BaseTablet {
// check whether there is useless binlog, and update the in-memory TabletMeta to the state after
// those binlog is deleted. Return true the meta has been changed, and needs to be persisted
bool _check_useless_binlog_and_update_meta(int64_t current_second);
void _get_rewrite_meta_rs(std::vector<RowsetSharedPtr>& rewrite_meta_rs);

friend class TabletUpdates;
static const int64_t kInvalidCumulativePoint = -1;
Expand Down Expand Up @@ -410,6 +419,12 @@ class Tablet : public BaseTablet {
// this policy is judged and computed by TimestampedVersionTracker.
std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _stale_rs_version_map;

// Keep the rowsets committed but not publish which rowset meta without schema
phmap::parallel_flat_hash_map<RowsetId, std::shared_ptr<Rowset>, HashOfRowsetId, std::equal_to<RowsetId>,
std::allocator<std::pair<const RowsetId, std::shared_ptr<Rowset>>>, 5, std::mutex,
true>
_committed_rs_map;

// States used for updatable tablets only
std::unique_ptr<TabletUpdates> _updates;

Expand Down Expand Up @@ -448,6 +463,7 @@ class Tablet : public BaseTablet {
bool _will_be_force_replaced = false;

std::atomic<bool> _is_dropping{false};
std::atomic<bool> _update_schema_running{false};
};

inline bool Tablet::init_succeeded() {
Expand Down
Loading

0 comments on commit a6264c9

Please sign in to comment.