diff --git a/be/src/common/config.h b/be/src/common/config.h index 849eeaff35419..78e2fc67b9f68 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1303,4 +1303,8 @@ CONF_mBool(enable_lake_compaction_use_partial_segments, "false"); // chunk size used by lake compaction CONF_mInt32(lake_compaction_chunk_size, "4096"); +CONF_mBool(skip_schema_in_rowset_meta, "true"); + +CONF_mInt32(max_committed_without_schema_rowset, "1000"); + } // namespace starrocks::config diff --git a/be/src/storage/base_tablet.h b/be/src/storage/base_tablet.h index 07466a0471841..d32d26471305a 100644 --- a/be/src/storage/base_tablet.h +++ b/be/src/storage/base_tablet.h @@ -109,6 +109,7 @@ class BaseTablet : public std::enable_shared_from_this { for (const RowsetMetaSharedPtr& rowset_meta : _tablet_meta->all_rs_metas()) { if (!rowset_meta->has_tablet_schema_pb()) { rowset_meta->set_tablet_schema(tablet_schema()); + rowset_meta->set_skip_tablet_schema(true); flag = true; } } diff --git a/be/src/storage/compaction_task.h b/be/src/storage/compaction_task.h index 596a2c0b884b9..3597a108242ef 100644 --- a/be/src/storage/compaction_task.h +++ b/be/src/storage/compaction_task.h @@ -277,7 +277,7 @@ class CompactionTask : public BackgroundTask { } std::vector to_replace; _tablet->modify_rowsets({_output_rowset}, _input_rowsets, &to_replace); - _tablet->save_meta(); + _tablet->save_meta(config::skip_schema_in_rowset_meta); Rowset::close_rowsets(_input_rowsets); for (auto& rs : to_replace) { StorageEngine::instance()->add_unused_rowset(rs); diff --git a/be/src/storage/data_dir.cpp b/be/src/storage/data_dir.cpp index e0625cd2f6a76..4c8f6de1f1dce 100644 --- a/be/src/storage/data_dir.cpp +++ b/be/src/storage/data_dir.cpp @@ -391,15 +391,7 @@ Status DataDir::load() { if (!rowset_meta->tablet_schema()) { auto tablet_schema_ptr = tablet->tablet_schema(); rowset_meta->set_tablet_schema(tablet_schema_ptr); - RowsetMetaPB meta_pb; - rowset_meta->get_full_meta_pb(&meta_pb); - Status rs_meta_save_status = RowsetMetaManager::save(get_meta(), rowset_meta->tablet_uid(), meta_pb); - if (!rs_meta_save_status.ok()) { - LOG(WARNING) << "Failed to save rowset meta, rowset=" << rowset_meta->rowset_id() - << " tablet=" << rowset_meta->tablet_id() << " txn_id: " << rowset_meta->txn_id(); - error_rowset_count++; - return true; - } + rowset_meta->set_skip_tablet_schema(true); } Status commit_txn_status = _txn_manager->commit_txn( _kv_store, rowset_meta->partition_id(), rowset_meta->txn_id(), rowset_meta->tablet_id(), @@ -418,15 +410,7 @@ Status DataDir::load() { Status publish_status = tablet->load_rowset(rowset); if (!rowset_meta->tablet_schema()) { rowset_meta->set_tablet_schema(tablet->tablet_schema()); - RowsetMetaPB meta_pb; - rowset_meta->get_full_meta_pb(&meta_pb); - Status rs_meta_save_status = RowsetMetaManager::save(get_meta(), rowset_meta->tablet_uid(), meta_pb); - if (!rs_meta_save_status.ok()) { - LOG(WARNING) << "Failed to save rowset meta, rowset=" << rowset_meta->rowset_id() - << " tablet=" << rowset_meta->tablet_id() << " txn_id: " << rowset_meta->txn_id(); - error_rowset_count++; - return true; - } + rowset_meta->set_skip_tablet_schema(true); } if (!publish_status.ok() && !publish_status.is_already_exist()) { LOG(WARNING) << "Fail to add visible rowset=" << rowset->rowset_id() diff --git a/be/src/storage/olap_common.h b/be/src/storage/olap_common.h index 3a621c6442b99..be77f9cef0389 100644 --- a/be/src/storage/olap_common.h +++ b/be/src/storage/olap_common.h @@ -374,6 +374,16 @@ struct RowsetId { } }; +struct HashOfRowsetId { + size_t operator()(const RowsetId& rowset_id) const { + size_t seed = 0; + seed = HashUtil::hash64(&rowset_id.hi, sizeof(rowset_id.hi), seed); + seed = HashUtil::hash64(&rowset_id.mi, sizeof(rowset_id.mi), seed); + seed = HashUtil::hash64(&rowset_id.lo, sizeof(rowset_id.lo), seed); + return seed; + } +}; + struct TabletSegmentId { int64_t tablet_id = INT64_MAX; uint32_t segment_id = UINT32_MAX; diff --git a/be/src/storage/rowset/rowset_meta.h b/be/src/storage/rowset/rowset_meta.h index d55d3740b660c..83321b62f29b0 100644 --- a/be/src/storage/rowset/rowset_meta.h +++ b/be/src/storage/rowset/rowset_meta.h @@ -236,14 +236,16 @@ class RowsetMeta { // new rowset. // Before calling it, please confirm if you need a complete `rowset_meta` that includes `tablet_schema_pb`. // If not, perhaps `get_meta_pb_without_schema()` is enough. - void get_full_meta_pb(RowsetMetaPB* rs_meta_pb, const TabletSchemaCSPtr& tablet_schema = nullptr) const { + void get_full_meta_pb(RowsetMetaPB* rs_meta_pb, bool skip_schema = false, + const TabletSchemaCSPtr& tablet_schema = nullptr) const { *rs_meta_pb = *_rowset_meta_pb; - const TabletSchemaCSPtr& target_schema = (tablet_schema != nullptr) ? tablet_schema : _schema; - - if (target_schema != nullptr) { - rs_meta_pb->clear_tablet_schema(); - TabletSchemaPB* ts_pb = rs_meta_pb->mutable_tablet_schema(); - target_schema->to_schema_pb(ts_pb); + if (!skip_schema) { + const TabletSchemaCSPtr& target_schema = (tablet_schema != nullptr) ? tablet_schema : _schema; + if (target_schema != nullptr) { + rs_meta_pb->clear_tablet_schema(); + TabletSchemaPB* ts_pb = rs_meta_pb->mutable_tablet_schema(); + target_schema->to_schema_pb(ts_pb); + } } } @@ -274,6 +276,16 @@ class RowsetMeta { bool has_tablet_schema_pb() { return _has_tablet_schema_pb; } + void set_skip_tablet_schema(bool skip_tablet_schema) { _skip_tablet_schema = skip_tablet_schema; } + bool skip_tablet_schema() { return _skip_tablet_schema; } + bool check_schema_id(int64_t latest_tablet_schema_id) { + if (_schema != nullptr && _schema->id() != TabletSchema::invalid_id() && + _schema->id() == latest_tablet_schema_id) { + return true; + } + return false; + } + private: bool _deserialize_from_pb(std::string_view value) { return _rowset_meta_pb->ParseFromArray(value.data(), value.size()); @@ -294,8 +306,8 @@ class RowsetMeta { _schema = TabletSchema::create(_rowset_meta_pb->tablet_schema()); } } - _has_tablet_schema_pb = _rowset_meta_pb->has_tablet_schema(); + _has_tablet_schema_pb = _rowset_meta_pb->has_tablet_schema(); // clear does not release memory but only set it to default value, so we need to copy a new _rowset_meta_pb _rowset_meta_pb->clear_tablet_schema(); std::unique_ptr ptr = std::make_unique(*_rowset_meta_pb); @@ -329,6 +341,7 @@ class RowsetMeta { bool _is_removed_from_rowset_meta = false; TabletSchemaCSPtr _schema = nullptr; bool _has_tablet_schema_pb = false; + bool _skip_tablet_schema = false; }; -} // namespace starrocks +} // namespace starrocks \ No newline at end of file diff --git a/be/src/storage/rowset/rowset_meta_manager.cpp b/be/src/storage/rowset/rowset_meta_manager.cpp index a1a10cedb9f88..a4e9429422bc3 100644 --- a/be/src/storage/rowset/rowset_meta_manager.cpp +++ b/be/src/storage/rowset/rowset_meta_manager.cpp @@ -103,4 +103,11 @@ Status RowsetMetaManager::traverse_rowset_metas( return meta->iterate(META_COLUMN_FAMILY_INDEX, ROWSET_PREFIX, traverse_rowset_meta_func); } +Status RowsetMetaManager::get_rowset_meta_value(KVStore* meta, const TabletUid& tablet_uid, const RowsetId& rowset_id, + std::string* value) { + std::string key = get_rowset_meta_key(tablet_uid, rowset_id); + RETURN_IF_ERROR(meta->get(META_COLUMN_FAMILY_INDEX, key, value)); + return Status::OK(); +} + } // namespace starrocks diff --git a/be/src/storage/rowset/rowset_meta_manager.h b/be/src/storage/rowset/rowset_meta_manager.h index 9495a6d323493..bd2f5a9823fdf 100644 --- a/be/src/storage/rowset/rowset_meta_manager.h +++ b/be/src/storage/rowset/rowset_meta_manager.h @@ -58,6 +58,9 @@ class RowsetMetaManager { static Status traverse_rowset_metas( KVStore* meta, std::function const& func); + + static Status get_rowset_meta_value(KVStore* meta, const TabletUid& tablet_uid, const RowsetId& rowset_id, + std::string* value); }; } // namespace starrocks diff --git a/be/src/storage/tablet.cpp b/be/src/storage/tablet.cpp index 77553716a19b0..1277437ef0fd7 100644 --- a/be/src/storage/tablet.cpp +++ b/be/src/storage/tablet.cpp @@ -83,19 +83,7 @@ Tablet::Tablet(const TabletMetaSharedPtr& tablet_meta, DataDir* data_dir) _cumulative_point(kInvalidCumulativePoint) { // change _rs_graph to _timestamped_version_tracker _timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas()); - - // if !_tablet_meta->all_rs_metas()[0]->tablet_schema(), - // that mean the tablet_meta is still no upgrade to support-light-schema-change versions. - // Before support-light-schema-change version, rowset metas don't have tablet schema. - // And when upgrade to starrocks support-light-schema-change version, - // all rowset metas will be set the tablet schema from tablet meta. - if (_tablet_meta->all_rs_metas().empty() || !_tablet_meta->all_rs_metas()[0]->tablet_schema()) { - _max_version_schema = BaseTablet::tablet_schema(); - } else { - _max_version_schema = - TabletMeta::rowset_meta_with_max_rowset_version(_tablet_meta->all_rs_metas())->tablet_schema(); - } - + _max_version_schema = BaseTablet::tablet_schema(); MEM_TRACKER_SAFE_CONSUME(GlobalEnv::GetInstance()->tablet_metadata_mem_tracker(), _mem_usage()); } @@ -171,8 +159,8 @@ Status Tablet::init() { // should save tablet meta to remote meta store // if it's a primary replica -void Tablet::save_meta() { - auto st = _tablet_meta->save_meta(_data_dir); +void Tablet::save_meta(bool skip_tablet_schema) { + auto st = _tablet_meta->save_meta(_data_dir, skip_tablet_schema); CHECK(st.ok()) << "fail to save tablet_meta: " << st; } @@ -572,7 +560,11 @@ Status Tablet::add_inc_rowset(const RowsetSharedPtr& rowset, int64_t version) { } RowsetMetaPB rowset_meta_pb; - rowset->rowset_meta()->get_full_meta_pb(&rowset_meta_pb); + if (rowset->rowset_meta()->skip_tablet_schema()) { + rowset_meta_pb = rowset->rowset_meta()->get_meta_pb_without_schema(); + } else { + rowset->rowset_meta()->get_full_meta_pb(&rowset_meta_pb); + } // No matter whether contains the version, the rowset meta should always be saved. TxnManager::publish_txn // will remove the in-memory txn information if Status::AlreadlyExist, but not the committed rowset meta // (RowsetStatePB = COMMITTED) saved in rocksdb. Here modify the rowset to visible, and save it again @@ -621,6 +613,26 @@ Status Tablet::add_inc_rowset(const RowsetSharedPtr& rowset, int64_t version) { return Status::OK(); } +bool Tablet::add_committed_rowset(const RowsetSharedPtr& rowset) { + if (_committed_rs_map.size() >= config::max_committed_without_schema_rowset) { + VLOG(1) << "tablet: " << tablet_id() + << " too many committed without schema rowset : " << _committed_rs_map.size(); + return false; + } + + if (rowset->rowset_meta()->check_schema_id(_max_version_schema->id())) { + _committed_rs_map[rowset->rowset_id()] = rowset; + return true; + } + return false; +} + +void Tablet::erase_committed_rowset(const RowsetSharedPtr& rowset) { + if (rowset != nullptr) { + _committed_rs_map.erase(rowset->rowset_id()); + } +} + void Tablet::_delete_inc_rowset_by_version(const Version& version) { // delete incremental rowset from map _inc_rs_version_map.erase(version); @@ -1367,7 +1379,7 @@ void Tablet::do_tablet_meta_checkpoint() { return; } LOG(INFO) << "start to do tablet meta checkpoint, tablet=" << full_name(); - save_meta(); + save_meta(config::skip_schema_in_rowset_meta); // if save meta successfully, then should remove the rowset meta existing in tablet // meta from rowset meta store for (auto& rs_meta : _tablet_meta->all_rs_metas()) { @@ -1736,9 +1748,27 @@ const TabletSchemaCSPtr Tablet::thread_safe_get_tablet_schema() const { return _max_version_schema; } +// for non-pk tablet, all published rowset will be rewrite when save `tablet_meta` +// for pk tablet, we need to get the rowset which without `tablet_schema` and rewrite +// the rowsets in `_committed_rs_map` is committed success but not publish yet, so if we update the +// tablet schema, we need to rewrite. +void Tablet::_get_rewrite_meta_rs(std::vector& rewrite_meta_rs) { + for (auto& [_, rs] : _committed_rs_map) { + if (rs->rowset_meta()->skip_tablet_schema()) { + rewrite_meta_rs.emplace_back(rs); + } + } + + if (_updates) { + _updates->rewrite_rs_meta(); + } +} + void Tablet::update_max_version_schema(const TabletSchemaCSPtr& tablet_schema) { std::lock_guard l0(_meta_lock); std::lock_guard l1(_schema_lock); + DeferOp defer([&]() { _update_schema_running.store(false); }); + _update_schema_running.store(true); // Double Check for concurrent update if (!_max_version_schema || tablet_schema->schema_version() > _max_version_schema->schema_version()) { if (tablet_schema->id() == TabletSchema::invalid_id()) { @@ -1746,7 +1776,10 @@ void Tablet::update_max_version_schema(const TabletSchemaCSPtr& tablet_schema) { } else { _max_version_schema = GlobalTabletSchemaMap::Instance()->emplace(tablet_schema).first; } - _tablet_meta->save_tablet_schema(_max_version_schema, _data_dir); + std::vector rewrite_meta_rs; + _get_rewrite_meta_rs(rewrite_meta_rs); + _tablet_meta->save_tablet_schema(_max_version_schema, rewrite_meta_rs, _data_dir); + _committed_rs_map.clear(); } } diff --git a/be/src/storage/tablet.h b/be/src/storage/tablet.h index 2905e29b3f8b1..01d62c778759f 100644 --- a/be/src/storage/tablet.h +++ b/be/src/storage/tablet.h @@ -57,6 +57,7 @@ #include "storage/utils.h" #include "storage/version_graph.h" #include "util/once.h" +#include "util/phmap/phmap.h" namespace starrocks { @@ -99,7 +100,7 @@ class Tablet : public BaseTablet { void register_tablet_into_dir(); void deregister_tablet_from_dir(); - void save_meta(); + void save_meta(bool skip_tablet_schema = false); // Used in clone task, to update local meta when finishing a clone job [[nodiscard]] Status revise_tablet_meta(const std::vector& rowsets_to_clone, const std::vector& versions_to_delete); @@ -331,6 +332,13 @@ class Tablet : public BaseTablet { // set true when start to drop tablet. only set in `TabletManager::drop_tablet` right now void set_is_dropping(bool is_dropping) { _is_dropping = is_dropping; } + [[nodiscard]] bool is_update_schema_running() const { return _update_schema_running.load(); } + void set_update_schema_running(bool is_running) { _update_schema_running.store(is_running); } + std::shared_mutex& get_schema_lock() { return _schema_lock; } + bool add_committed_rowset(const RowsetSharedPtr& rowset); + void erase_committed_rowset(const RowsetSharedPtr& rowset); + int64_t committed_rowset_size() { return _committed_rs_map.size(); } + void on_shutdown() override; private: @@ -367,6 +375,7 @@ class Tablet : public BaseTablet { // check whether there is useless binlog, and update the in-memory TabletMeta to the state after // those binlog is deleted. Return true the meta has been changed, and needs to be persisted bool _check_useless_binlog_and_update_meta(int64_t current_second); + void _get_rewrite_meta_rs(std::vector& rewrite_meta_rs); friend class TabletUpdates; static const int64_t kInvalidCumulativePoint = -1; @@ -410,6 +419,12 @@ class Tablet : public BaseTablet { // this policy is judged and computed by TimestampedVersionTracker. std::unordered_map _stale_rs_version_map; + // Keep the rowsets committed but not publish which rowset meta without schema + phmap::parallel_flat_hash_map, HashOfRowsetId, std::equal_to, + std::allocator>>, 5, std::mutex, + true> + _committed_rs_map; + // States used for updatable tablets only std::unique_ptr _updates; @@ -448,6 +463,7 @@ class Tablet : public BaseTablet { bool _will_be_force_replaced = false; std::atomic _is_dropping{false}; + std::atomic _update_schema_running{false}; }; inline bool Tablet::init_succeeded() { diff --git a/be/src/storage/tablet_meta.cpp b/be/src/storage/tablet_meta.cpp index 68a91ea5dd3f7..2214553d51bd7 100644 --- a/be/src/storage/tablet_meta.cpp +++ b/be/src/storage/tablet_meta.cpp @@ -41,6 +41,7 @@ #include "storage/metadata_util.h" #include "storage/olap_common.h" #include "storage/protobuf_file.h" +#include "storage/rowset/rowset_meta_manager.h" #include "storage/tablet_meta_manager.h" #include "storage/tablet_schema_map.h" #include "storage/tablet_updates.h" @@ -192,26 +193,43 @@ Status TabletMeta::save(const string& file_path, const TabletMetaPB& tablet_meta return file.save(tablet_meta_pb, true); } -Status TabletMeta::save_meta(DataDir* data_dir) { +Status TabletMeta::save_meta(DataDir* data_dir, bool skip_tablet_schema) { std::unique_lock wrlock(_meta_lock); - return _save_meta(data_dir); + return _save_meta(data_dir, skip_tablet_schema); } -void TabletMeta::save_tablet_schema(const TabletSchemaCSPtr& tablet_schema, DataDir* data_dir) { +void TabletMeta::save_tablet_schema(const TabletSchemaCSPtr& tablet_schema, std::vector& committed_rs, + DataDir* data_dir) { std::unique_lock wrlock(_meta_lock); _schema = tablet_schema; - _save_meta(data_dir); + for (auto& rs : committed_rs) { + RowsetMetaPB meta_pb; + rs->rowset_meta()->get_full_meta_pb(&meta_pb); + Status res = RowsetMetaManager::save(data_dir->get_meta(), tablet_uid(), meta_pb); + LOG_IF(FATAL, !res.ok()) << "failed to save rowset " << rs->rowset_id() << " to local meta store: " << res; + rs->rowset_meta()->set_skip_tablet_schema(false); + } + + (void)_save_meta(data_dir, false); } -Status TabletMeta::_save_meta(DataDir* data_dir) { +Status TabletMeta::_save_meta(DataDir* data_dir, bool skip_tablet_schema) { LOG_IF(FATAL, _tablet_uid.hi == 0 && _tablet_uid.lo == 0) << "tablet_uid is invalid" << " tablet=" << full_name() << " _tablet_uid=" << _tablet_uid.to_string(); TabletMetaPB tablet_meta_pb; - to_meta_pb(&tablet_meta_pb); + to_meta_pb(&tablet_meta_pb, skip_tablet_schema); Status st = TabletMetaManager::save(data_dir, tablet_meta_pb); LOG_IF(FATAL, !st.ok()) << "fail to save tablet meta:" << st << ". tablet_id=" << tablet_id() << ", schema_hash=" << schema_hash(); + if (!skip_tablet_schema) { + for (auto& rs : _rs_metas) { + rs->set_skip_tablet_schema(false); + } + for (const auto& rs : _inc_rs_metas) { + rs->set_skip_tablet_schema(false); + } + } return st; } @@ -304,6 +322,7 @@ void TabletMeta::init_from_pb(TabletMetaPB* ptablet_meta_pb, bool use_tablet_sch } if (!rs_meta->tablet_schema()) { rs_meta->set_tablet_schema(_schema); + rs_meta->set_skip_tablet_schema(true); } _rs_metas.push_back(std::move(rs_meta)); } @@ -311,6 +330,7 @@ void TabletMeta::init_from_pb(TabletMetaPB* ptablet_meta_pb, bool use_tablet_sch auto rs_meta = std::make_shared(it); if (!rs_meta->tablet_schema()) { rs_meta->set_tablet_schema(_schema); + rs_meta->set_skip_tablet_schema(true); } _inc_rs_metas.push_back(std::move(rs_meta)); } @@ -342,7 +362,7 @@ void TabletMeta::init_from_pb(TabletMetaPB* ptablet_meta_pb, bool use_tablet_sch } } -void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) { +void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb, bool skip_tablet_schema) { tablet_meta_pb->set_table_id(table_id()); tablet_meta_pb->set_partition_id(partition_id()); tablet_meta_pb->set_tablet_id(tablet_id()); @@ -373,12 +393,19 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) { tablet_meta_pb->set_tablet_state(PB_SHUTDOWN); break; } - for (auto& rs : _rs_metas) { - rs->get_full_meta_pb(tablet_meta_pb->add_rs_metas()); + bool skip_schema = false; + if (skip_tablet_schema && _schema != nullptr && rs->tablet_schema() != nullptr) { + skip_schema = (_schema->id() != TabletSchema::invalid_id()) && (_schema->id() == rs->tablet_schema()->id()); + } + rs->get_full_meta_pb(tablet_meta_pb->add_rs_metas(), skip_schema); } for (const auto& rs : _inc_rs_metas) { - rs->get_full_meta_pb(tablet_meta_pb->add_inc_rs_metas()); + bool skip_schema = false; + if (skip_tablet_schema && _schema != nullptr && rs->tablet_schema() != nullptr) { + skip_schema = (_schema->id() != TabletSchema::invalid_id()) && (_schema->id() == rs->tablet_schema()->id()); + } + rs->get_full_meta_pb(tablet_meta_pb->add_inc_rs_metas(), skip_schema); } if (_schema != nullptr) { _schema->to_schema_pb(tablet_meta_pb->mutable_schema()); diff --git a/be/src/storage/tablet_meta.h b/be/src/storage/tablet_meta.h index 24e8497c693ee..269d0c15b8d90 100644 --- a/be/src/storage/tablet_meta.h +++ b/be/src/storage/tablet_meta.h @@ -123,13 +123,13 @@ class TabletMeta { [[nodiscard]] static Status save(const std::string& file_path, const TabletMetaPB& tablet_meta_pb); [[nodiscard]] static Status reset_tablet_uid(const std::string& file_path); static std::string construct_header_file_path(const std::string& schema_hash_path, int64_t tablet_id); - [[nodiscard]] Status save_meta(DataDir* data_dir); + Status save_meta(DataDir* data_dir, bool skip_tablet_schema = false); [[nodiscard]] Status serialize(std::string* meta_binary); [[nodiscard]] Status deserialize(std::string_view data); void init_from_pb(TabletMetaPB* ptablet_meta_pb, bool use_tablet_schema_map = true); - void to_meta_pb(TabletMetaPB* tablet_meta_pb); + void to_meta_pb(TabletMetaPB* tablet_meta_pb, bool skip_tablet_schema = false); void to_json(std::string* json_string, json2pb::Pb2JsonOptions& options); TabletTypePB tablet_type() const { return _tablet_type; } @@ -164,7 +164,8 @@ class TabletMeta { const TabletSchema& tablet_schema() const; void set_tablet_schema(const TabletSchemaCSPtr& tablet_schema) { _schema = tablet_schema; } - void save_tablet_schema(const TabletSchemaCSPtr& tablet_schema, DataDir* data_dir); + void save_tablet_schema(const TabletSchemaCSPtr& tablet_schema, std::vector& committed_rs, + DataDir* data_dir); TabletSchemaCSPtr& tablet_schema_ptr() { return _schema; } const TabletSchemaCSPtr& tablet_schema_ptr() const { return _schema; } @@ -239,7 +240,7 @@ class TabletMeta { private: int64_t _mem_usage() const { return sizeof(TabletMeta); } - Status _save_meta(DataDir* data_dir); + Status _save_meta(DataDir* data_dir, bool skip_tablet_schema = false); // _del_pred_array is ignored to compare. friend bool operator==(const TabletMeta& a, const TabletMeta& b); diff --git a/be/src/storage/tablet_meta_manager.cpp b/be/src/storage/tablet_meta_manager.cpp index 8d38a0f346013..8feb6ca14fbeb 100644 --- a/be/src/storage/tablet_meta_manager.cpp +++ b/be/src/storage/tablet_meta_manager.cpp @@ -1802,4 +1802,18 @@ Status TabletMetaManager::clear_pending_rowset(DataDir* store, WriteBatch* batch return store->get_meta()->OptDeleteRange(META_COLUMN_FAMILY_INDEX, lower, upper, batch); } +Status TabletMetaManager::get_committed_rowset_meta_value(DataDir* store, int64_t tablet_id, uint32_t rowset_seg_id, + std::string* meta_value) { + std::string rowset_key = encode_meta_rowset_key(tablet_id, rowset_seg_id); + RETURN_IF_ERROR(store->get_meta()->get(META_COLUMN_FAMILY_INDEX, rowset_key, meta_value)); + return Status::OK(); +} + +Status TabletMetaManager::get_pending_committed_rowset_meta_value(DataDir* store, int64_t tablet_id, int64_t version, + std::string* meta_value) { + std::string rowset_key = encode_meta_pending_rowset_key(tablet_id, version); + RETURN_IF_ERROR(store->get_meta()->get(META_COLUMN_FAMILY_INDEX, rowset_key, meta_value)); + return Status::OK(); +} + } // namespace starrocks diff --git a/be/src/storage/tablet_meta_manager.h b/be/src/storage/tablet_meta_manager.h index 552aebbfc2bc9..10bb16a4a1733 100644 --- a/be/src/storage/tablet_meta_manager.h +++ b/be/src/storage/tablet_meta_manager.h @@ -264,6 +264,12 @@ class TabletMetaManager { static Status remove_table_persistent_index_meta(DataDir* store, TTableId table_id); static Status remove_tablet_persistent_index_meta(DataDir* store, TTabletId table_id); + + static Status get_committed_rowset_meta_value(DataDir* store, int64_t tablet_id, uint32_t rowset_seg_id, + std::string* meta_value); + + static Status get_pending_committed_rowset_meta_value(DataDir* store, int64_t tablet_id, int64_t version, + std::string* meta_value); }; } // namespace starrocks diff --git a/be/src/storage/tablet_updates.cpp b/be/src/storage/tablet_updates.cpp index 15f9f8673879f..78724811d9fb9 100644 --- a/be/src/storage/tablet_updates.cpp +++ b/be/src/storage/tablet_updates.cpp @@ -171,8 +171,11 @@ Status TabletUpdates::_load_rowsets_and_check_consistency(std::set& un _rowsets.clear(); RETURN_IF_ERROR(TabletMetaManager::rowset_iterate( _tablet.data_dir(), _tablet.tablet_id(), [&](const RowsetMetaSharedPtr& rowset_meta) -> bool { + if (!rowset_meta->tablet_schema()) { + rowset_meta->set_tablet_schema(_tablet.tablet_schema()); + rowset_meta->set_skip_tablet_schema(true); + } RowsetSharedPtr rowset; - auto st = RowsetFactory::create_rowset(_tablet.tablet_schema(), _tablet.schema_hash_path(), rowset_meta, &rowset); if (st.ok()) { @@ -648,7 +651,12 @@ Status TabletUpdates::rowset_commit(int64_t version, const RowsetSharedPtr& rows _ignore_rowset_commit(version, rowset); } else { RowsetMetaPB meta_pb; - rowset->rowset_meta()->get_full_meta_pb(&meta_pb); + bool skip_schema = !_tablet.is_update_schema_running() && rowset->rowset_meta()->skip_tablet_schema(); + if (skip_schema) { + meta_pb = rowset->rowset_meta()->get_meta_pb_without_schema(); + } else { + rowset->rowset_meta()->get_full_meta_pb(&meta_pb); + } st = TabletMetaManager::pending_rowset_commit( _tablet.data_dir(), _tablet.tablet_id(), version, meta_pb, RowsetMetaManager::get_rowset_meta_key(_tablet.tablet_uid(), rowset->rowset_id())); @@ -658,6 +666,9 @@ Status TabletUpdates::rowset_commit(int64_t version, const RowsetSharedPtr& rows << _debug_string(false, true); return st; } + if (!skip_schema) { + rowset->rowset_meta()->set_skip_tablet_schema(false); + } VLOG(2) << "add rowset to pending commits tablet:" << _tablet.tablet_id() << " version:" << version << " txn_id: " << rowset->txn_id() << " #pending:" << _pending_commits.size(); } @@ -728,7 +739,12 @@ Status TabletUpdates::_rowset_commit_unlocked(int64_t version, const RowsetShare rowset->make_commit(version, rowsetid); span->AddEvent("save_meta_begin"); RowsetMetaPB meta_pb; - rowset->rowset_meta()->get_full_meta_pb(&meta_pb); + bool skip_schema = !_tablet.is_update_schema_running() && rowset->rowset_meta()->skip_tablet_schema(); + if (skip_schema) { + meta_pb = rowset->rowset_meta()->get_meta_pb_without_schema(); + } else { + rowset->rowset_meta()->get_full_meta_pb(&meta_pb); + } auto st = TabletMetaManager::rowset_commit( _tablet.data_dir(), _tablet.tablet_id(), _next_log_id, &edit, meta_pb, RowsetMetaManager::get_rowset_meta_key(_tablet.tablet_uid(), rowset->rowset_id())); @@ -737,6 +753,9 @@ Status TabletUpdates::_rowset_commit_unlocked(int64_t version, const RowsetShare LOG(WARNING) << "rowset commit failed: " << st << " " << _debug_string(false, false); return st; } + if (!skip_schema) { + rowset->rowset_meta()->set_skip_tablet_schema(false); + } // apply in-memory state after commit success _next_log_id++; _next_rowset_id += rowsetid_add; @@ -2584,6 +2603,13 @@ void TabletUpdates::remove_expired_versions(int64_t expire_time) { } } + // rewrite rowset meta which without tablet schema to avoid `update_tablet_schema` cost + // too much time. + { + std::unique_lock wrlock(_tablet.get_header_lock()); + rewrite_rs_meta(); + } + // GC works that can be done outside of lock if (num_version_removed > 0) { { @@ -3799,7 +3825,7 @@ Status TabletUpdates::link_from(Tablet* base_tablet, int64_t request_version, Ch // use src_rowset's meta as base, change some fields to new tablet auto& rowset_meta_pb = new_rowset_info.rowset_meta_pb; // reset rowset schema to the latest one - src_rowset.rowset_meta()->get_full_meta_pb(&rowset_meta_pb, _tablet.tablet_schema()); + src_rowset.rowset_meta()->get_full_meta_pb(&rowset_meta_pb, false, _tablet.tablet_schema()); rowset_meta_pb.set_deprecated_rowset_id(0); rowset_meta_pb.set_rowset_id(rid.to_string()); rowset_meta_pb.set_rowset_seg_id(new_rowset_info.rowset_id); @@ -5552,4 +5578,50 @@ void TabletUpdates::_reset_apply_status(const EditVersionInfo& version_info_appl } } +void TabletUpdates::rewrite_rs_meta() { + std::unordered_map pending_rs; + std::vector published_rs; + { + std::lock_guard lg(_lock); + for (auto& [_, rs] : _rowsets) { + if (rs->rowset_meta()->skip_tablet_schema()) { + published_rs.emplace_back(rs); + } + } + + for (auto& [version, rs] : _pending_commits) { + if (rs->rowset_meta()->skip_tablet_schema()) { + pending_rs[version] = rs; + } + } + } + + for (auto& [version, rs] : _pending_commits) { + RowsetMetaPB meta_pb; + rs->rowset_meta()->get_full_meta_pb(&meta_pb); + Status st = TabletMetaManager::pending_rowset_commit( + _tablet.data_dir(), _tablet.tablet_id(), version, meta_pb, + RowsetMetaManager::get_rowset_meta_key(_tablet.tablet_uid(), rs->rowset_id())); + LOG_IF(FATAL, !st.ok()) << "fail to save pending rowset meta:" << st << ". tablet_id=" << _tablet.tablet_id() + << ", rowset_id=" << rs->rowset_id(); + rs->rowset_meta()->set_skip_tablet_schema(false); + } + + auto kv_store = _tablet.data_dir()->get_meta(); + rocksdb::WriteBatch wb; + for (auto& rs : published_rs) { + RowsetMetaPB meta_pb; + rs->rowset_meta()->get_full_meta_pb(&meta_pb); + Status st = TabletMetaManager::put_rowset_meta(_tablet.data_dir(), &wb, _tablet.tablet_id(), meta_pb); + LOG_IF(FATAL, !st.ok()) << "fail to put published rowset meta:" << st << ". tablet_id=" << _tablet.tablet_id() + << ", rowset_id=" << rs->rowset_id(); + } + Status st = kv_store->write_batch(&wb); + LOG_IF(FATAL, !st.ok()) << "fail to write published rowset meta:" << st << ". tablet_id=" << _tablet.tablet_id() + << ", rowset nul=" << published_rs.size(); + for (auto& rs : published_rs) { + rs->rowset_meta()->set_skip_tablet_schema(false); + } +} + } // namespace starrocks diff --git a/be/src/storage/tablet_updates.h b/be/src/storage/tablet_updates.h index c45b1be8d4e65..b34fc395481a1 100644 --- a/be/src/storage/tablet_updates.h +++ b/be/src/storage/tablet_updates.h @@ -378,6 +378,8 @@ class TabletUpdates { } } + void rewrite_rs_meta(); + private: friend class Tablet; friend class PrimaryIndex; diff --git a/be/src/storage/txn_manager.cpp b/be/src/storage/txn_manager.cpp index 9c2cf12c69118..ee182e607249d 100644 --- a/be/src/storage/txn_manager.cpp +++ b/be/src/storage/txn_manager.cpp @@ -46,6 +46,7 @@ #include "storage/data_dir.h" #include "storage/rowset/rowset_meta_manager.h" #include "storage/storage_engine.h" +#include "storage/tablet_manager.h" #include "storage/tablet_meta.h" #include "util/runtime_profile.h" #include "util/scoped_cleanup.h" @@ -266,17 +267,46 @@ Status TxnManager::commit_txn(KVStore* meta, TPartitionId partition_id, TTransac // if not in recovery mode, then should persist the meta to meta env // save meta need access disk, it maybe very slow, so that it is not in global txn lock // it is under a single txn lock + TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id); + if (tablet == nullptr) { + return Status::InternalError("tablet not exist during commit txn"); + } if (!is_recovery) { + Status st; RowsetMetaPB rowset_meta_pb; - rowset_ptr->rowset_meta()->get_full_meta_pb(&rowset_meta_pb); - Status st = RowsetMetaManager::save(meta, tablet_uid, rowset_meta_pb); + bool skip_schema = config::skip_schema_in_rowset_meta && + !rowset_ptr->rowset_meta()->get_meta_pb_without_schema().has_txn_meta() && + !tablet->is_update_schema_running(); + if (skip_schema) { + // avoid `update_max_version_schema` and `commit_txn` run concurrency, so hold a read + // lock for `schema_lock` is enough + std::shared_lock l(tablet->get_schema_lock()); + skip_schema = tablet->add_committed_rowset(rowset_ptr); + if (skip_schema) { + rowset_ptr->rowset_meta()->set_skip_tablet_schema(true); + rowset_meta_pb = rowset_ptr->rowset_meta()->get_meta_pb_without_schema(); + } else { + rowset_ptr->rowset_meta()->get_full_meta_pb(&rowset_meta_pb); + } + st = RowsetMetaManager::save(meta, tablet_uid, rowset_meta_pb); + } else { + rowset_ptr->rowset_meta()->get_full_meta_pb(&rowset_meta_pb); + st = RowsetMetaManager::save(meta, tablet_uid, rowset_meta_pb); + } if (!st.ok()) { + if (skip_schema) { + tablet->erase_committed_rowset(rowset_ptr); + } LOG(WARNING) << "Fail to save committed rowset. " << "tablet_id: " << tablet_id << ", txn_id: " << transaction_id << ", rowset_id: " << rowset_ptr->rowset_id(); return Status::InternalError( fmt::format("Fail to save committed rowset. tablet_id: {}, txn_id: {}", tablet_id, key.second)); } + } else { + if (rowset_ptr->rowset_meta()->skip_tablet_schema()) { + tablet->add_committed_rowset(rowset_ptr); + } } { @@ -322,6 +352,7 @@ Status TxnManager::publish_txn(TPartitionId partition_id, const TabletSharedPtr& return st; } } + tablet->erase_committed_rowset(rowset); std::unique_lock wrlock(_get_txn_map_lock(transaction_id)); txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); pair key(partition_id, transaction_id); @@ -490,6 +521,10 @@ Status TxnManager::delete_txn(KVStore* meta, TPartitionId partition_id, TTransac fmt::format("Fail to delete txn because rowset is already published. tablet_id: {}, txn_id: {}", tablet_info.tablet_id, transaction_id)); } else { + TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id); + if (tablet != nullptr) { + tablet->erase_committed_rowset(load_info.rowset); + } (void)RowsetMetaManager::remove(meta, tablet_uid, load_info.rowset->rowset_id()); #ifndef BE_TEST StorageEngine::instance()->add_unused_rowset(load_info.rowset); @@ -550,6 +585,10 @@ void TxnManager::force_rollback_tablet_related_txns(KVStore* meta, TTabletId tab << ", rowset id: " << load_info.rowset->rowset_id(); (void)RowsetMetaManager::remove(meta, tablet_uid, load_info.rowset->rowset_id()); } + TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id); + if (tablet != nullptr) { + tablet->erase_committed_rowset(load_info.rowset); + } LOG(INFO) << "remove tablet related txn." << " partition_id: " << it->first.first << ", txn_id: " << it->first.second << ", tablet: " << tablet_info.to_string() << ", rowset: " diff --git a/be/test/storage/tablet_updates_test.cpp b/be/test/storage/tablet_updates_test.cpp index 8e8d78aa82b2e..8d15f9d057a68 100644 --- a/be/test/storage/tablet_updates_test.cpp +++ b/be/test/storage/tablet_updates_test.cpp @@ -19,6 +19,8 @@ #include "script/script.h" #include "storage/local_primary_key_recover.h" #include "storage/primary_key_dump.h" +#include "storage/rowset/rowset_meta_manager.h" +#include "storage/txn_manager.h" #include "util/failpoint/fail_point.h" namespace starrocks { @@ -3732,4 +3734,155 @@ TEST_F(TabletUpdatesTest, test_drop_tablet_with_keep_meta_and_files) { ASSERT_TRUE(_tablet->updates()->is_apply_stop()); } +TEST_F(TabletUpdatesTest, test_skip_schema) { + int N = 100; + srand(GetCurrentTimeMicros()); + _tablet = create_tablet(rand(), rand(), false, rand(), 0); + std::vector keys; + for (int i = 0; i < N; i++) { + keys.push_back(i); + } + _tablet->updates()->stop_apply(true); + auto rs1 = create_rowset(_tablet, keys); + ASSERT_EQ(false, rs1->rowset_meta()->skip_tablet_schema()); + PUniqueId load_id; + load_id.set_hi(1000); + load_id.set_lo(1000); + ASSERT_TRUE(StorageEngine::instance() + ->txn_manager() + ->commit_txn(_tablet->data_dir()->get_meta(), 100, 100, _tablet->tablet_id(), + _tablet->schema_hash(), _tablet->tablet_uid(), load_id, rs1, false) + .ok()); + ASSERT_EQ(true, rs1->rowset_meta()->skip_tablet_schema()); + ASSERT_EQ(1, _tablet->committed_rowset_size()); + ASSERT_TRUE(rs1->tablet_schema() != nullptr); + + { + std::string meta_value; + ASSERT_TRUE(RowsetMetaManager::get_rowset_meta_value(_tablet->data_dir()->get_meta(), _tablet->tablet_uid(), + rs1->rowset_id(), &meta_value) + .ok()); + bool parse_ok = false; + auto rs_meta = RowsetMeta(meta_value, &parse_ok); + ASSERT_EQ(true, parse_ok); + ASSERT_TRUE(rs_meta.tablet_schema() == nullptr); + } + + ASSERT_TRUE(StorageEngine::instance()->txn_manager()->publish_txn(100, _tablet, 100, 2, rs1, 0).ok()); + ASSERT_EQ(0, _tablet->committed_rowset_size()); + + { + std::string meta_value; + ASSERT_TRUE(TabletMetaManager::get_committed_rowset_meta_value(_tablet->data_dir(), _tablet->tablet_id(), + rs1->rowset_meta()->get_rowset_seg_id(), + &meta_value) + .ok()); + bool parse_ok = false; + auto rs_meta = RowsetMeta(meta_value, &parse_ok); + ASSERT_EQ(true, parse_ok); + ASSERT_TRUE(rs_meta.tablet_schema() == nullptr); + } + + auto rs2 = create_rowset(_tablet, keys); + ASSERT_EQ(false, rs2->rowset_meta()->skip_tablet_schema()); + load_id.set_hi(1001); + load_id.set_lo(1001); + ASSERT_TRUE(StorageEngine::instance() + ->txn_manager() + ->commit_txn(_tablet->data_dir()->get_meta(), 101, 101, _tablet->tablet_id(), + _tablet->schema_hash(), _tablet->tablet_uid(), load_id, rs2, false) + .ok()); + ASSERT_EQ(true, rs2->rowset_meta()->skip_tablet_schema()); + ASSERT_EQ(1, _tablet->committed_rowset_size()); + ASSERT_TRUE(rs2->tablet_schema() != nullptr); + ASSERT_TRUE(StorageEngine::instance()->txn_manager()->publish_txn(101, _tablet, 100, 4, rs2, 0).ok()); + ASSERT_EQ(0, _tablet->committed_rowset_size()); + + { + std::string meta_value; + ASSERT_TRUE(TabletMetaManager::get_pending_committed_rowset_meta_value(_tablet->data_dir(), + _tablet->tablet_id(), 4, &meta_value) + .ok()); + bool parse_ok = false; + auto rs_meta = RowsetMeta(meta_value, &parse_ok); + ASSERT_EQ(true, parse_ok); + ASSERT_TRUE(rs_meta.tablet_schema() == nullptr); + } + + _tablet->updates()->rewrite_rs_meta(); + { + std::string rs1_meta_value; + ASSERT_TRUE(TabletMetaManager::get_committed_rowset_meta_value(_tablet->data_dir(), _tablet->tablet_id(), + rs1->rowset_meta()->get_rowset_seg_id(), + &rs1_meta_value) + .ok()); + bool parse_ok = false; + auto rs1_meta = RowsetMeta(rs1_meta_value, &parse_ok); + ASSERT_EQ(true, parse_ok); + ASSERT_TRUE(rs1_meta.tablet_schema() != nullptr); + + parse_ok = false; + std::string rs2_meta_value; + ASSERT_TRUE(TabletMetaManager::get_pending_committed_rowset_meta_value(_tablet->data_dir(), + _tablet->tablet_id(), 4, &rs2_meta_value) + .ok()); + auto rs2_meta = RowsetMeta(rs2_meta_value, &parse_ok); + ASSERT_EQ(true, parse_ok); + ASSERT_TRUE(rs2_meta.tablet_schema() != nullptr); + } + + auto rs3 = create_rowset(_tablet, keys); + ASSERT_EQ(false, rs3->rowset_meta()->skip_tablet_schema()); + _tablet->set_update_schema_running(true); + load_id.set_hi(1002); + load_id.set_lo(1002); + ASSERT_TRUE(StorageEngine::instance() + ->txn_manager() + ->commit_txn(_tablet->data_dir()->get_meta(), 102, 102, _tablet->tablet_id(), + _tablet->schema_hash(), _tablet->tablet_uid(), load_id, rs3, false) + .ok()); + ASSERT_EQ(false, rs3->rowset_meta()->skip_tablet_schema()); + ASSERT_EQ(0, _tablet->committed_rowset_size()); + ASSERT_TRUE(rs3->tablet_schema() != nullptr); + { + std::string meta_value; + ASSERT_TRUE(RowsetMetaManager::get_rowset_meta_value(_tablet->data_dir()->get_meta(), _tablet->tablet_uid(), + rs3->rowset_id(), &meta_value) + .ok()); + bool parse_ok = false; + auto rs_meta = RowsetMeta(meta_value, &parse_ok); + ASSERT_EQ(true, parse_ok); + ASSERT_TRUE(rs_meta.tablet_schema() != nullptr); + } + ASSERT_TRUE(StorageEngine::instance()->txn_manager()->publish_txn(102, _tablet, 102, 3, rs3, 0).ok()); + + _tablet->set_update_schema_running(false); + auto rs4 = create_rowset(_tablet, keys); + ASSERT_EQ(false, rs4->rowset_meta()->skip_tablet_schema()); + load_id.set_hi(1003); + load_id.set_lo(1003); + ASSERT_TRUE(StorageEngine::instance() + ->txn_manager() + ->commit_txn(_tablet->data_dir()->get_meta(), 103, 103, _tablet->tablet_id(), + _tablet->schema_hash(), _tablet->tablet_uid(), load_id, rs4, false) + .ok()); + ASSERT_EQ(true, rs4->rowset_meta()->skip_tablet_schema()); + ASSERT_EQ(1, _tablet->committed_rowset_size()); + ASSERT_TRUE(rs4->tablet_schema() != nullptr); + + { + auto tmp_tablet = create_tablet(rand(), rand(), false, _tablet->tablet_schema()->id() + 1, + _tablet->tablet_schema()->schema_version() + 1); + auto new_schema = tmp_tablet->tablet_schema(); + auto old_schema_id = _tablet->tablet_schema()->id(); + _tablet->update_max_version_schema(new_schema); + ASSERT_EQ(0, _tablet->committed_rowset_size()); + ASSERT_EQ(false, rs4->rowset_meta()->skip_tablet_schema()); + ASSERT_EQ(rs4->tablet_schema()->id(), old_schema_id); + ASSERT_EQ(rs3->tablet_schema()->id(), old_schema_id); + ASSERT_EQ(rs2->tablet_schema()->id(), old_schema_id); + ASSERT_EQ(rs1->tablet_schema()->id(), old_schema_id); + } +} + } // namespace starrocks diff --git a/be/test/storage/tablet_updates_test.h b/be/test/storage/tablet_updates_test.h index 6dd668e8ee611..13b54c5299cc5 100644 --- a/be/test/storage/tablet_updates_test.h +++ b/be/test/storage/tablet_updates_test.h @@ -377,7 +377,9 @@ class TabletUpdatesTest : public testing::Test { return *writer->build(); } - TabletSharedPtr create_tablet(int64_t tablet_id, int32_t schema_hash, bool multi_column_pk = false) { + TabletSharedPtr create_tablet(int64_t tablet_id, int32_t schema_hash, bool multi_column_pk = false, + int64_t schema_id = 0, int32_t schema_version = 0) { + srand(GetCurrentTimeMicros()); TCreateTabletReq request; request.tablet_id = tablet_id; request.__set_version(1); @@ -386,6 +388,8 @@ class TabletUpdatesTest : public testing::Test { request.tablet_schema.short_key_column_count = 1; request.tablet_schema.keys_type = TKeysType::PRIMARY_KEYS; request.tablet_schema.storage_type = TStorageType::COLUMN; + request.tablet_schema.__set_id(schema_id); + request.tablet_schema.__set_schema_version(schema_version); if (multi_column_pk) { TColumn pk1;