From 3f46ddea2696861491843f3b60eb3abe9c7d8293 Mon Sep 17 00:00:00 2001 From: qiyanghe1998 Date: Mon, 18 Jul 2022 18:25:51 +0000 Subject: [PATCH 01/15] add incremental backup thread --- rocksdb_admin/admin_handler.cpp | 16 +++++++++++++++- rocksdb_admin/admin_handler.h | 3 +++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/rocksdb_admin/admin_handler.cpp b/rocksdb_admin/admin_handler.cpp index e6c1b333..03fafbdb 100644 --- a/rocksdb_admin/admin_handler.cpp +++ b/rocksdb_admin/admin_handler.cpp @@ -136,6 +136,14 @@ DEFINE_int32(async_delete_dbs_wait_sec, DEFINE_bool(enable_async_incremental_backup_dbs, false, "Enable incremental backup for db files"); +DEFINE_int32(async_incremental_backup_dbs_frequency_sec, + 60, + "How frequently in sec to check the dbs need deleting in async way"); + +DEFINE_int32(async_incremental_backup_dbs_wait_sec, + 60, + "How long in sec to wait between the dbs deletion"); + #if __GNUC__ >= 8 using folly::CPUThreadPoolExecutor; using folly::LifoSemMPMCQueue; @@ -458,7 +466,8 @@ AdminHandler::AdminHandler( , meta_db_(OpenMetaDB()) , allow_overlapping_keys_segments_() , num_current_s3_sst_downloadings_(0) - , stop_db_deletion_thread_(false) { + , stop_db_deletion_thread_(false) + , stop_db_incremental_backup_thread_ (false) { if (db_manager_ == nullptr) { db_manager_ = CreateDBBasedOnConfig(rocksdb_options_); } @@ -510,6 +519,11 @@ AdminHandler::~AdminHandler() { stop_db_deletion_thread_ = true; db_deletion_thread_->join(); } + + if (FLAGS_enable_async_incremental_backup_dbs) { + stop_db_incremental_backup_thread_ = true; + db_incremental_backup_thread_->join(); + } } std::shared_ptr AdminHandler::getDB( diff --git a/rocksdb_admin/admin_handler.h b/rocksdb_admin/admin_handler.h index be68deb4..224dfaac 100644 --- a/rocksdb_admin/admin_handler.h +++ b/rocksdb_admin/admin_handler.h @@ -212,6 +212,9 @@ class AdminHandler : virtual public AdminSvIf { std::unique_ptr db_deletion_thread_; std::atomic stop_db_deletion_thread_; + + std::unique_ptr db_incremental_backup_thread_; + std::atomic stop_db_incremental_backup_thread_; }; } // namespace admin From 13804c026883bc89419e35144a91bbedfe65f356 Mon Sep 17 00:00:00 2001 From: qiyanghe1998 Date: Wed, 20 Jul 2022 17:59:07 +0000 Subject: [PATCH 02/15] copy checkpoint backup logic to backup manager --- rocksdb_admin/admin_handler.cpp | 44 ++- rocksdb_admin/admin_handler.h | 18 +- .../application_db_backup_manager.cpp | 298 ++++++++++++++++++ rocksdb_admin/application_db_backup_manager.h | 87 +++++ rocksdb_admin/application_db_manager.cpp | 4 + rocksdb_admin/application_db_manager.h | 3 + 6 files changed, 443 insertions(+), 11 deletions(-) create mode 100644 rocksdb_admin/application_db_backup_manager.cpp create mode 100644 rocksdb_admin/application_db_backup_manager.h diff --git a/rocksdb_admin/admin_handler.cpp b/rocksdb_admin/admin_handler.cpp index 03fafbdb..ccc39ab9 100644 --- a/rocksdb_admin/admin_handler.cpp +++ b/rocksdb_admin/admin_handler.cpp @@ -448,7 +448,7 @@ void deleteTmpDBs() { namespace admin { AdminHandler::AdminHandler( - std::unique_ptr db_manager, + std::shared_ptr db_manager, RocksDBOptionsGeneratorType rocksdb_options): AdminHandler( std::move(db_manager), [rocksdb_options](const std::string& dataset, const std::string& db) { @@ -456,10 +456,10 @@ AdminHandler::AdminHandler( }) {} AdminHandler::AdminHandler( - std::unique_ptr db_manager, + std::shared_ptr db_manager, RocksDBOptionsGenerator rocksdb_options) : db_admin_lock_() - , db_manager_(std::move(db_manager)) + , db_manager_(db_manager) , 
rocksdb_options_(std::move(rocksdb_options)) , s3_util_() , s3_util_lock_() @@ -467,10 +467,14 @@ AdminHandler::AdminHandler( , allow_overlapping_keys_segments_() , num_current_s3_sst_downloadings_(0) , stop_db_deletion_thread_(false) - , stop_db_incremental_backup_thread_ (false) { + , stop_db_incremental_backup_thread_(false) { if (db_manager_ == nullptr) { db_manager_ = CreateDBBasedOnConfig(rocksdb_options_); } + + backup_manager_ = std::make_unique( + std::move(db_manager), FLAGS_rocksdb_dir, FLAGS_checkpoint_backup_batch_num_upload); + folly::splitTo( ",", FLAGS_allow_overlapping_keys_segments, std::inserter(allow_overlapping_keys_segments_, @@ -506,7 +510,29 @@ AdminHandler::AdminHandler( } if (FLAGS_enable_async_incremental_backup_dbs) { - LOG(INFO) << "incremental backup gflag is enabled"; + db_incremental_backup_thread_ = std::make_unique([this] { + if (!folly::setThreadName("DBBackuper")) { + LOG(ERROR) << "Failed to set thread name for DB backup thread"; + } + + LOG(INFO) << "Starting DB backup thread ..."; + while (!stop_db_incremental_backup_thread_.load()) { + std::this_thread::sleep_for(std::chrono::seconds(FLAGS_async_incremental_backup_dbs_frequency_sec)); + const auto n = num_current_s3_sst_uploadings_.fetch_add(1); + + if (n >= FLAGS_max_s3_sst_loading_concurrency) { + auto err_str = + folly::stringPrintf("Concurrent uploading/downloading limit hits %d", n); + // SetException(err_str, AdminErrorCode::DB_ADMIN_ERROR, &callback); + LOG(ERROR) << err_str; + common::Stats::get()->Incr(kS3BackupFailure); + } else { + backup_manager_->backupAllDBsToS3(S3UploadAndDownloadExecutor(), meta_db_, db_admin_lock_); + } + num_current_s3_sst_uploadings_.fetch_sub(1); + } + LOG(INFO) << "Stopping DB backup thread ..."; + }); } // Initialize the atomic int variables @@ -2196,4 +2222,12 @@ std::vector AdminHandler::getAllDBNames() { return db_manager_->getAllDBNames(); } +void AdminHandler::setS3Config( + const std::string& s3_bucket, + const std::string& s3_backup_dir, + uint32_t limit_mbs, + bool include_meta) { + backup_manager_->setS3Config(std::move(s3_bucket), std::move(s3_backup_dir), limit_mbs, include_meta); +} + } // namespace admin diff --git a/rocksdb_admin/admin_handler.h b/rocksdb_admin/admin_handler.h index 224dfaac..d8cfcae4 100644 --- a/rocksdb_admin/admin_handler.h +++ b/rocksdb_admin/admin_handler.h @@ -26,10 +26,9 @@ #include #include -#include "common/object_lock.h" -#include "common/s3util.h" #include "folly/SocketAddress.h" -#include "rocksdb_admin/application_db_manager.h" +#include "rocksdb_admin/application_db_backup_manager.h" +#include "rocksdb_admin/application_db_backup_manager.h" #ifdef PINTEREST_INTERNAL // NEVER SET THIS UNLESS PINTEREST INTERNAL USAGE. 
#include "schemas/gen-cpp2/Admin.h" @@ -53,11 +52,11 @@ class AdminHandler : virtual public AdminSvIf { public: // TODO deprecate after getting rid of all callsites AdminHandler( - std::unique_ptr db_manager, + std::shared_ptr db_manager, RocksDBOptionsGeneratorType rocksdb_options); AdminHandler( - std::unique_ptr db_manager, + std::shared_ptr db_manager, RocksDBOptionsGenerator rocksdb_options); virtual ~AdminHandler(); @@ -153,6 +152,12 @@ class AdminHandler : virtual public AdminSvIf { // Get all the db names held by the AdminHandler std::vector getAllDBNames(); + // Set S3 config for the Backup Manager + void setS3Config(const std::string& s3_bucket, + const std::string& s3_backup_dir, + uint32_t limit_mbs, + bool include_meta); + protected: // Lock to synchronize DB admin operations at per DB granularity. // Put db_admin_lock in protected to provide flexibility @@ -170,7 +175,8 @@ class AdminHandler : virtual public AdminSvIf { const std::string& s3_path, const int64_t last_kafka_msg_timestamp_ms = -1); - std::unique_ptr db_manager_; + std::shared_ptr db_manager_; + std::unique_ptr backup_manager_; RocksDBOptionsGenerator rocksdb_options_; // S3 util used for download std::shared_ptr s3_util_; diff --git a/rocksdb_admin/application_db_backup_manager.cpp b/rocksdb_admin/application_db_backup_manager.cpp new file mode 100644 index 00000000..a80fc402 --- /dev/null +++ b/rocksdb_admin/application_db_backup_manager.cpp @@ -0,0 +1,298 @@ +/// Copyright 2016 Pinterest Inc. +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 + +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. 
+
+#include "rocksdb_admin/application_db_backup_manager.h"
+
+#include
+#include
+
+#include "boost/filesystem.hpp"
+#include "common/file_util.h"
+#include "common/stats/stats.h"
+#include "common/timer.h"
+#include "common/timeutil.h"
+#include "rocksdb/utilities/checkpoint.h"
+#include "rocksdb_admin/gen-cpp2/rocksdb_admin_types.h"
+#include "rocksdb_admin/utils.h"
+#include "thrift/lib/cpp2/protocol/Serializer.h"
+
+using admin::DBMetaData;
+
+namespace admin {
+
+const std::string kMetaFilename = "dbmeta";
+const std::string kS3BackupMs = "s3_backup_ms";
+const std::string kS3BackupFailure = "s3_backup_failure";
+const int kS3UtilRecheckSec = 5;
+
+ApplicationDBBackupManager::ApplicationDBBackupManager(
+    std::shared_ptr db_manager,
+    const std::string& rocksdb_dir,
+    uint32_t checkpoint_backup_batch_num_upload,
+    const std::string& s3_bucket,
+    const std::string& s3_backup_dir,
+    uint32_t limit_mbs,
+    bool include_meta)
+  : db_manager_(std::move(db_manager))
+  , rocksdb_dir_(rocksdb_dir)
+  , checkpoint_backup_batch_num_upload_(checkpoint_backup_batch_num_upload)
+  , s3_bucket_(s3_bucket)
+  , s3_backup_dir_(s3_backup_dir)
+  , limit_mbs_(limit_mbs)
+  , include_meta_(include_meta)
+  , s3_util_()
+  , s3_util_lock_() {}
+
+ApplicationDBBackupManager::ApplicationDBBackupManager(
+    std::shared_ptr db_manager,
+    const std::string& rocksdb_dir,
+    uint32_t checkpoint_backup_batch_num_upload)
+  : db_manager_(std::move(db_manager))
+  , rocksdb_dir_(rocksdb_dir)
+  , checkpoint_backup_batch_num_upload_(checkpoint_backup_batch_num_upload)
+  , s3_util_()
+  , s3_util_lock_() {}
+
+void ApplicationDBBackupManager::setS3Config(
+    const std::string& s3_bucket,
+    const std::string& s3_backup_dir,
+    uint32_t limit_mbs,
+    bool include_meta) {
+  s3_bucket_ = std::move(s3_bucket);
+  s3_backup_dir_ = std::move(s3_backup_dir);
+  limit_mbs_ = limit_mbs;
+  include_meta_ = include_meta;
+}
+
+inline std::string ensure_ends_with_pathsep(const std::string& s) {
+  if (!s.empty() && s.back() != '/') {
+    return s + "/";
+  }
+  return s;
+}
+
+// copy from admin_handler.cpp
+inline bool should_new_s3_client(
+    const common::S3Util& s3_util, const uint32_t limit_mbs, const std::string& s3_bucket) {
+  return s3_util.getBucket() != s3_bucket ||
+         s3_util.getRateLimit() != limit_mbs;
+}
+
+// copy from admin_handler.cpp
+std::shared_ptr ApplicationDBBackupManager::createLocalS3Util(
+    const uint32_t limit_mbs,
+    const std::string& s3_bucket) {
+  // Though it is claimed that AWS s3 sdk is a light weight library. However,
+  // we couldn't afford to create a new client for every SST file downloading
+  // request, which is not even on any critical code path. Otherwise, we will
+  // see latency spike when uploading data to production clusters.
+  std::shared_ptr local_s3_util;
+
+  {
+    std::lock_guard guard(s3_util_lock_);
+    if (s3_util_ == nullptr || should_new_s3_client(*s3_util_, limit_mbs, s3_bucket)) {
+      // Request with different ratelimit or bucket has to wait for old
+      // requests to drain.
+      while (s3_util_ != nullptr && s3_util_.use_count() > 1) {
+        LOG(INFO) << "There are other downloads happening, wait "
+                  << kS3UtilRecheckSec << " seconds";
+        std::this_thread::sleep_for(std::chrono::seconds(kS3UtilRecheckSec));
+      }
+      // Invoke destructor explicitly to make sure Aws::InitAPI()
+      // and Aws::ShutdownAPI() appear in pairs.
+ s3_util_ = nullptr; + s3_util_ = common::S3Util::BuildS3Util(limit_mbs, s3_bucket); + } + local_s3_util = s3_util_; + } + + return local_s3_util; +} + +bool ApplicationDBBackupManager::backupDBToS3( + const std::shared_ptr& db, + CPUThreadPoolExecutor* executor, + std::unique_ptr& meta_db, + common::ObjectLock& db_admin_lock) { + // ::admin::AdminException e; + + common::Timer timer(kS3BackupMs); + LOG(INFO) << "S3 Backup " << db->db_name() << " to " << s3_backup_dir_; + auto ts = common::timeutil::GetCurrentTimestamp(); + auto local_path = folly::stringPrintf("%ss3_tmp/%s%d/", rocksdb_dir_.c_str(), db->db_name().c_str(), ts); + boost::system::error_code remove_err; + boost::system::error_code create_err; + boost::filesystem::remove_all(local_path, remove_err); + boost::filesystem::create_directories(local_path, create_err); + SCOPE_EXIT { boost::filesystem::remove_all(local_path, remove_err); }; + if (remove_err || create_err) { + // SetException("Cannot remove/create dir for backup: " + local_path, AdminErrorCode::DB_ADMIN_ERROR, &callback); + common::Stats::get()->Incr(kS3BackupFailure); + return false; + } + + db_admin_lock.Lock(db->db_name()); + SCOPE_EXIT { db_admin_lock.Unlock(db->db_name()); }; + + rocksdb::Checkpoint* checkpoint; + auto status = rocksdb::Checkpoint::Create(db->rocksdb(), &checkpoint); + if (!status.ok()) { + // OKOrSetException(status, AdminErrorCode::DB_ADMIN_ERROR, &callback); + LOG(ERROR) << "Error happened when trying to initialize checkpoint: " << status.ToString(); + common::Stats::get()->Incr(kS3BackupFailure); + return false; + } + + auto checkpoint_local_path = local_path + "checkpoint"; + status = checkpoint->CreateCheckpoint(checkpoint_local_path); + if (!status.ok()) { + // OKOrSetException(status, AdminErrorCode::DB_ADMIN_ERROR, &callback); + LOG(ERROR) << "Error happened when trying to create checkpoint: " << status.ToString(); + common::Stats::get()->Incr(kS3BackupFailure); + return false; + } + std::unique_ptr checkpoint_holder(checkpoint); + + std::vector checkpoint_files; + status = rocksdb::Env::Default()->GetChildren(checkpoint_local_path, &checkpoint_files); + if (!status.ok()) { + // OKOrSetException(status, AdminErrorCode::DB_ADMIN_ERROR, &callback); + LOG(ERROR) << "Error happened when trying to list files in the checkpoint: " << status.ToString(); + common::Stats::get()->Incr(kS3BackupFailure); + return false; + } + + // Upload checkpoint to s3 + auto local_s3_util = createLocalS3Util(limit_mbs_, s3_bucket_); + std::string formatted_s3_dir_path = ensure_ends_with_pathsep(s3_backup_dir_); + std::string formatted_checkpoint_local_path = ensure_ends_with_pathsep(checkpoint_local_path); + auto upload_func = [&](const std::string& dest, const std::string& source) { + LOG(INFO) << "Copying " << source << " to " << dest; + auto copy_resp = local_s3_util->putObject(dest, source); + if (!copy_resp.Error().empty()) { + LOG(ERROR) + << "Error happened when uploading files from checkpoint to S3: " + << copy_resp.Error(); + return false; + } + return true; + }; + + if (checkpoint_backup_batch_num_upload_ > 1) { + // Upload checkpoint files to s3 in parallel + std::vector> file_batches(checkpoint_backup_batch_num_upload_); + for (size_t i = 0; i < checkpoint_files.size(); ++i) { + auto& file = checkpoint_files[i]; + if (file == "." 
|| file == "..") { + continue; + } + file_batches[i%checkpoint_backup_batch_num_upload_].push_back(file); + } + + std::vector> futures; + for (auto& files : file_batches) { + auto p = folly::Promise(); + futures.push_back(p.getFuture()); + + executor->add( + [&, files = std::move(files), p = std::move(p)]() mutable { + for (const auto& file : files) { + if (!upload_func(formatted_s3_dir_path + file, formatted_checkpoint_local_path + file)) { + p.setValue(false); + return; + } + } + p.setValue(true); + }); + } + + for (auto& f : futures) { + auto res = std::move(f).get(); + if (!res) { + // SetException("Error happened when uploading files from checkpoint to S3", + // AdminErrorCode::DB_ADMIN_ERROR, + // &callback); + common::Stats::get()->Incr(kS3BackupFailure); + return false; + } + } + } else { + for (const auto& file : checkpoint_files) { + if (file == "." || file == "..") { + continue; + } + if (!upload_func(formatted_s3_dir_path + file, formatted_checkpoint_local_path + file)) { + // If there is error in one file uploading, then we fail the whole backup process + // SetException("Error happened when uploading files from checkpoint to S3", + // AdminErrorCode::DB_ADMIN_ERROR, + // &callback); + common::Stats::get()->Incr(kS3BackupFailure); + return false; + } + } + } + + if (include_meta_) { + DBMetaData meta; + meta.db_name = db->db_name(); + + std::string buffer; + rocksdb::ReadOptions options; + auto s = meta_db->Get(options, db->db_name(), &buffer); + if (s.ok()) { + apache::thrift::CompactSerializer::deserialize(buffer, meta); + } + std::string dbmeta_path; + try { + std::string encoded_meta; + EncodeThriftStruct(meta, &encoded_meta); + dbmeta_path = common::FileUtil::createFileWithContent( + formatted_checkpoint_local_path, kMetaFilename, encoded_meta); + } catch (std::exception& e) { + // SetException("Failed to create meta file, " + std::string(e.what()), + // AdminErrorCode::DB_ADMIN_ERROR, &callback); + common::Stats::get()->Incr(kS3BackupFailure); + return false; + } + if (!upload_func(formatted_s3_dir_path + kMetaFilename, dbmeta_path)) { + // SetException("Error happened when upload meta from checkpoint to S3", + // AdminErrorCode::DB_ADMIN_ERROR, &callback); + common::Stats::get()->Incr(kS3BackupFailure); + return false; + } + } + + // Delete the directory to remove the snapshot. + boost::filesystem::remove_all(local_path); +} + +bool ApplicationDBBackupManager::backupAllDBsToS3( + CPUThreadPoolExecutor* executor, + std::unique_ptr& meta_db, + common::ObjectLock& db_admin_lock) { + if (s3_bucket_.empty() || s3_backup_dir_.empty() || !limit_mbs_) { + LOG(INFO) << "S3 config has not been set, so incremental backup cannot be triggered"; + return false; + } + for (const auto& db : db_manager_->getAllDBs()) { + LOG(INFO) << "Incremental backup for " << db.first; + if(backupDBToS3(db.second, executor, meta_db, db_admin_lock)) { + LOG(INFO) << "Backup for " << db.first << " succeeds"; + } else { + LOG(INFO) << "Backup for " << db.first << " fails"; + } + } +} + +} \ No newline at end of file diff --git a/rocksdb_admin/application_db_backup_manager.h b/rocksdb_admin/application_db_backup_manager.h new file mode 100644 index 00000000..1ed6eb63 --- /dev/null +++ b/rocksdb_admin/application_db_backup_manager.h @@ -0,0 +1,87 @@ +/// Copyright 2016 Pinterest Inc. +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. 
+/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 + +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. + +#pragma once + +#include +#include +#include + +#include "common/object_lock.h" +#include "common/s3util.h" +#include "rocksdb/db.h" +#include "rocksdb_admin/application_db_manager.h" +#include "rocksdb_replicator/thrift/gen-cpp2/Replicator.h" + +#if __GNUC__ >= 8 +#include "folly/executors/CPUThreadPoolExecutor.h" +#include "folly/system/ThreadName.h" +#else +#include "wangle/concurrent/CPUThreadPoolExecutor.h" +#endif + +#if __GNUC__ >= 8 +using folly::CPUThreadPoolExecutor; +#else +using wangle::CPUThreadPoolExecutor; +#endif + +namespace admin { + +class ApplicationDBBackupManager { + public: + ApplicationDBBackupManager(std::shared_ptr db_manager, + const std::string& rocksdb_dir, + uint32_t checkpoint_backup_batch_num_upload, + const std::string& s3_bucket, + const std::string& s3_backup_dir, + uint32_t limit_mbs, + bool include_meta); + + ApplicationDBBackupManager(std::shared_ptr db_manager, + const std::string& rocksdb_dir, + uint32_t checkpoint_backup_batch_num_upload); + + void setS3Config(const std::string& s3_bucket, + const std::string& s3_backup_dir, + uint32_t limit_mbs, + bool include_meta); + + // copy from admin_hamdler.. + std::shared_ptr createLocalS3Util(const uint32_t limit_mbs, + const std::string& s3_bucket); + + bool backupAllDBsToS3(CPUThreadPoolExecutor* executor, + std::unique_ptr& meta_db, + common::ObjectLock& db_admin_lock); + + bool backupDBToS3(const std::shared_ptr& db, + CPUThreadPoolExecutor* executor, + std::unique_ptr& meta_db, + common::ObjectLock& db_admin_lock); + + private: + std::shared_ptr db_manager_; + std::string rocksdb_dir_; + uint32_t checkpoint_backup_batch_num_upload_; + std::string s3_bucket_; + std::string s3_backup_dir_; + uint32_t limit_mbs_; + bool include_meta_; + + std::shared_ptr s3_util_; + mutable std::mutex s3_util_lock_; +}; + +} \ No newline at end of file diff --git a/rocksdb_admin/application_db_manager.cpp b/rocksdb_admin/application_db_manager.cpp index 77ac429f..3aae3cc0 100644 --- a/rocksdb_admin/application_db_manager.cpp +++ b/rocksdb_admin/application_db_manager.cpp @@ -149,6 +149,10 @@ std::string ApplicationDBManager::Introspect() const { return ss.str(); } +std::unordered_map> ApplicationDBManager::getAllDBs() { + return dbs_; +} + ApplicationDBManager::~ApplicationDBManager() { auto itor = dbs_.begin(); while (itor != dbs_.end()) { diff --git a/rocksdb_admin/application_db_manager.h b/rocksdb_admin/application_db_manager.h index 92e9ec81..ec4822d8 100644 --- a/rocksdb_admin/application_db_manager.h +++ b/rocksdb_admin/application_db_manager.h @@ -86,6 +86,9 @@ class ApplicationDBManager { // Introspect ApplicationDBManager internal states std::string Introspect() const; + // Get all the name and the applicationDB + std::unordered_map> getAllDBs(); + ~ApplicationDBManager(); private: From 61c8112d0cf59b107cea0e55e2bd2290f06ee0f8 Mon Sep 17 00:00:00 2001 From: qiyanghe1998 Date: Wed, 20 Jul 2022 21:10:45 +0000 Subject: [PATCH 03/15] fix the s3_backup_dir used for uploading --- rocksdb_admin/application_db_backup_manager.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/rocksdb_admin/application_db_backup_manager.cpp b/rocksdb_admin/application_db_backup_manager.cpp
index a80fc402..9be713df 100644
--- a/rocksdb_admin/application_db_backup_manager.cpp
+++ b/rocksdb_admin/application_db_backup_manager.cpp
@@ -175,7 +175,7 @@ bool ApplicationDBBackupManager::backupDBToS3(
   // Upload checkpoint to s3
   auto local_s3_util = createLocalS3Util(limit_mbs_, s3_bucket_);
-  std::string formatted_s3_dir_path = ensure_ends_with_pathsep(s3_backup_dir_);
+  std::string formatted_s3_dir_path = folly::stringPrintf("%s/%s/%d", ensure_ends_with_pathsep(checkpoint_local_path).c_str(), db->db_name().c_str(), ts);
   std::string formatted_checkpoint_local_path = ensure_ends_with_pathsep(checkpoint_local_path);
   auto upload_func = [&](const std::string& dest, const std::string& source) {
     LOG(INFO) << "Copying " << source << " to " << dest;

From 292e719c8ebdfa948ea8378609cff5eb7a91d9ad Mon Sep 17 00:00:00 2001
From: qiyanghe1998
Date: Fri, 22 Jul 2022 23:19:34 +0000
Subject: [PATCH 04/15] fix backup_dir & add backup manager test

---
 rocksdb_admin/admin_handler.cpp               | 37 +++++++++++++++++--
 rocksdb_admin/admin_handler.h                 | 15 ++++++--
 .../application_db_backup_manager.cpp         | 30 ++++++++++-----
 rocksdb_admin/application_db_backup_manager.h | 31 +++++++++-------
 rocksdb_admin/tests/admin_handler_test.cpp    | 23 ++++++++++++
 5 files changed, 106 insertions(+), 30 deletions(-)

diff --git a/rocksdb_admin/admin_handler.cpp b/rocksdb_admin/admin_handler.cpp
index ccc39ab9..ef4bb95f 100644
--- a/rocksdb_admin/admin_handler.cpp
+++ b/rocksdb_admin/admin_handler.cpp
@@ -2224,10 +2224,39 @@ std::vector AdminHandler::getAllDBNames() {
 void AdminHandler::setS3Config(
     const std::string& s3_bucket,
-    const std::string& s3_backup_dir,
-    uint32_t limit_mbs,
-    bool include_meta) {
-  backup_manager_->setS3Config(std::move(s3_bucket), std::move(s3_backup_dir), limit_mbs, include_meta);
+    const std::string& s3_backup_dir_prefix,
+    const std::string& snapshot_host_port,
+    const uint32_t limit_mbs,
+    const bool include_meta) {
+  backup_manager_->setS3Config(std::move(s3_bucket), std::move(s3_backup_dir_prefix),
+    std::move(snapshot_host_port), limit_mbs, include_meta);
+}
+
+bool AdminHandler::checkS3Object(
+    const uint32_t limit_mbs,
+    const std::string& s3_bucket,
+    const std::string& s3_path) {
+  auto local_s3_util = createLocalS3Util(limit_mbs, s3_bucket);
+  auto resp = local_s3_util->listAllObjects(ensure_ends_with_pathsep(s3_path));
+  if (!resp.Error().empty()) {
+    auto err_msg = folly::stringPrintf(
+        "Error happened when fetching files in checkpoint from S3: %s under path: %s",
+        resp.Error().c_str(), s3_path.c_str());
+    LOG(ERROR) << err_msg;
+    common::Stats::get()->Incr(kS3RestoreFailure);
+    return false;
+  }
+
+  if (resp.Body().objects.size() > 0) {
+    return true;
+  } else {
+    LOG(INFO) << "No data has been written into " << s3_path << " on S3";
+    return false;
+  }
+}
+
+bool AdminHandler::backupAllDBsToS3() {
+  return backup_manager_->backupAllDBsToS3(S3UploadAndDownloadExecutor(), meta_db_, db_admin_lock_);
 }
 
 } // namespace admin
diff --git a/rocksdb_admin/admin_handler.h b/rocksdb_admin/admin_handler.h
index d8cfcae4..8d0cc047 100644
--- a/rocksdb_admin/admin_handler.h
+++ b/rocksdb_admin/admin_handler.h
@@ -154,9 +154,18 @@ class AdminHandler : virtual public AdminSvIf {
   // Set S3 config for the Backup Manager
   void setS3Config(const std::string& s3_bucket,
-                   const std::string& s3_backup_dir,
-                   uint32_t limit_mbs,
-                   bool include_meta);
+                   const std::string& s3_backup_dir_prefix,
+                   const std::string&
snapshot_host_port, + const uint32_t limit_mbs, + const bool include_meta); + + // Used for testing incremental backup + bool checkS3Object(const uint32_t limit_mbs = 50, + const std::string& s3_bucket = "", + const std::string& s3_path = ""); + + // Used for testing incremental backup + bool backupAllDBsToS3(); protected: // Lock to synchronize DB admin operations at per DB granularity. diff --git a/rocksdb_admin/application_db_backup_manager.cpp b/rocksdb_admin/application_db_backup_manager.cpp index 9be713df..e9a30119 100644 --- a/rocksdb_admin/application_db_backup_manager.cpp +++ b/rocksdb_admin/application_db_backup_manager.cpp @@ -41,14 +41,16 @@ ApplicationDBBackupManager::ApplicationDBBackupManager( const std::string& rocksdb_dir, uint32_t checkpoint_backup_batch_num_upload, const std::string& s3_bucket, - const std::string& s3_backup_dir, + const std::string& s3_backup_dir_prefix, + const std::string& snapshot_host_port, uint32_t limit_mbs, bool include_meta) : db_manager_(std::move(db_manager)) , rocksdb_dir_(rocksdb_dir) , checkpoint_backup_batch_num_upload_(checkpoint_backup_batch_num_upload) , s3_bucket_(s3_bucket) - , s3_backup_dir_(s3_backup_dir) + , s3_backup_dir_prefix_(s3_backup_dir_prefix) + , snapshot_host_port_(snapshot_host_port) , limit_mbs_(limit_mbs) , include_meta_(include_meta) , s3_util_() @@ -66,11 +68,13 @@ ApplicationDBBackupManager::ApplicationDBBackupManager( void ApplicationDBBackupManager::setS3Config( const std::string& s3_bucket, - const std::string& s3_backup_dir, - uint32_t limit_mbs, - bool include_meta) { + const std::string& s3_backup_dir_prefix, + const std::string& snapshot_host_port, + const uint32_t limit_mbs, + const bool include_meta) { s3_bucket_ = std::move(s3_bucket); - s3_backup_dir_ = std::move(s3_backup_dir); + s3_backup_dir_prefix_ = std::move(s3_backup_dir_prefix); + snapshot_host_port_ = std::move(snapshot_host_port); limit_mbs_ = limit_mbs; include_meta_ = include_meta; } @@ -128,7 +132,8 @@ bool ApplicationDBBackupManager::backupDBToS3( // ::admin::AdminException e; common::Timer timer(kS3BackupMs); - LOG(INFO) << "S3 Backup " << db->db_name() << " to " << s3_backup_dir_; + LOG(INFO) << "S3 Backup " << db->db_name() << " to " << ensure_ends_with_pathsep(s3_backup_dir_prefix_) << db->db_name() + << "/" << ensure_ends_with_pathsep(snapshot_host_port_); auto ts = common::timeutil::GetCurrentTimestamp(); auto local_path = folly::stringPrintf("%ss3_tmp/%s%d/", rocksdb_dir_.c_str(), db->db_name().c_str(), ts); boost::system::error_code remove_err; @@ -175,7 +180,9 @@ bool ApplicationDBBackupManager::backupDBToS3( // Upload checkpoint to s3 auto local_s3_util = createLocalS3Util(limit_mbs_, s3_bucket_); - std::string formatted_s3_dir_path = folly::stringPrintf("%s/%s/%d", ensure_ends_with_pathsep(checkpoint_local_path).c_str(), db->db_name().c_str(), ts); + std::string formatted_s3_dir_path = folly::stringPrintf("%s%s/%s%d/", + ensure_ends_with_pathsep(s3_backup_dir_prefix_).c_str(), db->db_name().c_str(), + ensure_ends_with_pathsep(snapshot_host_port_).c_str(), ts); std::string formatted_checkpoint_local_path = ensure_ends_with_pathsep(checkpoint_local_path); auto upload_func = [&](const std::string& dest, const std::string& source) { LOG(INFO) << "Copying " << source << " to " << dest; @@ -281,18 +288,23 @@ bool ApplicationDBBackupManager::backupAllDBsToS3( CPUThreadPoolExecutor* executor, std::unique_ptr& meta_db, common::ObjectLock& db_admin_lock) { - if (s3_bucket_.empty() || s3_backup_dir_.empty() || !limit_mbs_) { + if 
(s3_bucket_.empty() || s3_backup_dir_prefix_.empty() || snapshot_host_port_.empty() || !limit_mbs_) { LOG(INFO) << "S3 config has not been set, so incremental backup cannot be triggered"; return false; } + + bool ret = true; for (const auto& db : db_manager_->getAllDBs()) { LOG(INFO) << "Incremental backup for " << db.first; if(backupDBToS3(db.second, executor, meta_db, db_admin_lock)) { LOG(INFO) << "Backup for " << db.first << " succeeds"; } else { LOG(INFO) << "Backup for " << db.first << " fails"; + ret = false; } } + + return ret; } } \ No newline at end of file diff --git a/rocksdb_admin/application_db_backup_manager.h b/rocksdb_admin/application_db_backup_manager.h index 1ed6eb63..6d971a8e 100644 --- a/rocksdb_admin/application_db_backup_manager.h +++ b/rocksdb_admin/application_db_backup_manager.h @@ -42,25 +42,23 @@ namespace admin { class ApplicationDBBackupManager { public: ApplicationDBBackupManager(std::shared_ptr db_manager, - const std::string& rocksdb_dir, - uint32_t checkpoint_backup_batch_num_upload, - const std::string& s3_bucket, - const std::string& s3_backup_dir, - uint32_t limit_mbs, - bool include_meta); + const std::string& rocksdb_dir, + uint32_t checkpoint_backup_batch_num_upload, + const std::string& s3_bucket, + const std::string& s3_backup_dir_prefix, + const std::string& snapshot_host_port, + uint32_t limit_mbs, + bool include_meta); ApplicationDBBackupManager(std::shared_ptr db_manager, const std::string& rocksdb_dir, uint32_t checkpoint_backup_batch_num_upload); void setS3Config(const std::string& s3_bucket, - const std::string& s3_backup_dir, - uint32_t limit_mbs, - bool include_meta); - - // copy from admin_hamdler.. - std::shared_ptr createLocalS3Util(const uint32_t limit_mbs, - const std::string& s3_bucket); + const std::string& s3_backup_dir_prefix, + const std::string& snapshot_host_port, + const uint32_t limit_mbs, + const bool include_meta); bool backupAllDBsToS3(CPUThreadPoolExecutor* executor, std::unique_ptr& meta_db, @@ -72,11 +70,16 @@ class ApplicationDBBackupManager { common::ObjectLock& db_admin_lock); private: + // copy from admin_hamdler.. 
+ std::shared_ptr createLocalS3Util(const uint32_t limit_mbs, + const std::string& s3_bucket); + std::shared_ptr db_manager_; std::string rocksdb_dir_; uint32_t checkpoint_backup_batch_num_upload_; std::string s3_bucket_; - std::string s3_backup_dir_; + std::string s3_backup_dir_prefix_; + std::string snapshot_host_port_; uint32_t limit_mbs_; bool include_meta_; diff --git a/rocksdb_admin/tests/admin_handler_test.cpp b/rocksdb_admin/tests/admin_handler_test.cpp index 8ece7ae9..83fbedd0 100644 --- a/rocksdb_admin/tests/admin_handler_test.cpp +++ b/rocksdb_admin/tests/admin_handler_test.cpp @@ -882,6 +882,29 @@ TEST(AdminHandlerTest, MetaData) { EXPECT_EQ(meta.db_name, db_name); } +TEST_F(AdminHandlerTestBase, ApplicationDBBackupManagerTest) { + // backup an un-initialized DB should error out + const string testdb = generateDBName(); + addDBWithRole(testdb, "LEADER"); + auto meta = handler_->getMetaData(testdb); + verifyMeta(meta, testdb, true, "", ""); + + // use incremental backup + handler_->writeMetaData(testdb, "fakes3bucket", "fakes3path"); + + std::string s3_bucket = FLAGS_s3_bucket; + std::string s3_backup_dir_prefix = "backup/cluster1/"; + std::string snapshot_host_port = "host_8080"; + const uint32_t limit_mbs = 100; + const bool include_meta = false; + std::string s3_path = s3_backup_dir_prefix + testdb + "/" + snapshot_host_port + "/"; + + handler_->setS3Config(s3_bucket, s3_backup_dir_prefix, snapshot_host_port, limit_mbs, include_meta); + EXPECT_TRUE(handler_->backupAllDBsToS3()); + EXPECT_TRUE(handler_->checkS3Object(limit_mbs, s3_bucket, s3_path)); +} + + } // namespace admin int main(int argc, char** argv) { From 189c6a01e8460bbacf7963366378ff564cd6dd96 Mon Sep 17 00:00:00 2001 From: qiyanghe1998 Date: Fri, 29 Jul 2022 18:08:45 +0000 Subject: [PATCH 05/15] fix shared_ptr init problem & admin_handler test & clean up --- rocksdb_admin/admin_handler.cpp | 167 ++++-------------- rocksdb_admin/admin_handler.h | 39 ++-- .../application_db_backup_manager.cpp | 121 ++++++------- rocksdb_admin/application_db_backup_manager.h | 62 ++++--- rocksdb_admin/tests/admin_handler_test.cpp | 22 ++- 5 files changed, 164 insertions(+), 247 deletions(-) diff --git a/rocksdb_admin/admin_handler.cpp b/rocksdb_admin/admin_handler.cpp index ef4bb95f..554725cd 100644 --- a/rocksdb_admin/admin_handler.cpp +++ b/rocksdb_admin/admin_handler.cpp @@ -57,18 +57,11 @@ #include "rocksdb_replicator/rocksdb_replicator.h" #include "rocksdb_replicator/thrift/gen-cpp2/Replicator.h" #include "thrift/lib/cpp2/protocol/Serializer.h" -#if __GNUC__ >= 8 -#include "folly/executors/CPUThreadPoolExecutor.h" -#include "folly/system/ThreadName.h" -#else -#include "wangle/concurrent/CPUThreadPoolExecutor.h" -#endif DEFINE_string(hdfs_name_node, "hdfs://hbasebak-infra-namenode-prod1c01-001:8020", "The hdfs name node used for backup"); -DEFINE_string(rocksdb_dir, "/tmp/", - "The dir for local rocksdb instances"); +DEFINE_string(rocksdb_dir, "/tmp/", "The dir for local rocksdb instances"); DEFINE_int32(num_hdfs_access_threads, 8, "The number of threads for backup or restore to/from HDFS"); @@ -133,27 +126,6 @@ DEFINE_int32(async_delete_dbs_frequency_sec, DEFINE_int32(async_delete_dbs_wait_sec, 60, "How long in sec to wait between the dbs deletion"); - -DEFINE_bool(enable_async_incremental_backup_dbs, false, "Enable incremental backup for db files"); - -DEFINE_int32(async_incremental_backup_dbs_frequency_sec, - 60, - "How frequently in sec to check the dbs need deleting in async way"); - 
-DEFINE_int32(async_incremental_backup_dbs_wait_sec, - 60, - "How long in sec to wait between the dbs deletion"); - -#if __GNUC__ >= 8 -using folly::CPUThreadPoolExecutor; -using folly::LifoSemMPMCQueue; -using folly::QueueBehaviorIfFull; -#else -using wangle::CPUThreadPoolExecutor; -using wangle::LifoSemMPMCQueue; -using wangle::QueueBehaviorIfFull; -#endif - namespace { const std::string kMetaFilename = "dbmeta"; @@ -407,16 +379,6 @@ bool DeserializeKafkaPayload( } } -CPUThreadPoolExecutor* S3UploadAndDownloadExecutor() { - static CPUThreadPoolExecutor executor( - FLAGS_num_s3_upload_download_threads, - std::make_unique>( - FLAGS_max_s3_upload_download_task_queue_size), - std::make_shared("s3-upload-download")); - - return &executor; -} - // The dbs moved to db_tmp/ shouldnt be re-used or re-opened, so we can // delete them via boost filesystem operations rather than rocksdb::DestroyDB() void deleteTmpDBs() { @@ -458,7 +420,7 @@ AdminHandler::AdminHandler( AdminHandler::AdminHandler( std::shared_ptr db_manager, RocksDBOptionsGenerator rocksdb_options) - : db_admin_lock_() + : db_admin_lock_(std::make_shared>()) , db_manager_(db_manager) , rocksdb_options_(std::move(rocksdb_options)) , s3_util_() @@ -467,13 +429,18 @@ AdminHandler::AdminHandler( , allow_overlapping_keys_segments_() , num_current_s3_sst_downloadings_(0) , stop_db_deletion_thread_(false) - , stop_db_incremental_backup_thread_(false) { + , executor_(new CPUThreadPoolExecutor(FLAGS_num_s3_upload_download_threads, + std::make_unique>( + FLAGS_max_s3_upload_download_task_queue_size), + std::make_shared("s3-upload-download"))) { if (db_manager_ == nullptr) { db_manager_ = CreateDBBasedOnConfig(rocksdb_options_); } - backup_manager_ = std::make_unique( - std::move(db_manager), FLAGS_rocksdb_dir, FLAGS_checkpoint_backup_batch_num_upload); + if (FLAGS_enable_async_incremental_backup_dbs) { + backup_manager_ = std::make_unique(db_manager, executor_, meta_db_, + db_admin_lock_, FLAGS_rocksdb_dir, FLAGS_checkpoint_backup_batch_num_upload); + } folly::splitTo( ",", FLAGS_allow_overlapping_keys_segments, @@ -509,32 +476,6 @@ AdminHandler::AdminHandler( }); } - if (FLAGS_enable_async_incremental_backup_dbs) { - db_incremental_backup_thread_ = std::make_unique([this] { - if (!folly::setThreadName("DBBackuper")) { - LOG(ERROR) << "Failed to set thread name for DB backup thread"; - } - - LOG(INFO) << "Starting DB backup thread ..."; - while (!stop_db_incremental_backup_thread_.load()) { - std::this_thread::sleep_for(std::chrono::seconds(FLAGS_async_incremental_backup_dbs_frequency_sec)); - const auto n = num_current_s3_sst_uploadings_.fetch_add(1); - - if (n >= FLAGS_max_s3_sst_loading_concurrency) { - auto err_str = - folly::stringPrintf("Concurrent uploading/downloading limit hits %d", n); - // SetException(err_str, AdminErrorCode::DB_ADMIN_ERROR, &callback); - LOG(ERROR) << err_str; - common::Stats::get()->Incr(kS3BackupFailure); - } else { - backup_manager_->backupAllDBsToS3(S3UploadAndDownloadExecutor(), meta_db_, db_admin_lock_); - } - num_current_s3_sst_uploadings_.fetch_sub(1); - } - LOG(INFO) << "Stopping DB backup thread ..."; - }); - } - // Initialize the atomic int variables num_current_s3_sst_downloadings_.store(0); num_current_s3_sst_uploadings_.store(0); @@ -545,11 +486,6 @@ AdminHandler::~AdminHandler() { stop_db_deletion_thread_ = true; db_deletion_thread_->join(); } - - if (FLAGS_enable_async_incremental_backup_dbs) { - stop_db_incremental_backup_thread_ = true; - db_incremental_backup_thread_->join(); - } } 
std::shared_ptr AdminHandler::getDB( @@ -627,8 +563,8 @@ void AdminHandler::async_tm_addDB( std::unique_ptr>> callback, std::unique_ptr request) { - db_admin_lock_.Lock(request->db_name); - SCOPE_EXIT { db_admin_lock_.Unlock(request->db_name); }; + db_admin_lock_->Lock(request->db_name); + SCOPE_EXIT { db_admin_lock_->Unlock(request->db_name); }; AdminException e; auto db = getDB(request->db_name, &e); @@ -731,8 +667,8 @@ bool AdminHandler::backupDBHelper(const std::string& db_name, bool include_meta, AdminException* e) { CHECK(env_holder != nullptr); - db_admin_lock_.Lock(db_name); - SCOPE_EXIT { db_admin_lock_.Unlock(db_name); }; + db_admin_lock_->Lock(db_name); + SCOPE_EXIT { db_admin_lock_->Unlock(db_name); }; auto db = getDB(db_name, e); if (db == nullptr) { @@ -802,8 +738,8 @@ bool AdminHandler::restoreDBHelper(const std::string& db_name, const uint32_t restore_rate_limit, AdminException* e) { assert(env_holder != nullptr); - db_admin_lock_.Lock(db_name); - SCOPE_EXIT { db_admin_lock_.Unlock(db_name); }; + db_admin_lock_->Lock(db_name); + SCOPE_EXIT { db_admin_lock_->Unlock(db_name); }; auto db = db_manager_->getDB(db_name, nullptr); if (db) { @@ -1022,8 +958,8 @@ void AdminHandler::async_tm_backupDBToS3( } if (FLAGS_enable_checkpoint_backup) { - db_admin_lock_.Lock(request->db_name); - SCOPE_EXIT { db_admin_lock_.Unlock(request->db_name); }; + db_admin_lock_->Lock(request->db_name); + SCOPE_EXIT { db_admin_lock_->Unlock(request->db_name); }; auto db = getDB(request->db_name, &e); if (db == nullptr) { @@ -1093,7 +1029,7 @@ void AdminHandler::async_tm_backupDBToS3( auto p = folly::Promise(); futures.push_back(p.getFuture()); - S3UploadAndDownloadExecutor()->add( + executor_->add( [&, files = std::move(files), p = std::move(p)]() mutable { for (const auto& file : files) { if (!upload_func(formatted_s3_dir_path + file, formatted_checkpoint_local_path + file)) { @@ -1274,7 +1210,7 @@ void AdminHandler::async_tm_restoreDBFromS3( auto p = folly::Promise(); futures.push_back(p.getFuture()); - S3UploadAndDownloadExecutor()->add( + executor_->add( [&, files = std::move(files), p = std::move(p)]() mutable { for (const auto& file : files) { if (!download_func(file)) { @@ -1450,8 +1386,8 @@ void AdminHandler::async_tm_closeDB( std::unique_ptr>> callback, std::unique_ptr request) { - db_admin_lock_.Lock(request->db_name); - SCOPE_EXIT { db_admin_lock_.Unlock(request->db_name); }; + db_admin_lock_->Lock(request->db_name); + SCOPE_EXIT { db_admin_lock_->Unlock(request->db_name); }; AdminException e; if (removeDB(request->db_name, &e) == nullptr) { @@ -1466,8 +1402,8 @@ void AdminHandler::async_tm_changeDBRoleAndUpStream( std::unique_ptr>> callback, std::unique_ptr request) { - db_admin_lock_.Lock(request->db_name); - SCOPE_EXIT { db_admin_lock_.Unlock(request->db_name); }; + db_admin_lock_->Lock(request->db_name); + SCOPE_EXIT { db_admin_lock_->Unlock(request->db_name); }; AdminException e; replicator::ReplicaRole new_role; @@ -1533,8 +1469,8 @@ void AdminHandler::async_tm_clearDB( std::unique_ptr>> callback, std::unique_ptr request) { - db_admin_lock_.Lock(request->db_name); - SCOPE_EXIT { db_admin_lock_.Unlock(request->db_name); }; + db_admin_lock_->Lock(request->db_name); + SCOPE_EXIT { db_admin_lock_->Unlock(request->db_name); }; bool need_to_reopen = false; replicator::ReplicaRole db_role; @@ -1666,8 +1602,8 @@ void AdminHandler::async_tm_addS3SstFilesToDB( admin::AdminException e; e.errorCode = AdminErrorCode::DB_ADMIN_ERROR; - db_admin_lock_.Lock(request->db_name); - SCOPE_EXIT { 
db_admin_lock_.Unlock(request->db_name); }; + db_admin_lock_->Lock(request->db_name); + SCOPE_EXIT { db_admin_lock_->Unlock(request->db_name); }; auto db = getDB(request->db_name, nullptr); if (db == nullptr) { @@ -1897,8 +1833,8 @@ void AdminHandler::async_tm_startMessageIngestion( << "serverset path: " << kafka_broker_serverset_path << ", " << "replay_timestamp_ms: " << replay_timestamp_ms; - db_admin_lock_.Lock(db_name); - SCOPE_EXIT { db_admin_lock_.Unlock(db_name); }; + db_admin_lock_->Lock(db_name); + SCOPE_EXIT { db_admin_lock_->Unlock(db_name); }; auto db = getDB(db_name, &e); if (db == nullptr) { e.message = db_name + " doesn't exist."; @@ -2120,8 +2056,8 @@ void AdminHandler::async_tm_stopMessageIngestion( const auto db_name = request->db_name; LOG(ERROR) << "Called stopMessageIngestion for " << db_name; - db_admin_lock_.Lock(db_name); - SCOPE_EXIT { db_admin_lock_.Unlock(db_name); }; + db_admin_lock_->Lock(db_name); + SCOPE_EXIT { db_admin_lock_->Unlock(db_name); }; auto db = getDB(db_name, &e); if (db == nullptr) { e.message = db_name + " doesn't exist."; @@ -2166,8 +2102,8 @@ void AdminHandler::async_tm_setDBOptions( for (auto& option_pair : request->options) { options.emplace(std::move(option_pair)); } - db_admin_lock_.Lock(request->db_name); - SCOPE_EXIT { db_admin_lock_.Unlock(request->db_name); }; + db_admin_lock_->Lock(request->db_name); + SCOPE_EXIT { db_admin_lock_->Unlock(request->db_name); }; ::admin::AdminException e; auto db = getDB(request->db_name, &e); if (db == nullptr) { @@ -2222,41 +2158,8 @@ std::vector AdminHandler::getAllDBNames() { return db_manager_->getAllDBNames(); } -void AdminHandler::setS3Config( - const std::string& s3_bucket, - const std::string& s3_backup_dir_prefix, - const std::string& snapshot_host_port, - const uint32_t limit_mbs, - const bool include_meta) { - backup_manager_->setS3Config(std::move(s3_bucket), std::move(s3_backup_dir_prefix), - std::move(snapshot_host_port), limit_mbs, include_meta); -} - -bool AdminHandler::checkS3Object( - const uint32_t limit_mbs, - const std::string& s3_bucket, - const std::string& s3_path) { - auto local_s3_util = createLocalS3Util(limit_mbs, s3_bucket); - auto resp = local_s3_util->listAllObjects(ensure_ends_with_pathsep(s3_path)); - if (!resp.Error().empty()) { - auto err_msg = folly::stringPrintf( - "Error happened when fetching files in checkpoint from S3: %s under path: %s", - resp.Error().c_str(), s3_path.c_str()); - LOG(ERROR) << err_msg; - common::Stats::get()->Incr(kS3RestoreFailure); - return false; - } - - if (resp.Body().objects.size() > 0) { - return true; - } else { - LOG(INFO) << "No data has been written into " << s3_path << " on S3"; - return false; - } -} - bool AdminHandler::backupAllDBsToS3() { - return backup_manager_->backupAllDBsToS3(S3UploadAndDownloadExecutor(), meta_db_, db_admin_lock_); + return backup_manager_->backupAllDBsToS3(); } } // namespace admin diff --git a/rocksdb_admin/admin_handler.h b/rocksdb_admin/admin_handler.h index 8d0cc047..f4119f0b 100644 --- a/rocksdb_admin/admin_handler.h +++ b/rocksdb_admin/admin_handler.h @@ -28,7 +28,7 @@ #include "folly/SocketAddress.h" #include "rocksdb_admin/application_db_backup_manager.h" -#include "rocksdb_admin/application_db_backup_manager.h" +#include "rocksdb_admin/application_db_manager.h" #ifdef PINTEREST_INTERNAL // NEVER SET THIS UNLESS PINTEREST INTERNAL USAGE. 
#include "schemas/gen-cpp2/Admin.h" @@ -36,6 +36,22 @@ #include "rocksdb_admin/gen-cpp2/Admin.h" #endif #include "rocksdb/status.h" +#if __GNUC__ >= 8 +#include "folly/executors/CPUThreadPoolExecutor.h" +#include "folly/system/ThreadName.h" +#else +#include "wangle/concurrent/CPUThreadPoolExecutor.h" +#endif + +#if __GNUC__ >= 8 +using folly::CPUThreadPoolExecutor; +using folly::LifoSemMPMCQueue; +using folly::QueueBehaviorIfFull; +#else +using wangle::CPUThreadPoolExecutor; +using wangle::LifoSemMPMCQueue; +using wangle::QueueBehaviorIfFull; +#endif class KafkaWatcher; @@ -151,18 +167,6 @@ class AdminHandler : virtual public AdminSvIf { // Get all the db names held by the AdminHandler std::vector getAllDBNames(); - - // Set S3 config for the Backup Manager - void setS3Config(const std::string& s3_bucket, - const std::string& s3_backup_dir_prefix, - const std::string& snapshot_host_port, - const uint32_t limit_mbs, - const bool include_meta); - - // Used for testing incremental backup - bool checkS3Object(const uint32_t limit_mbs = 50, - const std::string& s3_bucket = "", - const std::string& s3_path = ""); // Used for testing incremental backup bool backupAllDBsToS3(); @@ -171,7 +175,7 @@ class AdminHandler : virtual public AdminSvIf { // Lock to synchronize DB admin operations at per DB granularity. // Put db_admin_lock in protected to provide flexibility // of overriding some admin functions - common::ObjectLock db_admin_lock_; + std::shared_ptr> db_admin_lock_; private: std::unique_ptr removeDB(const std::string& db_name, @@ -192,7 +196,7 @@ class AdminHandler : virtual public AdminSvIf { // Lock for protecting the s3 util mutable std::mutex s3_util_lock_; // db that contains meta data for all local rocksdb instances - std::unique_ptr meta_db_; + std::shared_ptr meta_db_; // segments which allow for overlapping keys when adding SST files std::unordered_set allow_overlapping_keys_segments_; // number of the current concurrenty s3 downloadings @@ -204,6 +208,8 @@ class AdminHandler : virtual public AdminSvIf { kafka_watcher_map_; // Lock for synchronizing access to kafka_watcher_map_ std::mutex kafka_watcher_lock_; + // + std::shared_ptr executor_; bool backupDBHelper(const std::string& db_name, const std::string& backup_dir, @@ -227,9 +233,6 @@ class AdminHandler : virtual public AdminSvIf { std::unique_ptr db_deletion_thread_; std::atomic stop_db_deletion_thread_; - - std::unique_ptr db_incremental_backup_thread_; - std::atomic stop_db_incremental_backup_thread_; }; } // namespace admin diff --git a/rocksdb_admin/application_db_backup_manager.cpp b/rocksdb_admin/application_db_backup_manager.cpp index e9a30119..86d33d42 100644 --- a/rocksdb_admin/application_db_backup_manager.cpp +++ b/rocksdb_admin/application_db_backup_manager.cpp @@ -29,6 +29,22 @@ using admin::DBMetaData; +DEFINE_bool(enable_async_incremental_backup_dbs, false, "Enable incremental backup for db files"); + +DEFINE_int32(async_incremental_backup_dbs_frequency_sec, 5, + "How frequently in sec to check the dbs need deleting in async way"); + +DEFINE_int32(async_incremental_backup_dbs_wait_sec, 60, + "How long in sec to wait between the dbs deletion"); + +DEFINE_string(s3_incre_backup_bucket, "pinterest-jackson", "The s3 bucket"); + +DEFINE_string(s3_incre_backup_prefix, "tmp/backup_test/incremental_backup_test/", + "The s3 key prefix for backup"); + +DEFINE_int32(incre_backup_limit_mbs, 100, "the rate limit for s3 client"); + +DEFINE_bool(incre_backup_include_meta, false, "whether to backup meta data on s3"); namespace 
admin { const std::string kMetaFilename = "dbmeta"; @@ -38,45 +54,41 @@ const int kS3UtilRecheckSec = 5; ApplicationDBBackupManager::ApplicationDBBackupManager( std::shared_ptr db_manager, + std::shared_ptr executor, + std::shared_ptr meta_db, + std::shared_ptr> db_admin_lock, const std::string& rocksdb_dir, - uint32_t checkpoint_backup_batch_num_upload, - const std::string& s3_bucket, - const std::string& s3_backup_dir_prefix, - const std::string& snapshot_host_port, - uint32_t limit_mbs, - bool include_meta) - : db_manager_(std::move(db_manager)) - , rocksdb_dir_(rocksdb_dir) - , checkpoint_backup_batch_num_upload_(checkpoint_backup_batch_num_upload) - , s3_bucket_(s3_bucket) - , s3_backup_dir_prefix_(s3_backup_dir_prefix) - , snapshot_host_port_(snapshot_host_port) - , limit_mbs_(limit_mbs) - , include_meta_(include_meta) - , s3_util_() - , s3_util_lock_() {} + const int32_t checkpoint_backup_batch_num_upload) + : db_manager_(db_manager) + , executor_(executor) + , meta_db_(meta_db) + , db_admin_lock_(db_admin_lock) + , rocksdb_dir_(rocksdb_dir) + , checkpoint_backup_batch_num_upload_(checkpoint_backup_batch_num_upload) + , s3_util_() + , s3_util_lock_() + , stop_db_incremental_backup_thread_(false) { + db_incremental_backup_thread_ = std::make_unique([this] { + if (!folly::setThreadName("DBBackuper")) { + LOG(ERROR) << "Failed to set thread name for DB backup thread"; + } -ApplicationDBBackupManager::ApplicationDBBackupManager( - std::shared_ptr db_manager, - const std::string& rocksdb_dir, - uint32_t checkpoint_backup_batch_num_upload) - : db_manager_(std::move(db_manager)) - , rocksdb_dir_(rocksdb_dir) - , checkpoint_backup_batch_num_upload_(checkpoint_backup_batch_num_upload) - , s3_util_() - , s3_util_lock_() {} - -void ApplicationDBBackupManager::setS3Config( - const std::string& s3_bucket, - const std::string& s3_backup_dir_prefix, - const std::string& snapshot_host_port, - const uint32_t limit_mbs, - const bool include_meta) { - s3_bucket_ = std::move(s3_bucket); - s3_backup_dir_prefix_ = std::move(s3_backup_dir_prefix); - snapshot_host_port_ = std::move(snapshot_host_port); - limit_mbs_ = limit_mbs; - include_meta_ = include_meta; + LOG(INFO) << "Starting DB backup thread ..."; + while (!stop_db_incremental_backup_thread_.load()) { + std::this_thread::sleep_for(std::chrono::seconds(FLAGS_async_incremental_backup_dbs_frequency_sec)); + backupAllDBsToS3(); + } + LOG(INFO) << "Stopping DB backup thread ..."; + }); +} + +ApplicationDBBackupManager::~ApplicationDBBackupManager() { + stop_db_incremental_backup_thread_ = true; + db_incremental_backup_thread_->join(); +} + +void ApplicationDBBackupManager::stopBackup() { + stop_db_incremental_backup_thread_ = true; } inline std::string ensure_ends_with_pathsep(const std::string& s) { @@ -124,16 +136,11 @@ std::shared_ptr ApplicationDBBackupManager::createLocalS3Util( return local_s3_util; } -bool ApplicationDBBackupManager::backupDBToS3( - const std::shared_ptr& db, - CPUThreadPoolExecutor* executor, - std::unique_ptr& meta_db, - common::ObjectLock& db_admin_lock) { +bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptr& db) { // ::admin::AdminException e; common::Timer timer(kS3BackupMs); - LOG(INFO) << "S3 Backup " << db->db_name() << " to " << ensure_ends_with_pathsep(s3_backup_dir_prefix_) << db->db_name() - << "/" << ensure_ends_with_pathsep(snapshot_host_port_); + LOG(INFO) << "S3 Backup " << db->db_name() << " to " << ensure_ends_with_pathsep(FLAGS_s3_incre_backup_prefix) << db->db_name(); auto ts = 
common::timeutil::GetCurrentTimestamp(); auto local_path = folly::stringPrintf("%ss3_tmp/%s%d/", rocksdb_dir_.c_str(), db->db_name().c_str(), ts); boost::system::error_code remove_err; @@ -147,8 +154,8 @@ bool ApplicationDBBackupManager::backupDBToS3( return false; } - db_admin_lock.Lock(db->db_name()); - SCOPE_EXIT { db_admin_lock.Unlock(db->db_name()); }; + db_admin_lock_->Lock(db->db_name()); + SCOPE_EXIT { db_admin_lock_->Unlock(db->db_name()); }; rocksdb::Checkpoint* checkpoint; auto status = rocksdb::Checkpoint::Create(db->rocksdb(), &checkpoint); @@ -179,10 +186,9 @@ bool ApplicationDBBackupManager::backupDBToS3( } // Upload checkpoint to s3 - auto local_s3_util = createLocalS3Util(limit_mbs_, s3_bucket_); - std::string formatted_s3_dir_path = folly::stringPrintf("%s%s/%s%d/", - ensure_ends_with_pathsep(s3_backup_dir_prefix_).c_str(), db->db_name().c_str(), - ensure_ends_with_pathsep(snapshot_host_port_).c_str(), ts); + auto local_s3_util = createLocalS3Util(FLAGS_incre_backup_limit_mbs, FLAGS_s3_incre_backup_bucket); + std::string formatted_s3_dir_path = folly::stringPrintf("%s%s/%d/", + ensure_ends_with_pathsep(FLAGS_s3_incre_backup_prefix).c_str(), db->db_name().c_str(), ts); std::string formatted_checkpoint_local_path = ensure_ends_with_pathsep(checkpoint_local_path); auto upload_func = [&](const std::string& dest, const std::string& source) { LOG(INFO) << "Copying " << source << " to " << dest; @@ -212,7 +218,7 @@ bool ApplicationDBBackupManager::backupDBToS3( auto p = folly::Promise(); futures.push_back(p.getFuture()); - executor->add( + executor_->add( [&, files = std::move(files), p = std::move(p)]() mutable { for (const auto& file : files) { if (!upload_func(formatted_s3_dir_path + file, formatted_checkpoint_local_path + file)) { @@ -250,13 +256,13 @@ bool ApplicationDBBackupManager::backupDBToS3( } } - if (include_meta_) { + if (FLAGS_incre_backup_include_meta) { DBMetaData meta; meta.db_name = db->db_name(); std::string buffer; rocksdb::ReadOptions options; - auto s = meta_db->Get(options, db->db_name(), &buffer); + auto s = meta_db_->Get(options, db->db_name(), &buffer); if (s.ok()) { apache::thrift::CompactSerializer::deserialize(buffer, meta); } @@ -284,11 +290,8 @@ bool ApplicationDBBackupManager::backupDBToS3( boost::filesystem::remove_all(local_path); } -bool ApplicationDBBackupManager::backupAllDBsToS3( - CPUThreadPoolExecutor* executor, - std::unique_ptr& meta_db, - common::ObjectLock& db_admin_lock) { - if (s3_bucket_.empty() || s3_backup_dir_prefix_.empty() || snapshot_host_port_.empty() || !limit_mbs_) { +bool ApplicationDBBackupManager::backupAllDBsToS3() { + if (FLAGS_s3_incre_backup_bucket.empty() || FLAGS_s3_incre_backup_prefix.empty() || !FLAGS_incre_backup_limit_mbs) { LOG(INFO) << "S3 config has not been set, so incremental backup cannot be triggered"; return false; } @@ -296,7 +299,7 @@ bool ApplicationDBBackupManager::backupAllDBsToS3( bool ret = true; for (const auto& db : db_manager_->getAllDBs()) { LOG(INFO) << "Incremental backup for " << db.first; - if(backupDBToS3(db.second, executor, meta_db, db_admin_lock)) { + if(backupDBToS3(db.second)) { LOG(INFO) << "Backup for " << db.first << " succeeds"; } else { LOG(INFO) << "Backup for " << db.first << " fails"; diff --git a/rocksdb_admin/application_db_backup_manager.h b/rocksdb_admin/application_db_backup_manager.h index 6d971a8e..bcae3fc5 100644 --- a/rocksdb_admin/application_db_backup_manager.h +++ b/rocksdb_admin/application_db_backup_manager.h @@ -37,37 +37,40 @@ using 
folly::CPUThreadPoolExecutor; using wangle::CPUThreadPoolExecutor; #endif +DECLARE_bool(enable_async_incremental_backup_dbs); + +DECLARE_int32(async_incremental_backup_dbs_frequency_sec); + +DECLARE_int32(async_incremental_backup_dbs_wait_sec); + +DECLARE_string(s3_incre_backup_bucket); + +DECLARE_string(s3_incre_backup_prefix); + +DECLARE_int32(incre_backup_limit_mbs); + +DECLARE_bool(incre_backup_include_meta); + namespace admin { class ApplicationDBBackupManager { public: - ApplicationDBBackupManager(std::shared_ptr db_manager, - const std::string& rocksdb_dir, - uint32_t checkpoint_backup_batch_num_upload, - const std::string& s3_bucket, - const std::string& s3_backup_dir_prefix, - const std::string& snapshot_host_port, - uint32_t limit_mbs, - bool include_meta); - ApplicationDBBackupManager(std::shared_ptr db_manager, - const std::string& rocksdb_dir, - uint32_t checkpoint_backup_batch_num_upload); + ApplicationDBBackupManager( + std::shared_ptr db_manager, + std::shared_ptr executor, + std::shared_ptr meta_db, + std::shared_ptr> db_admin_lock, + const std::string& rocksdb_dir, + const int32_t checkpoint_backup_batch_num_upload); - void setS3Config(const std::string& s3_bucket, - const std::string& s3_backup_dir_prefix, - const std::string& snapshot_host_port, - const uint32_t limit_mbs, - const bool include_meta); + ~ApplicationDBBackupManager(); - bool backupAllDBsToS3(CPUThreadPoolExecutor* executor, - std::unique_ptr& meta_db, - common::ObjectLock& db_admin_lock); + void stopBackup(); - bool backupDBToS3(const std::shared_ptr& db, - CPUThreadPoolExecutor* executor, - std::unique_ptr& meta_db, - common::ObjectLock& db_admin_lock); + bool backupAllDBsToS3(); + + bool backupDBToS3(const std::shared_ptr& db); private: // copy from admin_hamdler.. 
@@ -75,16 +78,17 @@ class ApplicationDBBackupManager { const std::string& s3_bucket); std::shared_ptr db_manager_; + std::shared_ptr executor_; + std::shared_ptr meta_db_; + std::shared_ptr> db_admin_lock_; std::string rocksdb_dir_; - uint32_t checkpoint_backup_batch_num_upload_; - std::string s3_bucket_; - std::string s3_backup_dir_prefix_; - std::string snapshot_host_port_; - uint32_t limit_mbs_; - bool include_meta_; + int32_t checkpoint_backup_batch_num_upload_; std::shared_ptr s3_util_; mutable std::mutex s3_util_lock_; + + std::unique_ptr db_incremental_backup_thread_; + std::atomic stop_db_incremental_backup_thread_; }; } \ No newline at end of file diff --git a/rocksdb_admin/tests/admin_handler_test.cpp b/rocksdb_admin/tests/admin_handler_test.cpp index 83fbedd0..be151465 100644 --- a/rocksdb_admin/tests/admin_handler_test.cpp +++ b/rocksdb_admin/tests/admin_handler_test.cpp @@ -21,6 +21,7 @@ #include #include +#include "common/s3util.h" #include "boost/filesystem.hpp" #include "folly/SocketAddress.h" #include "gtest/gtest.h" @@ -142,6 +143,13 @@ namespace admin { class AdminHandlerTestBase : public testing::Test { public: AdminHandlerTestBase() { + const testing::TestInfo* const test_info = + testing::UnitTest::GetInstance()->current_test_info(); + std:string back_up_test_name = "ApplicationDBBackupManagerTest"; + if (strcmp(test_info->name(), back_up_test_name.c_str())) { + FLAGS_enable_async_incremental_backup_dbs = true; + FLAGS_async_incremental_backup_dbs_frequency_sec = 1; + } // setup local , s3 paths for test FLAGS_rocksdb_dir = testDir(); FLAGS_s3_backup_prefix = @@ -892,16 +900,12 @@ TEST_F(AdminHandlerTestBase, ApplicationDBBackupManagerTest) { // use incremental backup handler_->writeMetaData(testdb, "fakes3bucket", "fakes3path"); - std::string s3_bucket = FLAGS_s3_bucket; - std::string s3_backup_dir_prefix = "backup/cluster1/"; - std::string snapshot_host_port = "host_8080"; - const uint32_t limit_mbs = 100; - const bool include_meta = false; - std::string s3_path = s3_backup_dir_prefix + testdb + "/" + snapshot_host_port + "/"; + std::this_thread::sleep_for(std::chrono::seconds(FLAGS_async_incremental_backup_dbs_frequency_sec * 2)); - handler_->setS3Config(s3_bucket, s3_backup_dir_prefix, snapshot_host_port, limit_mbs, include_meta); - EXPECT_TRUE(handler_->backupAllDBsToS3()); - EXPECT_TRUE(handler_->checkS3Object(limit_mbs, s3_bucket, s3_path)); + std::string s3_path = FLAGS_s3_incre_backup_prefix + testdb + "/"; + auto local_s3_util = common::S3Util::BuildS3Util(FLAGS_incre_backup_limit_mbs, FLAGS_s3_incre_backup_bucket); + auto resp = local_s3_util->listAllObjects(s3_path); + EXPECT_TRUE(resp.Body().objects.size() > 0); } From a8e4fda87690a6daf63fe9ba76180dff2d634a6f Mon Sep 17 00:00:00 2001 From: qiyanghe1998 Date: Fri, 29 Jul 2022 23:40:09 +0000 Subject: [PATCH 06/15] make variables from shared_ptr to raw_ptr in backup manager & clean code --- rocksdb_admin/admin_handler.cpp | 80 ++++++++++--------- rocksdb_admin/admin_handler.h | 10 +-- .../application_db_backup_manager.cpp | 6 +- rocksdb_admin/application_db_backup_manager.h | 12 +-- 4 files changed, 54 insertions(+), 54 deletions(-) diff --git a/rocksdb_admin/admin_handler.cpp b/rocksdb_admin/admin_handler.cpp index 554725cd..4a048113 100644 --- a/rocksdb_admin/admin_handler.cpp +++ b/rocksdb_admin/admin_handler.cpp @@ -379,6 +379,16 @@ bool DeserializeKafkaPayload( } } +CPUThreadPoolExecutor* S3UploadAndDownloadExecutor() { + static CPUThreadPoolExecutor executor( + FLAGS_num_s3_upload_download_threads, + 
std::make_unique>( + FLAGS_max_s3_upload_download_task_queue_size), + std::make_shared("s3-upload-download")); + + return &executor; +} + // The dbs moved to db_tmp/ shouldnt be re-used or re-opened, so we can // delete them via boost filesystem operations rather than rocksdb::DestroyDB() void deleteTmpDBs() { @@ -410,7 +420,7 @@ void deleteTmpDBs() { namespace admin { AdminHandler::AdminHandler( - std::shared_ptr db_manager, + std::unique_ptr db_manager, RocksDBOptionsGeneratorType rocksdb_options): AdminHandler( std::move(db_manager), [rocksdb_options](const std::string& dataset, const std::string& db) { @@ -418,28 +428,24 @@ AdminHandler::AdminHandler( }) {} AdminHandler::AdminHandler( - std::shared_ptr db_manager, + std::unique_ptr db_manager, RocksDBOptionsGenerator rocksdb_options) - : db_admin_lock_(std::make_shared>()) - , db_manager_(db_manager) + : db_admin_lock_() + , db_manager_(std::move(db_manager)) , rocksdb_options_(std::move(rocksdb_options)) , s3_util_() , s3_util_lock_() , meta_db_(OpenMetaDB()) , allow_overlapping_keys_segments_() , num_current_s3_sst_downloadings_(0) - , stop_db_deletion_thread_(false) - , executor_(new CPUThreadPoolExecutor(FLAGS_num_s3_upload_download_threads, - std::make_unique>( - FLAGS_max_s3_upload_download_task_queue_size), - std::make_shared("s3-upload-download"))) { + , stop_db_deletion_thread_(false) { if (db_manager_ == nullptr) { db_manager_ = CreateDBBasedOnConfig(rocksdb_options_); } if (FLAGS_enable_async_incremental_backup_dbs) { - backup_manager_ = std::make_unique(db_manager, executor_, meta_db_, - db_admin_lock_, FLAGS_rocksdb_dir, FLAGS_checkpoint_backup_batch_num_upload); + backup_manager_ = std::make_unique(db_manager_.get(), S3UploadAndDownloadExecutor(), + meta_db_, &db_admin_lock_, FLAGS_rocksdb_dir, FLAGS_checkpoint_backup_batch_num_upload); } folly::splitTo( @@ -563,8 +569,8 @@ void AdminHandler::async_tm_addDB( std::unique_ptr>> callback, std::unique_ptr request) { - db_admin_lock_->Lock(request->db_name); - SCOPE_EXIT { db_admin_lock_->Unlock(request->db_name); }; + db_admin_lock_.Lock(request->db_name); + SCOPE_EXIT { db_admin_lock_.Unlock(request->db_name); }; AdminException e; auto db = getDB(request->db_name, &e); @@ -667,8 +673,8 @@ bool AdminHandler::backupDBHelper(const std::string& db_name, bool include_meta, AdminException* e) { CHECK(env_holder != nullptr); - db_admin_lock_->Lock(db_name); - SCOPE_EXIT { db_admin_lock_->Unlock(db_name); }; + db_admin_lock_.Lock(db_name); + SCOPE_EXIT { db_admin_lock_.Unlock(db_name); }; auto db = getDB(db_name, e); if (db == nullptr) { @@ -738,8 +744,8 @@ bool AdminHandler::restoreDBHelper(const std::string& db_name, const uint32_t restore_rate_limit, AdminException* e) { assert(env_holder != nullptr); - db_admin_lock_->Lock(db_name); - SCOPE_EXIT { db_admin_lock_->Unlock(db_name); }; + db_admin_lock_.Lock(db_name); + SCOPE_EXIT { db_admin_lock_.Unlock(db_name); }; auto db = db_manager_->getDB(db_name, nullptr); if (db) { @@ -958,8 +964,8 @@ void AdminHandler::async_tm_backupDBToS3( } if (FLAGS_enable_checkpoint_backup) { - db_admin_lock_->Lock(request->db_name); - SCOPE_EXIT { db_admin_lock_->Unlock(request->db_name); }; + db_admin_lock_.Lock(request->db_name); + SCOPE_EXIT { db_admin_lock_.Unlock(request->db_name); }; auto db = getDB(request->db_name, &e); if (db == nullptr) { @@ -1029,7 +1035,7 @@ void AdminHandler::async_tm_backupDBToS3( auto p = folly::Promise(); futures.push_back(p.getFuture()); - executor_->add( + S3UploadAndDownloadExecutor()->add( [&, files = 
std::move(files), p = std::move(p)]() mutable { for (const auto& file : files) { if (!upload_func(formatted_s3_dir_path + file, formatted_checkpoint_local_path + file)) { @@ -1210,7 +1216,7 @@ void AdminHandler::async_tm_restoreDBFromS3( auto p = folly::Promise(); futures.push_back(p.getFuture()); - executor_->add( + S3UploadAndDownloadExecutor()->add( [&, files = std::move(files), p = std::move(p)]() mutable { for (const auto& file : files) { if (!download_func(file)) { @@ -1386,8 +1392,8 @@ void AdminHandler::async_tm_closeDB( std::unique_ptr>> callback, std::unique_ptr request) { - db_admin_lock_->Lock(request->db_name); - SCOPE_EXIT { db_admin_lock_->Unlock(request->db_name); }; + db_admin_lock_.Lock(request->db_name); + SCOPE_EXIT { db_admin_lock_.Unlock(request->db_name); }; AdminException e; if (removeDB(request->db_name, &e) == nullptr) { @@ -1402,8 +1408,8 @@ void AdminHandler::async_tm_changeDBRoleAndUpStream( std::unique_ptr>> callback, std::unique_ptr request) { - db_admin_lock_->Lock(request->db_name); - SCOPE_EXIT { db_admin_lock_->Unlock(request->db_name); }; + db_admin_lock_.Lock(request->db_name); + SCOPE_EXIT { db_admin_lock_.Unlock(request->db_name); }; AdminException e; replicator::ReplicaRole new_role; @@ -1469,8 +1475,8 @@ void AdminHandler::async_tm_clearDB( std::unique_ptr>> callback, std::unique_ptr request) { - db_admin_lock_->Lock(request->db_name); - SCOPE_EXIT { db_admin_lock_->Unlock(request->db_name); }; + db_admin_lock_.Lock(request->db_name); + SCOPE_EXIT { db_admin_lock_.Unlock(request->db_name); }; bool need_to_reopen = false; replicator::ReplicaRole db_role; @@ -1602,8 +1608,8 @@ void AdminHandler::async_tm_addS3SstFilesToDB( admin::AdminException e; e.errorCode = AdminErrorCode::DB_ADMIN_ERROR; - db_admin_lock_->Lock(request->db_name); - SCOPE_EXIT { db_admin_lock_->Unlock(request->db_name); }; + db_admin_lock_.Lock(request->db_name); + SCOPE_EXIT { db_admin_lock_.Unlock(request->db_name); }; auto db = getDB(request->db_name, nullptr); if (db == nullptr) { @@ -1833,8 +1839,8 @@ void AdminHandler::async_tm_startMessageIngestion( << "serverset path: " << kafka_broker_serverset_path << ", " << "replay_timestamp_ms: " << replay_timestamp_ms; - db_admin_lock_->Lock(db_name); - SCOPE_EXIT { db_admin_lock_->Unlock(db_name); }; + db_admin_lock_.Lock(db_name); + SCOPE_EXIT { db_admin_lock_.Unlock(db_name); }; auto db = getDB(db_name, &e); if (db == nullptr) { e.message = db_name + " doesn't exist."; @@ -2056,8 +2062,8 @@ void AdminHandler::async_tm_stopMessageIngestion( const auto db_name = request->db_name; LOG(ERROR) << "Called stopMessageIngestion for " << db_name; - db_admin_lock_->Lock(db_name); - SCOPE_EXIT { db_admin_lock_->Unlock(db_name); }; + db_admin_lock_.Lock(db_name); + SCOPE_EXIT { db_admin_lock_.Unlock(db_name); }; auto db = getDB(db_name, &e); if (db == nullptr) { e.message = db_name + " doesn't exist."; @@ -2102,8 +2108,8 @@ void AdminHandler::async_tm_setDBOptions( for (auto& option_pair : request->options) { options.emplace(std::move(option_pair)); } - db_admin_lock_->Lock(request->db_name); - SCOPE_EXIT { db_admin_lock_->Unlock(request->db_name); }; + db_admin_lock_.Lock(request->db_name); + SCOPE_EXIT { db_admin_lock_.Unlock(request->db_name); }; ::admin::AdminException e; auto db = getDB(request->db_name, &e); if (db == nullptr) { @@ -2158,8 +2164,4 @@ std::vector AdminHandler::getAllDBNames() { return db_manager_->getAllDBNames(); } -bool AdminHandler::backupAllDBsToS3() { - return backup_manager_->backupAllDBsToS3(); -} - } // namespace 
admin diff --git a/rocksdb_admin/admin_handler.h b/rocksdb_admin/admin_handler.h index f4119f0b..d338a4bc 100644 --- a/rocksdb_admin/admin_handler.h +++ b/rocksdb_admin/admin_handler.h @@ -68,11 +68,11 @@ class AdminHandler : virtual public AdminSvIf { public: // TODO deprecate after getting rid of all callsites AdminHandler( - std::shared_ptr db_manager, + std::unique_ptr db_manager, RocksDBOptionsGeneratorType rocksdb_options); AdminHandler( - std::shared_ptr db_manager, + std::unique_ptr db_manager, RocksDBOptionsGenerator rocksdb_options); virtual ~AdminHandler(); @@ -175,7 +175,7 @@ class AdminHandler : virtual public AdminSvIf { // Lock to synchronize DB admin operations at per DB granularity. // Put db_admin_lock in protected to provide flexibility // of overriding some admin functions - std::shared_ptr> db_admin_lock_; + common::ObjectLock db_admin_lock_; private: std::unique_ptr removeDB(const std::string& db_name, @@ -188,7 +188,7 @@ class AdminHandler : virtual public AdminSvIf { const std::string& s3_path, const int64_t last_kafka_msg_timestamp_ms = -1); - std::shared_ptr db_manager_; + std::unique_ptr db_manager_; std::unique_ptr backup_manager_; RocksDBOptionsGenerator rocksdb_options_; // S3 util used for download @@ -208,8 +208,6 @@ class AdminHandler : virtual public AdminSvIf { kafka_watcher_map_; // Lock for synchronizing access to kafka_watcher_map_ std::mutex kafka_watcher_lock_; - // - std::shared_ptr executor_; bool backupDBHelper(const std::string& db_name, const std::string& backup_dir, diff --git a/rocksdb_admin/application_db_backup_manager.cpp b/rocksdb_admin/application_db_backup_manager.cpp index 86d33d42..fd5eade5 100644 --- a/rocksdb_admin/application_db_backup_manager.cpp +++ b/rocksdb_admin/application_db_backup_manager.cpp @@ -53,10 +53,10 @@ const std::string kS3BackupFailure = "s3_backup_failure"; const int kS3UtilRecheckSec = 5; ApplicationDBBackupManager::ApplicationDBBackupManager( - std::shared_ptr db_manager, - std::shared_ptr executor, + ApplicationDBManager* db_manager, + CPUThreadPoolExecutor* executor, std::shared_ptr meta_db, - std::shared_ptr> db_admin_lock, + common::ObjectLock* db_admin_lock, const std::string& rocksdb_dir, const int32_t checkpoint_backup_batch_num_upload) : db_manager_(db_manager) diff --git a/rocksdb_admin/application_db_backup_manager.h b/rocksdb_admin/application_db_backup_manager.h index bcae3fc5..cfab5049 100644 --- a/rocksdb_admin/application_db_backup_manager.h +++ b/rocksdb_admin/application_db_backup_manager.h @@ -57,10 +57,10 @@ class ApplicationDBBackupManager { public: ApplicationDBBackupManager( - std::shared_ptr db_manager, - std::shared_ptr executor, + ApplicationDBManager* db_manager, + CPUThreadPoolExecutor* executor, std::shared_ptr meta_db, - std::shared_ptr> db_admin_lock, + common::ObjectLock* db_admin_lock, const std::string& rocksdb_dir, const int32_t checkpoint_backup_batch_num_upload); @@ -77,10 +77,10 @@ class ApplicationDBBackupManager { std::shared_ptr createLocalS3Util(const uint32_t limit_mbs, const std::string& s3_bucket); - std::shared_ptr db_manager_; - std::shared_ptr executor_; + ApplicationDBManager* db_manager_; + CPUThreadPoolExecutor* executor_; std::shared_ptr meta_db_; - std::shared_ptr> db_admin_lock_; + common::ObjectLock* db_admin_lock_; std::string rocksdb_dir_; int32_t checkpoint_backup_batch_num_upload_; From a799e578d0eca87b393d79b7e57cebd165f319fe Mon Sep 17 00:00:00 2001 From: qiyanghe1998 Date: Mon, 1 Aug 2022 18:19:07 +0000 Subject: [PATCH 07/15] fix meta_db shared_ptr & 
clean code --- rocksdb_admin/admin_handler.cpp | 19 ++++++++++++++++- rocksdb_admin/admin_handler.h | 21 +------------------ .../application_db_backup_manager.cpp | 2 +- rocksdb_admin/application_db_backup_manager.h | 4 ++-- 4 files changed, 22 insertions(+), 24 deletions(-) diff --git a/rocksdb_admin/admin_handler.cpp b/rocksdb_admin/admin_handler.cpp index 4a048113..4dc1beaf 100644 --- a/rocksdb_admin/admin_handler.cpp +++ b/rocksdb_admin/admin_handler.cpp @@ -57,6 +57,12 @@ #include "rocksdb_replicator/rocksdb_replicator.h" #include "rocksdb_replicator/thrift/gen-cpp2/Replicator.h" #include "thrift/lib/cpp2/protocol/Serializer.h" +#if __GNUC__ >= 8 +#include "folly/executors/CPUThreadPoolExecutor.h" +#include "folly/system/ThreadName.h" +#else +#include "wangle/concurrent/CPUThreadPoolExecutor.h" +#endif DEFINE_string(hdfs_name_node, "hdfs://hbasebak-infra-namenode-prod1c01-001:8020", "The hdfs name node used for backup"); @@ -126,6 +132,17 @@ DEFINE_int32(async_delete_dbs_frequency_sec, DEFINE_int32(async_delete_dbs_wait_sec, 60, "How long in sec to wait between the dbs deletion"); + +#if __GNUC__ >= 8 +using folly::CPUThreadPoolExecutor; +using folly::LifoSemMPMCQueue; +using folly::QueueBehaviorIfFull; +#else +using wangle::CPUThreadPoolExecutor; +using wangle::LifoSemMPMCQueue; +using wangle::QueueBehaviorIfFull; +#endif + namespace { const std::string kMetaFilename = "dbmeta"; @@ -445,7 +462,7 @@ AdminHandler::AdminHandler( if (FLAGS_enable_async_incremental_backup_dbs) { backup_manager_ = std::make_unique(db_manager_.get(), S3UploadAndDownloadExecutor(), - meta_db_, &db_admin_lock_, FLAGS_rocksdb_dir, FLAGS_checkpoint_backup_batch_num_upload); + meta_db_.get(), &db_admin_lock_, FLAGS_rocksdb_dir, FLAGS_checkpoint_backup_batch_num_upload); } folly::splitTo( diff --git a/rocksdb_admin/admin_handler.h b/rocksdb_admin/admin_handler.h index d338a4bc..19cf6add 100644 --- a/rocksdb_admin/admin_handler.h +++ b/rocksdb_admin/admin_handler.h @@ -36,22 +36,6 @@ #include "rocksdb_admin/gen-cpp2/Admin.h" #endif #include "rocksdb/status.h" -#if __GNUC__ >= 8 -#include "folly/executors/CPUThreadPoolExecutor.h" -#include "folly/system/ThreadName.h" -#else -#include "wangle/concurrent/CPUThreadPoolExecutor.h" -#endif - -#if __GNUC__ >= 8 -using folly::CPUThreadPoolExecutor; -using folly::LifoSemMPMCQueue; -using folly::QueueBehaviorIfFull; -#else -using wangle::CPUThreadPoolExecutor; -using wangle::LifoSemMPMCQueue; -using wangle::QueueBehaviorIfFull; -#endif class KafkaWatcher; @@ -167,9 +151,6 @@ class AdminHandler : virtual public AdminSvIf { // Get all the db names held by the AdminHandler std::vector getAllDBNames(); - - // Used for testing incremental backup - bool backupAllDBsToS3(); protected: // Lock to synchronize DB admin operations at per DB granularity. 
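Illustration (not part of the patch): the per-DB lock named in the comment above is taken throughout admin_handler.cpp as db_admin_lock_.Lock(db_name) followed by a SCOPE_EXIT that calls Unlock. As an assumption, an equivalent RAII guard for that pairing would be:

#include <string>
#include <utility>

// Locks one key on construction and releases it on scope exit; LockT only
// needs the Lock(key)/Unlock(key) interface used by the handler.
template <typename LockT>
class ObjectLockGuard {
 public:
  ObjectLockGuard(LockT* lock, std::string key)
      : lock_(lock), key_(std::move(key)) {
    lock_->Lock(key_);
  }
  ~ObjectLockGuard() { lock_->Unlock(key_); }
  ObjectLockGuard(const ObjectLockGuard&) = delete;
  ObjectLockGuard& operator=(const ObjectLockGuard&) = delete;

 private:
  LockT* lock_;
  std::string key_;
};

A call site would then read ObjectLockGuard<decltype(db_admin_lock_)> guard(&db_admin_lock_, request->db_name); in place of the Lock/SCOPE_EXIT pair.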
@@ -196,7 +177,7 @@ class AdminHandler : virtual public AdminSvIf { // Lock for protecting the s3 util mutable std::mutex s3_util_lock_; // db that contains meta data for all local rocksdb instances - std::shared_ptr meta_db_; + std::unique_ptr meta_db_; // segments which allow for overlapping keys when adding SST files std::unordered_set allow_overlapping_keys_segments_; // number of the current concurrenty s3 downloadings diff --git a/rocksdb_admin/application_db_backup_manager.cpp b/rocksdb_admin/application_db_backup_manager.cpp index fd5eade5..b78dd08e 100644 --- a/rocksdb_admin/application_db_backup_manager.cpp +++ b/rocksdb_admin/application_db_backup_manager.cpp @@ -55,7 +55,7 @@ const int kS3UtilRecheckSec = 5; ApplicationDBBackupManager::ApplicationDBBackupManager( ApplicationDBManager* db_manager, CPUThreadPoolExecutor* executor, - std::shared_ptr meta_db, + rocksdb::DB* meta_db, common::ObjectLock* db_admin_lock, const std::string& rocksdb_dir, const int32_t checkpoint_backup_batch_num_upload) diff --git a/rocksdb_admin/application_db_backup_manager.h b/rocksdb_admin/application_db_backup_manager.h index cfab5049..1e733f5f 100644 --- a/rocksdb_admin/application_db_backup_manager.h +++ b/rocksdb_admin/application_db_backup_manager.h @@ -59,7 +59,7 @@ class ApplicationDBBackupManager { ApplicationDBBackupManager( ApplicationDBManager* db_manager, CPUThreadPoolExecutor* executor, - std::shared_ptr meta_db, + rocksdb::DB* meta_db, common::ObjectLock* db_admin_lock, const std::string& rocksdb_dir, const int32_t checkpoint_backup_batch_num_upload); @@ -79,7 +79,7 @@ class ApplicationDBBackupManager { ApplicationDBManager* db_manager_; CPUThreadPoolExecutor* executor_; - std::shared_ptr meta_db_; + rocksdb::DB* meta_db_; common::ObjectLock* db_admin_lock_; std::string rocksdb_dir_; int32_t checkpoint_backup_batch_num_upload_; From 1f678a717ccbd0bda8ee49f012eb322a2a0de436 Mon Sep 17 00:00:00 2001 From: qiyanghe1998 Date: Mon, 1 Aug 2022 23:48:37 +0000 Subject: [PATCH 08/15] temp --- rocksdb_admin/admin_handler.cpp | 6 ++ .../application_db_backup_manager.cpp | 94 +++++++++++++++++-- rocksdb_admin/application_db_backup_manager.h | 4 + 3 files changed, 97 insertions(+), 7 deletions(-) diff --git a/rocksdb_admin/admin_handler.cpp b/rocksdb_admin/admin_handler.cpp index 4dc1beaf..8b45615f 100644 --- a/rocksdb_admin/admin_handler.cpp +++ b/rocksdb_admin/admin_handler.cpp @@ -460,10 +460,16 @@ AdminHandler::AdminHandler( db_manager_ = CreateDBBasedOnConfig(rocksdb_options_); } +<<<<<<< HEAD if (FLAGS_enable_async_incremental_backup_dbs) { backup_manager_ = std::make_unique(db_manager_.get(), S3UploadAndDownloadExecutor(), meta_db_.get(), &db_admin_lock_, FLAGS_rocksdb_dir, FLAGS_checkpoint_backup_batch_num_upload); } +======= + backup_manager_ = std::make_unique( + std::move(db_manager), FLAGS_rocksdb_dir, FLAGS_checkpoint_backup_batch_num_upload, + FLAGS_s3_direct_io); +>>>>>>> temp folly::splitTo( ",", FLAGS_allow_overlapping_keys_segments, diff --git a/rocksdb_admin/application_db_backup_manager.cpp b/rocksdb_admin/application_db_backup_manager.cpp index b78dd08e..13d21346 100644 --- a/rocksdb_admin/application_db_backup_manager.cpp +++ b/rocksdb_admin/application_db_backup_manager.cpp @@ -45,12 +45,18 @@ DEFINE_string(s3_incre_backup_prefix, "tmp/backup_test/incremental_backup_test/" DEFINE_int32(incre_backup_limit_mbs, 100, "the rate limit for s3 client"); DEFINE_bool(incre_backup_include_meta, false, "whether to backup meta data on s3"); -namespace admin { + +namespace { const 
std::string kMetaFilename = "dbmeta"; const std::string kS3BackupMs = "s3_backup_ms"; const std::string kS3BackupFailure = "s3_backup_failure"; const int kS3UtilRecheckSec = 5; +const std::string backupDesc = "backup_descriptor"; + +} + +namespace admin { ApplicationDBBackupManager::ApplicationDBBackupManager( ApplicationDBManager* db_manager, @@ -136,9 +142,42 @@ std::shared_ptr ApplicationDBBackupManager::createLocalS3Util( return local_s3_util; } +// convert the string to a backup_descriptor map, if any error happens, it will return an empty map +void parseBackupDescriptor(const std::string& contents, std::unordered_map* file_to_ts) { + // Example: + // contents = "file1=1;file2=2;file3=3;" + size_t pos = 0; + + while (pos < contents.size()) { + size_t eq_pos = contents.find("=", pos); + if (eq_pos == std::string::npos) { + file_to_ts->clear(); + return; + } + std::string key = contents.substr(pos, eq_pos - pos); + if (key.empty()) { + file_to_ts->clear(); + return; + } + + size_t semi_pos = contents.find(";", eq_pos); + if (semi_pos == std::string::npos) { + file_to_ts->clear(); + return; + } + std::string value_string = contents.substr(eq_pos + 1, semi_pos - eq_pos - 1); + if (value_string.empty()) { + file_to_ts->clear(); + return; + } + int64_t value = std::stoll(value_string, nullptr, 10); + + file_to_ts->emplace(key, value); + } +} + bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptr& db) { // ::admin::AdminException e; - common::Timer timer(kS3BackupMs); LOG(INFO) << "S3 Backup " << db->db_name() << " to " << ensure_ends_with_pathsep(FLAGS_s3_incre_backup_prefix) << db->db_name(); auto ts = common::timeutil::GetCurrentTimestamp(); @@ -185,9 +224,50 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrdb_name()); + if (itor != db_backups_.end()) { + last_ts = itor->second[(int)itor->second.size()-1]; + } + + // if there exists last backup, we need to get the backup_descriptor auto local_s3_util = createLocalS3Util(FLAGS_incre_backup_limit_mbs, FLAGS_s3_incre_backup_bucket); - std::string formatted_s3_dir_path = folly::stringPrintf("%s%s/%d/", + + if (last_ts >= 0) { + auto local_path_last_backup = folly::stringPrintf("%ss3_tmp/%s%d/", rocksdb_dir_.c_str(), db->db_name().c_str(), last_ts); + std::string formatted_s3_last_backup_dir_path = folly::stringPrintf("%s%s/%d/", + ensure_ends_with_pathsep(FLAGS_s3_incre_backup_prefix).c_str(), db->db_name().c_str(), last_ts); + bool need_download = !(boost::filesystem::is_directory(local_path_last_backup)); + + if (need_download) + auto resp = local_s3_util->getObject(formatted_s3_last_backup_dir_path + backupDesc, + local_path_last_backup + backupDesc, s3_direct_io_); + + std::string last_backup_desc_contents; + common::FileUtil::readFileToString(local_path_last_backup + backupDesc, &last_backup_desc_contents); + std::unordered_map file_to_ts; + parseBackupDescriptor(last_backup_desc_contents, &file_to_ts); + if (file_to_ts.empty()) { + LOG(INFO) << "The backup of " << db->db_name() << " on timestamp " << last_ts << " is broken"; + } + + // Remove all the duplicate files + for (size_t i = 0; i < checkpoint_files.size(); ++i) { + auto& file = checkpoint_files[i]; + if (file == "." 
|| file == "..") { + continue; + } else if (file_to_ts.find(file) != file_to_ts.end()) { + // if (file == ".") + } + } + // checkpoint_files.erase(std::remove_if(checkpoint_files.begin(), checkpoint_files.end(), [](string file){ + // return file_to_ts.find(file) != file_to_ts.end();}), checkpoint_files.end()); + + } + + // Upload checkpoint to s3 + std::string formatted_s3_dir_path_upload = folly::stringPrintf("%s%s/%d/", ensure_ends_with_pathsep(FLAGS_s3_incre_backup_prefix).c_str(), db->db_name().c_str(), ts); std::string formatted_checkpoint_local_path = ensure_ends_with_pathsep(checkpoint_local_path); auto upload_func = [&](const std::string& dest, const std::string& source) { @@ -221,7 +301,7 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptradd( [&, files = std::move(files), p = std::move(p)]() mutable { for (const auto& file : files) { - if (!upload_func(formatted_s3_dir_path + file, formatted_checkpoint_local_path + file)) { + if (!upload_func(formatted_s3_dir_path_upload + file, formatted_checkpoint_local_path + file)) { p.setValue(false); return; } @@ -245,7 +325,7 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrIncr(kS3BackupFailure); return false; } - if (!upload_func(formatted_s3_dir_path + kMetaFilename, dbmeta_path)) { + if (!upload_func(formatted_s3_dir_path_upload + kMetaFilename, dbmeta_path)) { // SetException("Error happened when upload meta from checkpoint to S3", // AdminErrorCode::DB_ADMIN_ERROR, &callback); common::Stats::get()->Incr(kS3BackupFailure); diff --git a/rocksdb_admin/application_db_backup_manager.h b/rocksdb_admin/application_db_backup_manager.h index 1e733f5f..0c6d45a2 100644 --- a/rocksdb_admin/application_db_backup_manager.h +++ b/rocksdb_admin/application_db_backup_manager.h @@ -84,6 +84,10 @@ class ApplicationDBBackupManager { std::string rocksdb_dir_; int32_t checkpoint_backup_batch_num_upload_; + // used to store all backups for each db. + // I only store the timestamps since I use that to name different backups. 
+ std::unordered_map> db_backups_; + std::shared_ptr s3_util_; mutable std::mutex s3_util_lock_; From f1c22cec03d026324b491980c056b9dba5492ef1 Mon Sep 17 00:00:00 2001 From: qiyanghe1998 Date: Tue, 2 Aug 2022 22:06:24 +0000 Subject: [PATCH 09/15] finish backup-descriptor --- rocksdb_admin/admin_handler.cpp | 6 -- .../application_db_backup_manager.cpp | 74 ++++++++++++++----- 2 files changed, 55 insertions(+), 25 deletions(-) diff --git a/rocksdb_admin/admin_handler.cpp b/rocksdb_admin/admin_handler.cpp index 8b45615f..4dc1beaf 100644 --- a/rocksdb_admin/admin_handler.cpp +++ b/rocksdb_admin/admin_handler.cpp @@ -460,16 +460,10 @@ AdminHandler::AdminHandler( db_manager_ = CreateDBBasedOnConfig(rocksdb_options_); } -<<<<<<< HEAD if (FLAGS_enable_async_incremental_backup_dbs) { backup_manager_ = std::make_unique(db_manager_.get(), S3UploadAndDownloadExecutor(), meta_db_.get(), &db_admin_lock_, FLAGS_rocksdb_dir, FLAGS_checkpoint_backup_batch_num_upload); } -======= - backup_manager_ = std::make_unique( - std::move(db_manager), FLAGS_rocksdb_dir, FLAGS_checkpoint_backup_batch_num_upload, - FLAGS_s3_direct_io); ->>>>>>> temp folly::splitTo( ",", FLAGS_allow_overlapping_keys_segments, diff --git a/rocksdb_admin/application_db_backup_manager.cpp b/rocksdb_admin/application_db_backup_manager.cpp index 13d21346..cfc6d193 100644 --- a/rocksdb_admin/application_db_backup_manager.cpp +++ b/rocksdb_admin/application_db_backup_manager.cpp @@ -46,6 +46,8 @@ DEFINE_int32(incre_backup_limit_mbs, 100, "the rate limit for s3 client"); DEFINE_bool(incre_backup_include_meta, false, "whether to backup meta data on s3"); +DECLARE_bool(s3_direct_io); + namespace { const std::string kMetaFilename = "dbmeta"; @@ -104,6 +106,17 @@ inline std::string ensure_ends_with_pathsep(const std::string& s) { return s; } +inline bool ends_with(const std::string& str, const std::string& suffix) { + return str.size() >= suffix.size() && + 0 == str.compare(str.size()-suffix.size(), suffix.size(), suffix); +} + +// No checksum for current implementation, it needs to re-implement if checksum is enabled in sst names. 
+inline int64_t parse_sst_id(const std::string& sst_name) { + // 123.sst (w/o checksum) + return std::stoll(sst_name.substr(sst_name.find_first_of('.'))); +} + // copy from admin_hamdler.cpp inline bool should_new_s3_client( const common::S3Util& s3_util, const uint32_t limit_mbs, const std::string& s3_bucket) { @@ -143,15 +156,14 @@ std::shared_ptr ApplicationDBBackupManager::createLocalS3Util( } // convert the string to a backup_descriptor map, if any error happens, it will return an empty map -void parseBackupDescriptor(const std::string& contents, std::unordered_map* file_to_ts) { +void parseBackupDesc(const std::string& contents, std::unordered_map* file_to_ts) { // Example: // contents = "file1=1;file2=2;file3=3;" size_t pos = 0; while (pos < contents.size()) { size_t eq_pos = contents.find("=", pos); - if (eq_pos == std::string::npos) { - file_to_ts->clear(); + if (eq_pos == std::string::npos) { // finish parsing return; } std::string key = contents.substr(pos, eq_pos - pos); @@ -176,6 +188,15 @@ void parseBackupDescriptor(const std::string& contents, std::unordered_map& file_to_ts, int64_t sst_id_min, int64_t sst_id_max, std::string& contents) { + contents.clear(); + for (auto& item : file_to_ts) { + contents += item.first + "=" + std::to_string(item.second) + ";"; + } + contents += std::to_string(sst_id_min) + "-" + std::to_string(sst_id_max) + ";"; +} + bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptr& db) { // ::admin::AdminException e; common::Timer timer(kS3BackupMs); @@ -233,6 +254,13 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrdb_name().c_str(), ts); + std::string formatted_checkpoint_local_path = ensure_ends_with_pathsep(checkpoint_local_path); + + std::unordered_map file_to_ts; + std::unordered_map new_file_to_ts; + const string sst_suffix = ".sst"; if (last_ts >= 0) { auto local_path_last_backup = folly::stringPrintf("%ss3_tmp/%s%d/", rocksdb_dir_.c_str(), db->db_name().c_str(), last_ts); @@ -242,34 +270,42 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrgetObject(formatted_s3_last_backup_dir_path + backupDesc, - local_path_last_backup + backupDesc, s3_direct_io_); + local_path_last_backup + backupDesc, FLAGS_s3_direct_io); std::string last_backup_desc_contents; common::FileUtil::readFileToString(local_path_last_backup + backupDesc, &last_backup_desc_contents); - std::unordered_map file_to_ts; - parseBackupDescriptor(last_backup_desc_contents, &file_to_ts); + + parseBackupDesc(last_backup_desc_contents, &file_to_ts); if (file_to_ts.empty()) { LOG(INFO) << "The backup of " << db->db_name() << " on timestamp " << last_ts << " is broken"; } // Remove all the duplicate files - for (size_t i = 0; i < checkpoint_files.size(); ++i) { - auto& file = checkpoint_files[i]; - if (file == "." 
|| file == "..") { - continue; - } else if (file_to_ts.find(file) != file_to_ts.end()) { - // if (file == ".") - } + checkpoint_files.erase(std::remove_if(checkpoint_files.begin(), checkpoint_files.end(), [&](string file){ + return file_to_ts.find(file) != file_to_ts.end() && ends_with(file, sst_suffix);}), checkpoint_files.end()); + + new_file_to_ts = std::move(file_to_ts); + } + + // Get the min_id and max_id for backup descriptor + int64_t sst_id_min = INT32_MAX; + int64_t sst_id_max = 0; + for (auto& file : checkpoint_files) { + if (ends_with(file, sst_suffix)) { + int64_t file_id = parse_sst_id(file); + new_file_to_ts.emplace(file, file_id); + sst_id_max = std::max(file_id, sst_id_max); + sst_id_min = std::min(file_id, sst_id_min); } - // checkpoint_files.erase(std::remove_if(checkpoint_files.begin(), checkpoint_files.end(), [](string file){ - // return file_to_ts.find(file) != file_to_ts.end();}), checkpoint_files.end()); - } + // Create the new backup descriptor file + std::string backup_desc_contents; + makeUpBackupDescString(new_file_to_ts, sst_id_min, sst_id_max, backup_desc_contents); + common::FileUtil::createFileWithContent(formatted_checkpoint_local_path, backupDesc, backup_desc_contents); + checkpoint_files.emplace_back(backupDesc); + // Upload checkpoint to s3 - std::string formatted_s3_dir_path_upload = folly::stringPrintf("%s%s/%d/", - ensure_ends_with_pathsep(FLAGS_s3_incre_backup_prefix).c_str(), db->db_name().c_str(), ts); - std::string formatted_checkpoint_local_path = ensure_ends_with_pathsep(checkpoint_local_path); auto upload_func = [&](const std::string& dest, const std::string& source) { LOG(INFO) << "Copying " << source << " to " << dest; auto copy_resp = local_s3_util->putObject(dest, source); From c4340a2a5f62a018fe563669da32ca5de555c409 Mon Sep 17 00:00:00 2001 From: qiyanghe1998 Date: Fri, 5 Aug 2022 21:58:18 +0000 Subject: [PATCH 10/15] add tessts for backup descriptor --- .../application_db_backup_manager.cpp | 132 +++++++++++------- rocksdb_admin/application_db_backup_manager.h | 7 + rocksdb_admin/tests/admin_handler_test.cpp | 116 ++++++++++++++- 3 files changed, 199 insertions(+), 56 deletions(-) diff --git a/rocksdb_admin/application_db_backup_manager.cpp b/rocksdb_admin/application_db_backup_manager.cpp index 75122e2d..47afb2ab 100644 --- a/rocksdb_admin/application_db_backup_manager.cpp +++ b/rocksdb_admin/application_db_backup_manager.cpp @@ -116,10 +116,20 @@ inline bool ends_with(const std::string& str, const std::string& suffix) { 0 == str.compare(str.size()-suffix.size(), suffix.size(), suffix); } +inline int64_t remove_leading_zero(std::string& num) { + size_t nonzero_pos = num.find_first_not_of('0'); + if (nonzero_pos == std::string::npos) { + num.erase(0, num.size()-1); + } else { + num.erase(0, nonzero_pos); + } + return stoll(num); +} + // No checksum for current implementation, it needs to re-implement if checksum is enabled in sst names. 
-inline int64_t parse_sst_id(const std::string& sst_name) { +inline std::string parse_sst_id(const std::string& sst_name) { // 123.sst (w/o checksum) - return std::stoll(sst_name.substr(sst_name.find_first_of('.'))); + return sst_name.substr(0, sst_name.find_first_of('.')); } // copy from admin_hamdler.cpp @@ -161,40 +171,37 @@ std::shared_ptr ApplicationDBBackupManager::createLocalS3Util( } // convert the string to a backup_descriptor map, if any error happens, it will return an empty map -void parseBackupDesc(const std::string& contents, std::unordered_map* file_to_ts) { +// TODO: add sst_id_min and sst_id_max parsing +void ApplicationDBBackupManager::parseBackupDesc(const std::string& contents, + std::unordered_map* file_to_ts) { // Example: - // contents = "file1=1;file2=2;file3=3;" + // contents = "file1=1;file2=2;file3=3;sst_id_min,sstd_id_max;" size_t pos = 0; while (pos < contents.size()) { size_t eq_pos = contents.find("=", pos); - if (eq_pos == std::string::npos) { // finish parsing - return; - } + // finish parsing + if (eq_pos == std::string::npos) return; + std::string key = contents.substr(pos, eq_pos - pos); - if (key.empty()) { - file_to_ts->clear(); - return; - } + if (key.empty()) return; size_t semi_pos = contents.find(";", eq_pos); - if (semi_pos == std::string::npos) { - file_to_ts->clear(); - return; - } + if (semi_pos == std::string::npos) return; + std::string value_string = contents.substr(eq_pos + 1, semi_pos - eq_pos - 1); - if (value_string.empty()) { - file_to_ts->clear(); - return; - } - int64_t value = std::stoll(value_string, nullptr, 10); - + if (value_string.empty()) return; + + int64_t value = remove_leading_zero(value_string); file_to_ts->emplace(key, value); + + pos = semi_pos+1; } } // convert file_to_ts map, ssd_id_min, ssd_id_max to a string. 
-void makeUpBackupDescString(const std::unordered_map& file_to_ts, int64_t sst_id_min, int64_t sst_id_max, std::string& contents) { +void ApplicationDBBackupManager::makeUpBackupDescString(const std::unordered_map& + file_to_ts, int64_t sst_id_min, int64_t sst_id_max, std::string& contents) { contents.clear(); for (auto& item : file_to_ts) { contents += item.first + "=" + std::to_string(item.second) + ";"; @@ -208,14 +215,14 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrdb_name() << " to " << ensure_ends_with_pathsep(FLAGS_s3_incre_backup_prefix) << db->db_name(); auto ts = common::timeutil::GetCurrentTimestamp(); - auto local_path = folly::stringPrintf("%ss3_tmp/%s%d/", rocksdb_dir_.c_str(), db->db_name().c_str(), ts); + auto local_path = folly::stringPrintf("%ss3_tmp/%s%ld/", rocksdb_dir_.c_str(), db->db_name().c_str(), ts); boost::system::error_code remove_err; boost::system::error_code create_err; boost::filesystem::remove_all(local_path, remove_err); boost::filesystem::create_directories(local_path, create_err); SCOPE_EXIT { boost::filesystem::remove_all(local_path, remove_err); }; if (remove_err || create_err) { - // SetException("Cannot remove/create dir for backup: " + local_path, AdminErrorCode::DB_ADMIN_ERROR, &callback); + LOG(ERROR) << "Cannot remove/create dir for backup: " << local_path; common::Stats::get()->Incr(kS3BackupFailure); return false; } @@ -226,7 +233,6 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrrocksdb(), &checkpoint); if (!status.ok()) { - // OKOrSetException(status, AdminErrorCode::DB_ADMIN_ERROR, &callback); LOG(ERROR) << "Error happened when trying to initialize checkpoint: " << status.ToString(); common::Stats::get()->Incr(kS3BackupFailure); return false; @@ -235,7 +241,6 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrCreateCheckpoint(checkpoint_local_path); if (!status.ok()) { - // OKOrSetException(status, AdminErrorCode::DB_ADMIN_ERROR, &callback); LOG(ERROR) << "Error happened when trying to create checkpoint: " << status.ToString(); common::Stats::get()->Incr(kS3BackupFailure); return false; @@ -245,7 +250,6 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptr checkpoint_files; status = rocksdb::Env::Default()->GetChildren(checkpoint_local_path, &checkpoint_files); if (!status.ok()) { - // OKOrSetException(status, AdminErrorCode::DB_ADMIN_ERROR, &callback); LOG(ERROR) << "Error happened when trying to list files in the checkpoint: " << status.ToString(); common::Stats::get()->Incr(kS3BackupFailure); return false; @@ -260,37 +264,51 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrdb_name().c_str(), ts); std::string formatted_checkpoint_local_path = ensure_ends_with_pathsep(checkpoint_local_path); std::unordered_map file_to_ts; - std::unordered_map new_file_to_ts; const string sst_suffix = ".sst"; if (last_ts >= 0) { - auto local_path_last_backup = folly::stringPrintf("%ss3_tmp/%s%d/", rocksdb_dir_.c_str(), db->db_name().c_str(), last_ts); - std::string formatted_s3_last_backup_dir_path = folly::stringPrintf("%s%s/%d/", + LOG(INFO) << "find the last backup on " << last_ts; + std::string local_path_last_backup = folly::stringPrintf("%ss3_tmp/%s%ld/", rocksdb_dir_.c_str(), db->db_name().c_str(), last_ts) + "checkpoint/"; + std::string formatted_s3_last_backup_dir_path = folly::stringPrintf("%s%s/%ld/", ensure_ends_with_pathsep(FLAGS_s3_incre_backup_prefix).c_str(), db->db_name().c_str(), last_ts); - bool need_download = 
!(boost::filesystem::is_directory(local_path_last_backup)); - if (need_download) + boost::system::error_code remove_last_backup_err; + boost::system::error_code create_last_backup_err; + // if we have not removed the last backup, we do not need to download it + bool need_download = !boost::filesystem::exists(local_path_last_backup + backupDesc); + boost::filesystem::create_directories(local_path_last_backup, create_last_backup_err); + SCOPE_EXIT { boost::filesystem::remove_all(local_path_last_backup, remove_last_backup_err); }; + if (create_err) { + // SetException("Cannot remove/create dir for backup: " + local_path, AdminErrorCode::DB_ADMIN_ERROR, &callback); + common::Stats::get()->Incr(kS3BackupFailure); + return false; + } + + if (need_download) { auto resp = local_s3_util->getObject(formatted_s3_last_backup_dir_path + backupDesc, local_path_last_backup + backupDesc, FLAGS_s3_direct_io); + } else { + assert(boost::filesystem::exists(local_path_last_backup + backupDesc)); + } std::string last_backup_desc_contents; common::FileUtil::readFileToString(local_path_last_backup + backupDesc, &last_backup_desc_contents); parseBackupDesc(last_backup_desc_contents, &file_to_ts); if (file_to_ts.empty()) { - LOG(INFO) << "The backup of " << db->db_name() << " on timestamp " << last_ts << " is broken"; - } + LOG(INFO) << "The last backup of " << db->db_name() << " on timestamp " << last_ts << " is broken"; + } // Remove all the duplicate files checkpoint_files.erase(std::remove_if(checkpoint_files.begin(), checkpoint_files.end(), [&](string file){ return file_to_ts.find(file) != file_to_ts.end() && ends_with(file, sst_suffix);}), checkpoint_files.end()); - - new_file_to_ts = std::move(file_to_ts); + + LOG(INFO) << "finish parsing last backup"; } // Get the min_id and max_id for backup descriptor @@ -298,16 +316,17 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrIncr(kS3BackupFailure); return false; } @@ -368,9 +385,7 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrIncr(kS3BackupFailure); return false; } @@ -394,21 +409,25 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrIncr(kS3BackupFailure); return false; } if (!upload_func(formatted_s3_dir_path_upload + kMetaFilename, dbmeta_path)) { - // SetException("Error happened when upload meta from checkpoint to S3", - // AdminErrorCode::DB_ADMIN_ERROR, &callback); + LOG(ERROR) << "Error happened when upload meta from checkpoint to S3"; common::Stats::get()->Incr(kS3BackupFailure); return false; } } - // Delete the directory to remove the snapshot. 
- boost::filesystem::remove_all(local_path); + auto it = db_backups_.find(db->db_name()); + if (it != db_backups_.end()) { + it->second.emplace_back(ts); + } else { + db_backups_.emplace(db->db_name(), std::vector(1, ts)); + } + + return true; } bool ApplicationDBBackupManager::backupAllDBsToS3() { @@ -431,4 +450,9 @@ bool ApplicationDBBackupManager::backupAllDBsToS3() { return ret; } +std::vector ApplicationDBBackupManager::getTimeStamp(const std::string& db) { + if (db_backups_.find(db) != db_backups_.end()) return db_backups_[db]; + else return std::vector(); +} + } \ No newline at end of file diff --git a/rocksdb_admin/application_db_backup_manager.h b/rocksdb_admin/application_db_backup_manager.h index 70cc358d..4e5111c3 100644 --- a/rocksdb_admin/application_db_backup_manager.h +++ b/rocksdb_admin/application_db_backup_manager.h @@ -71,6 +71,13 @@ class ApplicationDBBackupManager { bool backupDBToS3(const std::shared_ptr& db); + std::vector getTimeStamp(const std::string& db); + + void parseBackupDesc(const std::string& contents, std::unordered_map* file_to_ts); + + void makeUpBackupDescString(const std::unordered_map& file_to_ts, int64_t sst_id_min, + int64_t sst_id_max, std::string& contents); + private: // copy from admin_hamdler.. std::shared_ptr createLocalS3Util(const uint32_t limit_mbs, diff --git a/rocksdb_admin/tests/admin_handler_test.cpp b/rocksdb_admin/tests/admin_handler_test.cpp index b76ac7b4..bdfa533b 100644 --- a/rocksdb_admin/tests/admin_handler_test.cpp +++ b/rocksdb_admin/tests/admin_handler_test.cpp @@ -21,8 +21,11 @@ #include #include +#include "common/file_util.h" #include "common/s3util.h" +#include "common/segment_utils.h" #include "boost/filesystem.hpp" +#include "boost/range/iterator_range.hpp" #include "folly/SocketAddress.h" #include "gtest/gtest.h" #include "rocksdb/options.h" @@ -62,7 +65,9 @@ using apache::thrift::RpcOptions; using apache::thrift::ThriftServer; using apache::thrift::async::TAsyncSocket; using common::ThriftClientPool; +using rocksdb::CompactRangeOptions; using rocksdb::FlushOptions; +using rocksdb::IngestExternalFileOptions; using rocksdb::Options; using rocksdb::ReadOptions; using rocksdb::Status; @@ -92,6 +97,7 @@ DEFINE_bool( DECLARE_string(rocksdb_dir); DECLARE_bool(enable_checkpoint_backup); DECLARE_bool(rocksdb_allow_overlapping_keys); +DECLARE_bool(s3_direct_io); string generateRandIntAsStr() { static thread_local unsigned int seed = time(nullptr); @@ -137,10 +143,14 @@ class AdminHandlerTestBase : public testing::Test { AdminHandlerTestBase() { const testing::TestInfo* const test_info = testing::UnitTest::GetInstance()->current_test_info(); - std:string back_up_test_name = "ApplicationDBBackupManagerTest"; - if (strcmp(test_info->name(), back_up_test_name.c_str())) { + std::string back_up_test_name = "ApplicationDBBackupManagerTest"; + std::string backup_desc_test_name = "BackupDescriptorTest"; + if (0 == strcmp(test_info->name(), back_up_test_name.c_str())) { FLAGS_enable_async_incremental_backup_dbs = true; FLAGS_async_incremental_backup_dbs_frequency_sec = 1; + } else if (0 == strcmp(test_info->name(), backup_desc_test_name.c_str())) { + FLAGS_enable_async_incremental_backup_dbs = true; + FLAGS_async_incremental_backup_dbs_frequency_sec = 3; } // setup local , s3 paths for test FLAGS_rocksdb_dir = testDir(); @@ -919,6 +929,108 @@ TEST(AdminHandlerTest, InitS3Tmp) { EXPECT_FALSE(fs::exists(test_directory)); } +TEST_F(AdminHandlerTestBase, BackupDescriptorTest) { + int32_t range = 1000; + const string testdb = 
generateDBName(); + addDBWithRole(testdb, "LEADER"); + auto meta = handler_->getMetaData(testdb); + verifyMeta(meta, testdb, true, "", ""); + + // use incremental backup + handler_->writeMetaData(testdb, "fakes3bucket", "fakes3path"); + + AdminException e; + auto db_ = handler_->getDB(testdb, &e)->rocksdb(); + auto local_s3_util = common::S3Util::BuildS3Util(FLAGS_incre_backup_limit_mbs, FLAGS_s3_incre_backup_bucket); + + for (int32_t i = 0; i < range; ++i) { + std::string kv_tmp = std::to_string(i); + writeToDB(testdb, kv_tmp, kv_tmp + "_" + kv_tmp); + } + flushDB(testdb); + // wait for the first incremental backup + std::this_thread::sleep_for(std::chrono::seconds(FLAGS_async_incremental_backup_dbs_frequency_sec)); + + uint64_t manifest_size = 0; + std::vector files; + db_->GetLiveFiles(files, &manifest_size, true); + + // some overlap between two batches of kv pairs for compaction test later + for (int32_t i = 0; i < range; ++i) { + std::string kv_tmp = std::to_string(i + range/2); + writeToDB(testdb, kv_tmp, kv_tmp + "_" + kv_tmp + "_" + kv_tmp); + } + flushDB(testdb); + // wait for the second incremental backup + std::this_thread::sleep_for(std::chrono::seconds(FLAGS_async_incremental_backup_dbs_frequency_sec)); + + auto timestamps = handler_->backup_manager_->getTimeStamp(testdb); + EXPECT_TRUE(timestamps.size() >= 2); + std::string s3_path = FLAGS_s3_incre_backup_prefix + testdb + "/"; + std::string restore_path = "/restore_tmp/" + testdb + "/"; + + boost::system::error_code remove_err; + boost::system::error_code create_err; + fs::create_directories(restore_path, create_err); + SCOPE_EXIT { fs::remove_all(restore_path, remove_err); }; + + // download the latest backup + auto responses = local_s3_util->getObjects(s3_path + std::to_string(timestamps[1]) + "/", restore_path, "/", + FLAGS_s3_direct_io); + EXPECT_TRUE(responses.Error().empty() && responses.Body().size() > 0); + + std::string last_backup_desc_contents; + std::unordered_map file_to_ts; + const std::string backupDesc = "backup_descriptor"; + common::FileUtil::readFileToString(restore_path + backupDesc, &last_backup_desc_contents); + handler_->backup_manager_->parseBackupDesc(last_backup_desc_contents, &file_to_ts); + + // download necessary from previous backup + for (auto& it : file_to_ts) { + if (it.second != timestamps[1]) { + auto response = local_s3_util->getObject(s3_path + std::to_string(it.second) + "/" + it.first, restore_path + it.first, + FLAGS_s3_direct_io); + EXPECT_TRUE(response.Error().empty()); + } + } + boost::system::error_code remove_backup_desc_err; + boost::filesystem::remove(restore_path + backupDesc); + + rocksdb::DB* restore_db_; + Options options_restore; + auto status = rocksdb::DB::Open(options_restore, restore_path, &restore_db_); + EXPECT_TRUE(status.ok()); + + ReadOptions db_readoptions = ReadOptions(); + auto it_db_ = db_->NewIterator(db_readoptions); + + ReadOptions restore_db_readoptions = ReadOptions(); + auto it_restore_db_ = restore_db_->NewIterator(restore_db_readoptions); + + // compare local db with restore db + for (it_db_->SeekToFirst(), it_restore_db_->SeekToFirst(); it_db_->Valid() && + it_restore_db_->Valid(); it_db_->Next(), it_restore_db_->Next()) { + EXPECT_TRUE(it_db_->key() == it_restore_db_->key()); + EXPECT_TRUE(it_db_->value() == it_restore_db_->value()); + } + EXPECT_FALSE(it_db_->Valid() || it_restore_db_->Valid()); + + // compact local db and then compare again + rocksdb::Slice key_begin("0"); + rocksdb::Slice key_end(std::to_string(range - 1 + range/2)); + auto s = 
db_->CompactRange(CompactRangeOptions(), &key_begin, &key_end); + EXPECT_TRUE(s.ok()); + + it_db_ = db_->NewIterator(db_readoptions); + it_restore_db_ = restore_db_->NewIterator(restore_db_readoptions); + for (it_db_->SeekToFirst(), it_restore_db_->SeekToFirst(); it_db_->Valid() && + it_restore_db_->Valid(); it_db_->Next(), it_restore_db_->Next()) { + EXPECT_TRUE(it_db_->key() == it_restore_db_->key()); + EXPECT_TRUE(it_db_->value() == it_restore_db_->value()); + } + EXPECT_FALSE(it_db_->Valid() || it_restore_db_->Valid()); +} + } // namespace admin int main(int argc, char** argv) { From 64cdf4cd25e277dec8ca9af24c61e34d46d5e878 Mon Sep 17 00:00:00 2001 From: qiyanghe1998 Date: Fri, 5 Aug 2022 23:40:49 +0000 Subject: [PATCH 11/15] stoll -> stol --- rocksdb_admin/application_db_backup_manager.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rocksdb_admin/application_db_backup_manager.cpp b/rocksdb_admin/application_db_backup_manager.cpp index 47afb2ab..bc62c402 100644 --- a/rocksdb_admin/application_db_backup_manager.cpp +++ b/rocksdb_admin/application_db_backup_manager.cpp @@ -123,7 +123,7 @@ inline int64_t remove_leading_zero(std::string& num) { } else { num.erase(0, nonzero_pos); } - return stoll(num); + return std::stol(num); } // No checksum for current implementation, it needs to re-implement if checksum is enabled in sst names. From 018a465ceee69cd90cbeb5303f9e99ffe49987d4 Mon Sep 17 00:00:00 2001 From: qiyanghe1998 Date: Tue, 9 Aug 2022 22:46:56 +0000 Subject: [PATCH 12/15] use json for backup desc & clean up --- .../application_db_backup_manager.cpp | 103 ++++++++---------- rocksdb_admin/application_db_backup_manager.h | 6 +- rocksdb_admin/tests/admin_handler_test.cpp | 20 +++- 3 files changed, 62 insertions(+), 67 deletions(-) diff --git a/rocksdb_admin/application_db_backup_manager.cpp b/rocksdb_admin/application_db_backup_manager.cpp index bc62c402..b3a03bac 100644 --- a/rocksdb_admin/application_db_backup_manager.cpp +++ b/rocksdb_admin/application_db_backup_manager.cpp @@ -58,8 +58,10 @@ namespace { const std::string kMetaFilename = "dbmeta"; const std::string kS3BackupMs = "s3_backup_ms"; const std::string kS3BackupFailure = "s3_backup_failure"; -const int kS3UtilRecheckSec = 5; +const int32_t kS3UtilRecheckSec = 5; const std::string backupDesc = "backup_descriptor"; +const std::string lastBackupTs = "last_backup_ts"; +const std::string fileToTs = "file_to_ts"; } @@ -116,7 +118,7 @@ inline bool ends_with(const std::string& str, const std::string& suffix) { 0 == str.compare(str.size()-suffix.size(), suffix.size(), suffix); } -inline int64_t remove_leading_zero(std::string& num) { +inline int64_t remove_leading_zero(std::string num) { size_t nonzero_pos = num.find_first_not_of('0'); if (nonzero_pos == std::string::npos) { num.erase(0, num.size()-1); @@ -170,43 +172,16 @@ std::shared_ptr ApplicationDBBackupManager::createLocalS3Util( return local_s3_util; } -// convert the string to a backup_descriptor map, if any error happens, it will return an empty map -// TODO: add sst_id_min and sst_id_max parsing -void ApplicationDBBackupManager::parseBackupDesc(const std::string& contents, - std::unordered_map* file_to_ts) { - // Example: - // contents = "file1=1;file2=2;file3=3;sst_id_min,sstd_id_max;" - size_t pos = 0; - - while (pos < contents.size()) { - size_t eq_pos = contents.find("=", pos); - // finish parsing - if (eq_pos == std::string::npos) return; - - std::string key = contents.substr(pos, eq_pos - pos); - if (key.empty()) return; - - size_t semi_pos = 
contents.find(";", eq_pos); - if (semi_pos == std::string::npos) return; - - std::string value_string = contents.substr(eq_pos + 1, semi_pos - eq_pos - 1); - if (value_string.empty()) return; - - int64_t value = remove_leading_zero(value_string); - file_to_ts->emplace(key, value); - - pos = semi_pos+1; - } -} - -// convert file_to_ts map, ssd_id_min, ssd_id_max to a string. -void ApplicationDBBackupManager::makeUpBackupDescString(const std::unordered_map& - file_to_ts, int64_t sst_id_min, int64_t sst_id_max, std::string& contents) { +// convert file_to_ts map, last_ts to a json value. +void ApplicationDBBackupManager::makeUpBackupDescJson(const std::unordered_map& + file_to_ts, int64_t last_ts, Json::Value& contents) { contents.clear(); + Json::Value temp_map(Json::objectValue); for (auto& item : file_to_ts) { - contents += item.first + "=" + std::to_string(item.second) + ";"; + temp_map[item.first] = item.second; } - contents += std::to_string(sst_id_min) + "-" + std::to_string(sst_id_max) + ";"; + contents[fileToTs] = temp_map; + contents[lastBackupTs] = last_ts; } bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptr& db) { @@ -215,7 +190,8 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrdb_name() << " to " << ensure_ends_with_pathsep(FLAGS_s3_incre_backup_prefix) << db->db_name(); auto ts = common::timeutil::GetCurrentTimestamp(); - auto local_path = folly::stringPrintf("%ss3_tmp/%s%ld/", rocksdb_dir_.c_str(), db->db_name().c_str(), ts); + std::string s3_incre_path = rocksdb_dir_ + "s3_incre/"; + auto local_path = folly::stringPrintf("%s/%s%ld/", s3_incre_path.c_str(), db->db_name().c_str(), ts); boost::system::error_code remove_err; boost::system::error_code create_err; boost::filesystem::remove_all(local_path, remove_err); @@ -273,14 +249,13 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptr= 0) { LOG(INFO) << "find the last backup on " << last_ts; - std::string local_path_last_backup = folly::stringPrintf("%ss3_tmp/%s%ld/", rocksdb_dir_.c_str(), db->db_name().c_str(), last_ts) + "checkpoint/"; + std::string local_path_last_backup = folly::stringPrintf("%s/%s%ld/", s3_incre_path.c_str(), db->db_name().c_str(), last_ts) + "checkpoint/"; std::string formatted_s3_last_backup_dir_path = folly::stringPrintf("%s%s/%ld/", ensure_ends_with_pathsep(FLAGS_s3_incre_backup_prefix).c_str(), db->db_name().c_str(), last_ts); boost::system::error_code remove_last_backup_err; boost::system::error_code create_last_backup_err; // if we have not removed the last backup, we do not need to download it - bool need_download = !boost::filesystem::exists(local_path_last_backup + backupDesc); boost::filesystem::create_directories(local_path_last_backup, create_last_backup_err); SCOPE_EXIT { boost::filesystem::remove_all(local_path_last_backup, remove_last_backup_err); }; if (create_err) { @@ -289,17 +264,29 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrgetObject(formatted_s3_last_backup_dir_path + backupDesc, - local_path_last_backup + backupDesc, FLAGS_s3_direct_io); - } else { - assert(boost::filesystem::exists(local_path_last_backup + backupDesc)); + auto resp = local_s3_util->getObject(formatted_s3_last_backup_dir_path + backupDesc, + local_path_last_backup + backupDesc, FLAGS_s3_direct_io); + if (!resp.Error().empty()) { + LOG(ERROR) << "Error happened when downloading the backup descriptor from S3 to local: " + << resp.Error(); + return false; } + Json::Reader reader; + Json::Value last_backup_desc; std::string 
last_backup_desc_contents; common::FileUtil::readFileToString(local_path_last_backup + backupDesc, &last_backup_desc_contents); - - parseBackupDesc(last_backup_desc_contents, &file_to_ts); + if (!reader.parse(last_backup_desc_contents, last_backup_desc) || !last_backup_desc.isObject()) { + LOG(ERROR) << "Error happened when parsing the backup descriptor: " + << resp.Error(); + return false; + } + + const auto& file_map = last_backup_desc[fileToTs]; + for (Json::Value::const_iterator it = file_map.begin(); it != file_map.end(); ++it) { + file_to_ts.emplace(it.key().asString(), it->asInt64()); + } + if (file_to_ts.empty()) { LOG(INFO) << "The last backup of " << db->db_name() << " on timestamp " << last_ts << " is broken"; } @@ -311,24 +298,17 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrIncr(kS3BackupFailure); + return false; + } + auto it = db_backups_.find(db->db_name()); if (it != db_backups_.end()) { it->second.emplace_back(ts); diff --git a/rocksdb_admin/application_db_backup_manager.h b/rocksdb_admin/application_db_backup_manager.h index 4e5111c3..0e6edaf8 100644 --- a/rocksdb_admin/application_db_backup_manager.h +++ b/rocksdb_admin/application_db_backup_manager.h @@ -18,6 +18,7 @@ #include #include +#include "common/jsoncpp/include/json/json.h" #include "common/object_lock.h" #include "common/s3util.h" #include "rocksdb/db.h" @@ -73,10 +74,7 @@ class ApplicationDBBackupManager { std::vector getTimeStamp(const std::string& db); - void parseBackupDesc(const std::string& contents, std::unordered_map* file_to_ts); - - void makeUpBackupDescString(const std::unordered_map& file_to_ts, int64_t sst_id_min, - int64_t sst_id_max, std::string& contents); + void makeUpBackupDescJson(const std::unordered_map& file_to_ts, int64_t last_ts, Json::Value& contents); private: // copy from admin_hamdler.. 
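Illustration (not part of the patch): with the fileToTs / lastBackupTs keys defined above, a backup descriptor is a small JSON object that maps each uploaded SST file to the backup timestamp under which it was stored, plus the timestamp of the previous backup. A self-contained jsoncpp round-trip of that layout, with file names and timestamps invented for the example:

#include <iostream>
#include <string>
#include "common/jsoncpp/include/json/json.h"

int main() {
  // Build a descriptor the way makeUpBackupDescJson lays it out.
  Json::Value desc(Json::objectValue);
  Json::Value files(Json::objectValue);
  files["000012.sst"] = Json::Int64(1659990000);  // carried over from the previous backup
  files["000015.sst"] = Json::Int64(1660000000);  // uploaded by the current backup
  desc["file_to_ts"] = files;
  desc["last_backup_ts"] = Json::Int64(1659990000);

  const std::string serialized = Json::FastWriter().write(desc);

  // Read it back the same way the test below does.
  Json::Value parsed;
  if (Json::Reader().parse(serialized, parsed) && parsed.isObject()) {
    const Json::Value& file_map = parsed["file_to_ts"];
    for (Json::Value::const_iterator it = file_map.begin(); it != file_map.end(); ++it) {
      std::cout << it.key().asString() << " -> " << it->asInt64() << std::endl;
    }
  }
  return 0;
}

The test diff that follows parses the file_to_ts map in the same way to decide which files still have to be fetched from an earlier backup directory on S3.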
diff --git a/rocksdb_admin/tests/admin_handler_test.cpp b/rocksdb_admin/tests/admin_handler_test.cpp index bdfa533b..71402cf1 100644 --- a/rocksdb_admin/tests/admin_handler_test.cpp +++ b/rocksdb_admin/tests/admin_handler_test.cpp @@ -983,13 +983,21 @@ TEST_F(AdminHandlerTestBase, BackupDescriptorTest) { std::unordered_map file_to_ts; const std::string backupDesc = "backup_descriptor"; common::FileUtil::readFileToString(restore_path + backupDesc, &last_backup_desc_contents); - handler_->backup_manager_->parseBackupDesc(last_backup_desc_contents, &file_to_ts); - // download necessary from previous backup - for (auto& it : file_to_ts) { - if (it.second != timestamps[1]) { - auto response = local_s3_util->getObject(s3_path + std::to_string(it.second) + "/" + it.first, restore_path + it.first, - FLAGS_s3_direct_io); + Json::Reader reader; + Json::Value last_backup_desc; + EXPECT_TRUE(reader.parse(last_backup_desc_contents, last_backup_desc)); + EXPECT_TRUE(last_backup_desc.isObject()); + + const std::string fileToTs = "file_to_ts"; + const auto& file_map = last_backup_desc[fileToTs]; + + // filter repeated files from previous backup + for (Json::Value::const_iterator it = file_map.begin(); it != file_map.end(); ++it) { + auto key = it.key().asString(); + auto value = it->asInt64(); + if (value != timestamps[1]) { + auto response = local_s3_util->getObject(s3_path + std::to_string(value) + "/" + key, restore_path + key, FLAGS_s3_direct_io); EXPECT_TRUE(response.Error().empty()); } } From d3af48c79cf7e8c0f49ab82749b82967ae751bcd Mon Sep 17 00:00:00 2001 From: qiyanghe1998 Date: Tue, 9 Aug 2022 22:57:46 +0000 Subject: [PATCH 13/15] clean up --- rocksdb_admin/application_db_backup_manager.cpp | 16 ---------------- rocksdb_admin/application_db_backup_manager.h | 1 + rocksdb_admin/tests/admin_handler_test.cpp | 1 - 3 files changed, 1 insertion(+), 17 deletions(-) diff --git a/rocksdb_admin/application_db_backup_manager.cpp b/rocksdb_admin/application_db_backup_manager.cpp index b3a03bac..c24370a8 100644 --- a/rocksdb_admin/application_db_backup_manager.cpp +++ b/rocksdb_admin/application_db_backup_manager.cpp @@ -118,22 +118,6 @@ inline bool ends_with(const std::string& str, const std::string& suffix) { 0 == str.compare(str.size()-suffix.size(), suffix.size(), suffix); } -inline int64_t remove_leading_zero(std::string num) { - size_t nonzero_pos = num.find_first_not_of('0'); - if (nonzero_pos == std::string::npos) { - num.erase(0, num.size()-1); - } else { - num.erase(0, nonzero_pos); - } - return std::stol(num); -} - -// No checksum for current implementation, it needs to re-implement if checksum is enabled in sst names. -inline std::string parse_sst_id(const std::string& sst_name) { - // 123.sst (w/o checksum) - return sst_name.substr(0, sst_name.find_first_of('.')); -} - // copy from admin_hamdler.cpp inline bool should_new_s3_client( const common::S3Util& s3_util, const uint32_t limit_mbs, const std::string& s3_bucket) { diff --git a/rocksdb_admin/application_db_backup_manager.h b/rocksdb_admin/application_db_backup_manager.h index 0e6edaf8..52f041e3 100644 --- a/rocksdb_admin/application_db_backup_manager.h +++ b/rocksdb_admin/application_db_backup_manager.h @@ -90,6 +90,7 @@ class ApplicationDBBackupManager { // used to store all backups for each db. // I only store the timestamps since I use that to name different backups. + // May add checksum in the future. 
std::unordered_map> db_backups_; std::shared_ptr s3_util_; diff --git a/rocksdb_admin/tests/admin_handler_test.cpp b/rocksdb_admin/tests/admin_handler_test.cpp index 71402cf1..eb75016b 100644 --- a/rocksdb_admin/tests/admin_handler_test.cpp +++ b/rocksdb_admin/tests/admin_handler_test.cpp @@ -23,7 +23,6 @@ #include "common/file_util.h" #include "common/s3util.h" -#include "common/segment_utils.h" #include "boost/filesystem.hpp" #include "boost/range/iterator_range.hpp" #include "folly/SocketAddress.h" From 2c694fe85920beb7d76044420c97940d2db5c06e Mon Sep 17 00:00:00 2001 From: qiyanghe1998 Date: Wed, 10 Aug 2022 22:43:10 +0000 Subject: [PATCH 14/15] add commonts & clean up --- rocksdb_admin/application_db_backup_manager.cpp | 4 +++- rocksdb_admin/tests/admin_handler_test.cpp | 6 +----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/rocksdb_admin/application_db_backup_manager.cpp b/rocksdb_admin/application_db_backup_manager.cpp index c24370a8..b901f9ae 100644 --- a/rocksdb_admin/application_db_backup_manager.cpp +++ b/rocksdb_admin/application_db_backup_manager.cpp @@ -385,7 +385,9 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptr files; - db_->GetLiveFiles(files, &manifest_size, true); - // some overlap between two batches of kv pairs for compaction test later for (int32_t i = 0; i < range; ++i) { std::string kv_tmp = std::to_string(i + range/2); @@ -991,7 +987,7 @@ TEST_F(AdminHandlerTestBase, BackupDescriptorTest) { const std::string fileToTs = "file_to_ts"; const auto& file_map = last_backup_desc[fileToTs]; - // filter repeated files from previous backup + // download files from previous backup for (Json::Value::const_iterator it = file_map.begin(); it != file_map.end(); ++it) { auto key = it.key().asString(); auto value = it->asInt64(); From e49626f74629fd8d37ca369e67dd31ca0c1836df Mon Sep 17 00:00:00 2001 From: qiyanghe1998 Date: Fri, 12 Aug 2022 00:35:50 +0000 Subject: [PATCH 15/15] add backup starter & fix backup-descriptor est slice --- .../application_db_backup_manager.cpp | 86 +++++++++++++------ rocksdb_admin/tests/admin_handler_test.cpp | 33 ++++--- 2 files changed, 83 insertions(+), 36 deletions(-) diff --git a/rocksdb_admin/application_db_backup_manager.cpp b/rocksdb_admin/application_db_backup_manager.cpp index b901f9ae..8237bc75 100644 --- a/rocksdb_admin/application_db_backup_manager.cpp +++ b/rocksdb_admin/application_db_backup_manager.cpp @@ -62,6 +62,7 @@ const int32_t kS3UtilRecheckSec = 5; const std::string backupDesc = "backup_descriptor"; const std::string lastBackupTs = "last_backup_ts"; const std::string fileToTs = "file_to_ts"; +const std::string backupStarter = "backup_starter"; } @@ -171,6 +172,12 @@ void ApplicationDBBackupManager::makeUpBackupDescJson(const std::unordered_map& db) { // ::admin::AdminException e; + auto status = db->rocksdb()->Flush(rocksdb::FlushOptions()); + if(!status.ok()) { + LOG(ERROR) << db->db_name() << " can't flush successfully before backup"; + return false; + } + common::Timer timer(kS3BackupMs); LOG(INFO) << "S3 Backup " << db->db_name() << " to " << ensure_ends_with_pathsep(FLAGS_s3_incre_backup_prefix) << db->db_name(); auto ts = common::timeutil::GetCurrentTimestamp(); @@ -191,7 +198,7 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrUnlock(db->db_name()); }; rocksdb::Checkpoint* checkpoint; - auto status = rocksdb::Checkpoint::Create(db->rocksdb(), &checkpoint); + status = rocksdb::Checkpoint::Create(db->rocksdb(), &checkpoint); if (!status.ok()) { LOG(ERROR) << "Error 
happened when trying to initialize checkpoint: " << status.ToString(); common::Stats::get()->Incr(kS3BackupFailure); @@ -217,52 +224,69 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrdb_name()); - if (itor != db_backups_.end()) { - last_ts = itor->second[(int)itor->second.size()-1]; - } // if there exists last backup, we need to get the backup_descriptor auto local_s3_util = createLocalS3Util(FLAGS_incre_backup_limit_mbs, FLAGS_s3_incre_backup_bucket); - std::string formatted_s3_dir_path_upload = folly::stringPrintf("%s%s/%ld/", - ensure_ends_with_pathsep(FLAGS_s3_incre_backup_prefix).c_str(), db->db_name().c_str(), ts); + std::string formatted_s3_prefix = folly::stringPrintf("%s%s/", ensure_ends_with_pathsep(FLAGS_s3_incre_backup_prefix).c_str(), + db->db_name().c_str()); + std::string formatted_s3_dir_path_upload = formatted_s3_prefix + std::to_string(ts) + "/"; std::string formatted_checkpoint_local_path = ensure_ends_with_pathsep(checkpoint_local_path); + std::string local_path_last_backup = folly::stringPrintf("%s/%s%ld/", s3_incre_path.c_str(), db->db_name().c_str(), last_ts) + "checkpoint/"; + + boost::system::error_code remove_last_backup_err; + boost::system::error_code create_last_backup_err; + + boost::filesystem::create_directories(local_path_last_backup, create_last_backup_err); + SCOPE_EXIT { boost::filesystem::remove_all(local_path_last_backup, remove_last_backup_err); }; + if (create_err) { + LOG(ERROR) << "Cannot remove/create dir for backup: " + local_path_last_backup; + common::Stats::get()->Incr(kS3BackupFailure); + return false; + } + + auto starter_resp = local_s3_util->getObject(formatted_s3_prefix + backupStarter, + local_path_last_backup + backupStarter, FLAGS_s3_direct_io); + if (!starter_resp.Error().empty()) { + // No previous starter file, this is the first backup + LOG(ERROR) << "Error happened when downloading the backup starter from S3 to local: " + << starter_resp.Error(); + } else { + Json::Reader starter_reader; + Json::Value starter; + std::string starter_content; + common::FileUtil::readFileToString(local_path_last_backup + backupStarter, &starter_content); + if (!starter_reader.parse(starter_content, starter) || !starter.isNumeric()) { + LOG(ERROR) << "Error happened when parsing the backup starter: "; // TODO: add error messages + } else { + last_ts = starter.asInt64(); + } + } std::unordered_map file_to_ts; const string sst_suffix = ".sst"; if (last_ts >= 0) { LOG(INFO) << "find the last backup on " << last_ts; - std::string local_path_last_backup = folly::stringPrintf("%s/%s%ld/", s3_incre_path.c_str(), db->db_name().c_str(), last_ts) + "checkpoint/"; std::string formatted_s3_last_backup_dir_path = folly::stringPrintf("%s%s/%ld/", ensure_ends_with_pathsep(FLAGS_s3_incre_backup_prefix).c_str(), db->db_name().c_str(), last_ts); boost::system::error_code remove_last_backup_err; boost::system::error_code create_last_backup_err; - // if we have not removed the last backup, we do not need to download it - boost::filesystem::create_directories(local_path_last_backup, create_last_backup_err); - SCOPE_EXIT { boost::filesystem::remove_all(local_path_last_backup, remove_last_backup_err); }; - if (create_err) { - // SetException("Cannot remove/create dir for backup: " + local_path, AdminErrorCode::DB_ADMIN_ERROR, &callback); - common::Stats::get()->Incr(kS3BackupFailure); - return false; - } - auto resp = local_s3_util->getObject(formatted_s3_last_backup_dir_path + backupDesc, + auto descriptor_resp = 
local_s3_util->getObject(formatted_s3_last_backup_dir_path + backupDesc, local_path_last_backup + backupDesc, FLAGS_s3_direct_io); - if (!resp.Error().empty()) { + if (!descriptor_resp.Error().empty()) { LOG(ERROR) << "Error happened when downloading the backup descriptor from S3 to local: " - << resp.Error(); + << descriptor_resp.Error(); return false; } - Json::Reader reader; + Json::Reader descriptor_reader; Json::Value last_backup_desc; std::string last_backup_desc_contents; common::FileUtil::readFileToString(local_path_last_backup + backupDesc, &last_backup_desc_contents); - if (!reader.parse(last_backup_desc_contents, last_backup_desc) || !last_backup_desc.isObject()) { - LOG(ERROR) << "Error happened when parsing the backup descriptor: " - << resp.Error(); + if (!descriptor_reader.parse(last_backup_desc_contents, last_backup_desc) || !last_backup_desc.isObject()) { + LOG(ERROR) << "Error happened when parsing the backup descriptor: "; // TODO: add error messages return false; } @@ -294,6 +318,10 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrputObject(dest, source); @@ -385,9 +413,7 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrIncr(kS3BackupFailure); + return false; + } + auto it = db_backups_.find(db->db_name()); if (it != db_backups_.end()) { it->second.emplace_back(ts); diff --git a/rocksdb_admin/tests/admin_handler_test.cpp b/rocksdb_admin/tests/admin_handler_test.cpp index a5e392ec..dd8a278e 100644 --- a/rocksdb_admin/tests/admin_handler_test.cpp +++ b/rocksdb_admin/tests/admin_handler_test.cpp @@ -943,7 +943,7 @@ TEST_F(AdminHandlerTestBase, BackupDescriptorTest) { auto local_s3_util = common::S3Util::BuildS3Util(FLAGS_incre_backup_limit_mbs, FLAGS_s3_incre_backup_bucket); for (int32_t i = 0; i < range; ++i) { - std::string kv_tmp = std::to_string(i); + std::string kv_tmp = folly::stringPrintf("%04d", i); writeToDB(testdb, kv_tmp, kv_tmp + "_" + kv_tmp); } flushDB(testdb); @@ -952,25 +952,35 @@ TEST_F(AdminHandlerTestBase, BackupDescriptorTest) { // some overlap between two batches of kv pairs for compaction test later for (int32_t i = 0; i < range; ++i) { - std::string kv_tmp = std::to_string(i + range/2); + std::string kv_tmp = folly::stringPrintf("%04d", i + range/2); writeToDB(testdb, kv_tmp, kv_tmp + "_" + kv_tmp + "_" + kv_tmp); } flushDB(testdb); // wait for the second incremental backup std::this_thread::sleep_for(std::chrono::seconds(FLAGS_async_incremental_backup_dbs_frequency_sec)); - auto timestamps = handler_->backup_manager_->getTimeStamp(testdb); - EXPECT_TRUE(timestamps.size() >= 2); std::string s3_path = FLAGS_s3_incre_backup_prefix + testdb + "/"; std::string restore_path = "/restore_tmp/" + testdb + "/"; - + boost::system::error_code remove_err; boost::system::error_code create_err; fs::create_directories(restore_path, create_err); SCOPE_EXIT { fs::remove_all(restore_path, remove_err); }; + const std::string backupStarter = "backup_starter"; + auto starter_resp = local_s3_util->getObject(s3_path + backupStarter, + restore_path + backupStarter, FLAGS_s3_direct_io); + EXPECT_TRUE(starter_resp.Error().empty()); + + Json::Reader starter_reader; + Json::Value starter; + std::string starter_content; + common::FileUtil::readFileToString(restore_path + backupStarter, &starter_content); + EXPECT_TRUE(starter_reader.parse(starter_content, starter) || starter.isNumeric()); + int64_t last_ts = starter.asInt64(); + // download the latest backup - auto responses = local_s3_util->getObjects(s3_path + 
std::to_string(timestamps[1]) + "/", restore_path, "/", + auto responses = local_s3_util->getObjects(s3_path + std::to_string(last_ts) + "/", restore_path, "/", FLAGS_s3_direct_io); EXPECT_TRUE(responses.Error().empty() && responses.Body().size() > 0); @@ -991,13 +1001,16 @@ TEST_F(AdminHandlerTestBase, BackupDescriptorTest) { for (Json::Value::const_iterator it = file_map.begin(); it != file_map.end(); ++it) { auto key = it.key().asString(); auto value = it->asInt64(); - if (value != timestamps[1]) { + if (value != last_ts) { auto response = local_s3_util->getObject(s3_path + std::to_string(value) + "/" + key, restore_path + key, FLAGS_s3_direct_io); EXPECT_TRUE(response.Error().empty()); } } boost::system::error_code remove_backup_desc_err; - boost::filesystem::remove(restore_path + backupDesc); + boost::filesystem::remove(restore_path + backupDesc, remove_backup_desc_err); + + boost::system::error_code remove_backup_starter_err; + boost::filesystem::remove(restore_path + backupStarter, remove_backup_starter_err); rocksdb::DB* restore_db_; Options options_restore; @@ -1019,8 +1032,8 @@ TEST_F(AdminHandlerTestBase, BackupDescriptorTest) { EXPECT_FALSE(it_db_->Valid() || it_restore_db_->Valid()); // compact local db and then compare again - rocksdb::Slice key_begin("0"); - rocksdb::Slice key_end(std::to_string(range - 1 + range/2)); + rocksdb::Slice key_begin(folly::stringPrintf("%04d", 0)); + rocksdb::Slice key_end(folly::stringPrintf("%04d", range - 1 + range/2)); auto s = db_->CompactRange(CompactRangeOptions(), &key_begin, &key_end); EXPECT_TRUE(s.ok());
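
Note: the two S3 artifacts these patches revolve around are the per-backup "backup_descriptor" (a JSON object mapping each sst file name to the timestamp of the backup that uploaded it, plus "last_backup_ts") and the per-db "backup_starter" (a bare JSON number naming the latest finished backup, which BackupDescriptorTest reads before fetching the descriptor). The sketch below is a minimal, self-contained jsoncpp round trip of that metadata for reference only: the key names mirror the patches above, while the include path, the makeDescriptor/parseDescriptor helper names, and the main() harness with its sample sst names and timestamps are illustrative assumptions, not code from this PR.

// Minimal sketch (illustrative only) of the backup metadata used above:
//   backup_descriptor = {"file_to_ts": {"<sst name>": <backup ts>, ...}, "last_backup_ts": <ts>}
//   backup_starter    = <ts of the latest finished backup>, stored as a bare JSON number
// Key names follow the patches; the include path and main() harness are assumptions.
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>

#include "json/json.h"

static const std::string kFileToTs = "file_to_ts";
static const std::string kLastBackupTs = "last_backup_ts";

// Same shape as makeUpBackupDescJson: sst file name -> timestamp of the backup that uploaded it.
Json::Value makeDescriptor(const std::unordered_map<std::string, int64_t>& file_to_ts,
                           int64_t last_ts) {
  Json::Value desc(Json::objectValue);
  Json::Value files(Json::objectValue);
  for (const auto& item : file_to_ts) {
    files[item.first] = Json::Value(static_cast<Json::Int64>(item.second));
  }
  desc[kFileToTs] = files;
  desc[kLastBackupTs] = Json::Value(static_cast<Json::Int64>(last_ts));
  return desc;
}

// Same shape as the parsing side in backupDBToS3 and in BackupDescriptorTest.
bool parseDescriptor(const std::string& contents,
                     std::unordered_map<std::string, int64_t>* file_to_ts,
                     int64_t* last_ts) {
  Json::Reader reader;
  Json::Value desc;
  if (!reader.parse(contents, desc) || !desc.isObject()) return false;
  const Json::Value& files = desc[kFileToTs];
  for (Json::Value::const_iterator it = files.begin(); it != files.end(); ++it) {
    file_to_ts->emplace(it.key().asString(), it->asInt64());
  }
  *last_ts = desc[kLastBackupTs].asInt64();
  return true;
}

int main() {
  Json::FastWriter writer;

  // Descriptor round trip: during restore, only files whose timestamp differs from
  // last_backup_ts need to be fetched from an older backup directory.
  std::string descriptor = writer.write(
      makeDescriptor({{"000010.sst", 1660000000}, {"000012.sst", 1660000600}}, 1660000600));
  std::unordered_map<std::string, int64_t> file_to_ts;
  int64_t last_ts = -1;
  if (parseDescriptor(descriptor, &file_to_ts, &last_ts)) {
    std::cout << "last_backup_ts=" << last_ts << " files=" << file_to_ts.size() << std::endl;
  }

  // Starter round trip: a bare numeric JSON value pointing at the latest backup.
  std::string starter_contents = writer.write(Json::Value(static_cast<Json::Int64>(last_ts)));
  Json::Reader starter_reader;
  Json::Value starter;
  if (starter_reader.parse(starter_contents, starter) && starter.isNumeric()) {
    std::cout << "backup_starter=" << starter.asInt64() << std::endl;
  }
  return 0;
}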