diff --git a/rocksdb_admin/application_db_backup_manager.cpp b/rocksdb_admin/application_db_backup_manager.cpp index 45d572ec..8237bc75 100644 --- a/rocksdb_admin/application_db_backup_manager.cpp +++ b/rocksdb_admin/application_db_backup_manager.cpp @@ -23,7 +23,6 @@ #include "common/timer.h" #include "common/timeutil.h" #include "rocksdb/utilities/checkpoint.h" - #ifdef PINTEREST_INTERNAL // NEVER SET THIS UNLESS PINTEREST INTERNAL USAGE. #include "schemas/gen-cpp2/rocksdb_admin_types.h" @@ -51,12 +50,23 @@ DEFINE_string(s3_incre_backup_prefix, "tmp/backup_test/incremental_backup_test/" DEFINE_int32(incre_backup_limit_mbs, 100, "the rate limit for s3 client"); DEFINE_bool(incre_backup_include_meta, false, "whether to backup meta data on s3"); -namespace admin { + +DECLARE_bool(s3_direct_io); + +namespace { const std::string kMetaFilename = "dbmeta"; const std::string kS3BackupMs = "s3_backup_ms"; const std::string kS3BackupFailure = "s3_backup_failure"; -const int kS3UtilRecheckSec = 5; +const int32_t kS3UtilRecheckSec = 5; +const std::string backupDesc = "backup_descriptor"; +const std::string lastBackupTs = "last_backup_ts"; +const std::string fileToTs = "file_to_ts"; +const std::string backupStarter = "backup_starter"; + +} + +namespace admin { ApplicationDBBackupManager::ApplicationDBBackupManager( ApplicationDBManager* db_manager, @@ -104,6 +114,11 @@ inline std::string ensure_ends_with_pathsep(const std::string& s) { return s; } +inline bool ends_with(const std::string& str, const std::string& suffix) { + return str.size() >= suffix.size() && + 0 == str.compare(str.size()-suffix.size(), suffix.size(), suffix); +} + // copy from admin_hamdler.cpp inline bool should_new_s3_client( const common::S3Util& s3_util, const uint32_t limit_mbs, const std::string& s3_bucket) { @@ -142,20 +157,39 @@ std::shared_ptr ApplicationDBBackupManager::createLocalS3Util( return local_s3_util; } +// convert file_to_ts map, last_ts to a json value. +void ApplicationDBBackupManager::makeUpBackupDescJson(const std::unordered_map& + file_to_ts, int64_t last_ts, Json::Value& contents) { + contents.clear(); + Json::Value temp_map(Json::objectValue); + for (auto& item : file_to_ts) { + temp_map[item.first] = item.second; + } + contents[fileToTs] = temp_map; + contents[lastBackupTs] = last_ts; +} + bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptr& db) { // ::admin::AdminException e; + auto status = db->rocksdb()->Flush(rocksdb::FlushOptions()); + if(!status.ok()) { + LOG(ERROR) << db->db_name() << " can't flush successfully before backup"; + return false; + } + common::Timer timer(kS3BackupMs); LOG(INFO) << "S3 Backup " << db->db_name() << " to " << ensure_ends_with_pathsep(FLAGS_s3_incre_backup_prefix) << db->db_name(); auto ts = common::timeutil::GetCurrentTimestamp(); - auto local_path = folly::stringPrintf("%ss3_tmp/%s%d/", rocksdb_dir_.c_str(), db->db_name().c_str(), ts); + std::string s3_incre_path = rocksdb_dir_ + "s3_incre/"; + auto local_path = folly::stringPrintf("%s/%s%ld/", s3_incre_path.c_str(), db->db_name().c_str(), ts); boost::system::error_code remove_err; boost::system::error_code create_err; boost::filesystem::remove_all(local_path, remove_err); boost::filesystem::create_directories(local_path, create_err); SCOPE_EXIT { boost::filesystem::remove_all(local_path, remove_err); }; if (remove_err || create_err) { - // SetException("Cannot remove/create dir for backup: " + local_path, AdminErrorCode::DB_ADMIN_ERROR, &callback); + LOG(ERROR) << "Cannot remove/create dir for backup: " << local_path; common::Stats::get()->Incr(kS3BackupFailure); return false; } @@ -164,9 +198,8 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrUnlock(db->db_name()); }; rocksdb::Checkpoint* checkpoint; - auto status = rocksdb::Checkpoint::Create(db->rocksdb(), &checkpoint); + status = rocksdb::Checkpoint::Create(db->rocksdb(), &checkpoint); if (!status.ok()) { - // OKOrSetException(status, AdminErrorCode::DB_ADMIN_ERROR, &callback); LOG(ERROR) << "Error happened when trying to initialize checkpoint: " << status.ToString(); common::Stats::get()->Incr(kS3BackupFailure); return false; @@ -175,7 +208,6 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrCreateCheckpoint(checkpoint_local_path); if (!status.ok()) { - // OKOrSetException(status, AdminErrorCode::DB_ADMIN_ERROR, &callback); LOG(ERROR) << "Error happened when trying to create checkpoint: " << status.ToString(); common::Stats::get()->Incr(kS3BackupFailure); return false; @@ -185,17 +217,111 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptr checkpoint_files; status = rocksdb::Env::Default()->GetChildren(checkpoint_local_path, &checkpoint_files); if (!status.ok()) { - // OKOrSetException(status, AdminErrorCode::DB_ADMIN_ERROR, &callback); LOG(ERROR) << "Error happened when trying to list files in the checkpoint: " << status.ToString(); common::Stats::get()->Incr(kS3BackupFailure); return false; } - // Upload checkpoint to s3 + // find the timestamp of last backup + int64_t last_ts = -1; + + // if there exists last backup, we need to get the backup_descriptor auto local_s3_util = createLocalS3Util(FLAGS_incre_backup_limit_mbs, FLAGS_s3_incre_backup_bucket); - std::string formatted_s3_dir_path = folly::stringPrintf("%s%s/%d/", - ensure_ends_with_pathsep(FLAGS_s3_incre_backup_prefix).c_str(), db->db_name().c_str(), ts); + std::string formatted_s3_prefix = folly::stringPrintf("%s%s/", ensure_ends_with_pathsep(FLAGS_s3_incre_backup_prefix).c_str(), + db->db_name().c_str()); + std::string formatted_s3_dir_path_upload = formatted_s3_prefix + std::to_string(ts) + "/"; std::string formatted_checkpoint_local_path = ensure_ends_with_pathsep(checkpoint_local_path); + std::string local_path_last_backup = folly::stringPrintf("%s/%s%ld/", s3_incre_path.c_str(), db->db_name().c_str(), last_ts) + "checkpoint/"; + + boost::system::error_code remove_last_backup_err; + boost::system::error_code create_last_backup_err; + + boost::filesystem::create_directories(local_path_last_backup, create_last_backup_err); + SCOPE_EXIT { boost::filesystem::remove_all(local_path_last_backup, remove_last_backup_err); }; + if (create_err) { + LOG(ERROR) << "Cannot remove/create dir for backup: " + local_path_last_backup; + common::Stats::get()->Incr(kS3BackupFailure); + return false; + } + + auto starter_resp = local_s3_util->getObject(formatted_s3_prefix + backupStarter, + local_path_last_backup + backupStarter, FLAGS_s3_direct_io); + if (!starter_resp.Error().empty()) { + // No previous starter file, this is the first backup + LOG(ERROR) << "Error happened when downloading the backup starter from S3 to local: " + << starter_resp.Error(); + } else { + Json::Reader starter_reader; + Json::Value starter; + std::string starter_content; + common::FileUtil::readFileToString(local_path_last_backup + backupStarter, &starter_content); + if (!starter_reader.parse(starter_content, starter) || !starter.isNumeric()) { + LOG(ERROR) << "Error happened when parsing the backup starter: "; // TODO: add error messages + } else { + last_ts = starter.asInt64(); + } + } + + std::unordered_map file_to_ts; + const string sst_suffix = ".sst"; + + if (last_ts >= 0) { + LOG(INFO) << "find the last backup on " << last_ts; + std::string formatted_s3_last_backup_dir_path = folly::stringPrintf("%s%s/%ld/", + ensure_ends_with_pathsep(FLAGS_s3_incre_backup_prefix).c_str(), db->db_name().c_str(), last_ts); + + boost::system::error_code remove_last_backup_err; + boost::system::error_code create_last_backup_err; + + auto descriptor_resp = local_s3_util->getObject(formatted_s3_last_backup_dir_path + backupDesc, + local_path_last_backup + backupDesc, FLAGS_s3_direct_io); + if (!descriptor_resp.Error().empty()) { + LOG(ERROR) << "Error happened when downloading the backup descriptor from S3 to local: " + << descriptor_resp.Error(); + return false; + } + + Json::Reader descriptor_reader; + Json::Value last_backup_desc; + std::string last_backup_desc_contents; + common::FileUtil::readFileToString(local_path_last_backup + backupDesc, &last_backup_desc_contents); + if (!descriptor_reader.parse(last_backup_desc_contents, last_backup_desc) || !last_backup_desc.isObject()) { + LOG(ERROR) << "Error happened when parsing the backup descriptor: "; // TODO: add error messages + return false; + } + + const auto& file_map = last_backup_desc[fileToTs]; + for (Json::Value::const_iterator it = file_map.begin(); it != file_map.end(); ++it) { + file_to_ts.emplace(it.key().asString(), it->asInt64()); + } + + if (file_to_ts.empty()) { + LOG(INFO) << "The last backup of " << db->db_name() << " on timestamp " << last_ts << " is broken"; + } + + // Remove all the duplicate files + checkpoint_files.erase(std::remove_if(checkpoint_files.begin(), checkpoint_files.end(), [&](string file){ + return file_to_ts.find(file) != file_to_ts.end() && ends_with(file, sst_suffix);}), checkpoint_files.end()); + + LOG(INFO) << "finish parsing last backup"; + } + + // Put new files into the map + for (auto& file : checkpoint_files) { + if (ends_with(file, sst_suffix)) { + file_to_ts.emplace(file, ts); + } + } + + // Create the new backup descriptor file + Json::Value backup_desc_contents(Json::objectValue); + makeUpBackupDescJson(file_to_ts, last_ts, backup_desc_contents); + common::FileUtil::createFileWithContent(formatted_checkpoint_local_path, backupDesc, backup_desc_contents.toStyledString()); + + // Create the new backup starter file + Json::Value ts_starter(ts); + common::FileUtil::createFileWithContent(formatted_checkpoint_local_path, backupStarter, ts_starter.toStyledString()); + auto upload_func = [&](const std::string& dest, const std::string& source) { LOG(INFO) << "Copying " << source << " to " << dest; auto copy_resp = local_s3_util->putObject(dest, source); @@ -227,7 +353,7 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptradd( [&, files = std::move(files), p = std::move(p)]() mutable { for (const auto& file : files) { - if (!upload_func(formatted_s3_dir_path + file, formatted_checkpoint_local_path + file)) { + if (!upload_func(formatted_s3_dir_path_upload + file, formatted_checkpoint_local_path + file)) { p.setValue(false); return; } @@ -239,9 +365,7 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrIncr(kS3BackupFailure); return false; } @@ -251,11 +375,9 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrIncr(kS3BackupFailure); return false; } @@ -279,21 +401,42 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptrIncr(kS3BackupFailure); return false; } - if (!upload_func(formatted_s3_dir_path + kMetaFilename, dbmeta_path)) { - // SetException("Error happened when upload meta from checkpoint to S3", - // AdminErrorCode::DB_ADMIN_ERROR, &callback); + if (!upload_func(formatted_s3_dir_path_upload + kMetaFilename, dbmeta_path)) { + LOG(ERROR) << "Error happened when upload meta from checkpoint to S3"; common::Stats::get()->Incr(kS3BackupFailure); return false; } } - // Delete the directory to remove the snapshot. - boost::filesystem::remove_all(local_path); + // make sure the all other files has been uploaded successfully before uploading the backup descriptor + // TODO: We may store the all the timestamp on the backup starter so that we can check each one quickly. + if (!upload_func(formatted_s3_dir_path_upload + backupDesc, + formatted_checkpoint_local_path + backupDesc)) { + LOG(ERROR) << "Error happened when upload backup descriptor from checkpoint to S3"; + common::Stats::get()->Incr(kS3BackupFailure); + return false; + } + + // upload the starter file + if (!upload_func(formatted_s3_prefix + backupStarter, + formatted_checkpoint_local_path + backupStarter)) { + LOG(ERROR) << "Error happened when upload backup starter from checkpoint to S3"; + common::Stats::get()->Incr(kS3BackupFailure); + return false; + } + + auto it = db_backups_.find(db->db_name()); + if (it != db_backups_.end()) { + it->second.emplace_back(ts); + } else { + db_backups_.emplace(db->db_name(), std::vector(1, ts)); + } + + return true; } bool ApplicationDBBackupManager::backupAllDBsToS3() { @@ -316,4 +459,9 @@ bool ApplicationDBBackupManager::backupAllDBsToS3() { return ret; } +std::vector ApplicationDBBackupManager::getTimeStamp(const std::string& db) { + if (db_backups_.find(db) != db_backups_.end()) return db_backups_[db]; + else return std::vector(); +} + } \ No newline at end of file diff --git a/rocksdb_admin/application_db_backup_manager.h b/rocksdb_admin/application_db_backup_manager.h index bc436a71..52f041e3 100644 --- a/rocksdb_admin/application_db_backup_manager.h +++ b/rocksdb_admin/application_db_backup_manager.h @@ -18,6 +18,7 @@ #include #include +#include "common/jsoncpp/include/json/json.h" #include "common/object_lock.h" #include "common/s3util.h" #include "rocksdb/db.h" @@ -71,6 +72,10 @@ class ApplicationDBBackupManager { bool backupDBToS3(const std::shared_ptr& db); + std::vector getTimeStamp(const std::string& db); + + void makeUpBackupDescJson(const std::unordered_map& file_to_ts, int64_t last_ts, Json::Value& contents); + private: // copy from admin_hamdler.. std::shared_ptr createLocalS3Util(const uint32_t limit_mbs, @@ -83,6 +88,11 @@ class ApplicationDBBackupManager { std::string rocksdb_dir_; int32_t checkpoint_backup_batch_num_upload_; + // used to store all backups for each db. + // I only store the timestamps since I use that to name different backups. + // May add checksum in the future. + std::unordered_map> db_backups_; + std::shared_ptr s3_util_; mutable std::mutex s3_util_lock_; diff --git a/rocksdb_admin/tests/admin_handler_test.cpp b/rocksdb_admin/tests/admin_handler_test.cpp index b76ac7b4..dd8a278e 100644 --- a/rocksdb_admin/tests/admin_handler_test.cpp +++ b/rocksdb_admin/tests/admin_handler_test.cpp @@ -21,8 +21,10 @@ #include #include +#include "common/file_util.h" #include "common/s3util.h" #include "boost/filesystem.hpp" +#include "boost/range/iterator_range.hpp" #include "folly/SocketAddress.h" #include "gtest/gtest.h" #include "rocksdb/options.h" @@ -62,7 +64,9 @@ using apache::thrift::RpcOptions; using apache::thrift::ThriftServer; using apache::thrift::async::TAsyncSocket; using common::ThriftClientPool; +using rocksdb::CompactRangeOptions; using rocksdb::FlushOptions; +using rocksdb::IngestExternalFileOptions; using rocksdb::Options; using rocksdb::ReadOptions; using rocksdb::Status; @@ -92,6 +96,7 @@ DEFINE_bool( DECLARE_string(rocksdb_dir); DECLARE_bool(enable_checkpoint_backup); DECLARE_bool(rocksdb_allow_overlapping_keys); +DECLARE_bool(s3_direct_io); string generateRandIntAsStr() { static thread_local unsigned int seed = time(nullptr); @@ -137,10 +142,14 @@ class AdminHandlerTestBase : public testing::Test { AdminHandlerTestBase() { const testing::TestInfo* const test_info = testing::UnitTest::GetInstance()->current_test_info(); - std:string back_up_test_name = "ApplicationDBBackupManagerTest"; - if (strcmp(test_info->name(), back_up_test_name.c_str())) { + std::string back_up_test_name = "ApplicationDBBackupManagerTest"; + std::string backup_desc_test_name = "BackupDescriptorTest"; + if (0 == strcmp(test_info->name(), back_up_test_name.c_str())) { FLAGS_enable_async_incremental_backup_dbs = true; FLAGS_async_incremental_backup_dbs_frequency_sec = 1; + } else if (0 == strcmp(test_info->name(), backup_desc_test_name.c_str())) { + FLAGS_enable_async_incremental_backup_dbs = true; + FLAGS_async_incremental_backup_dbs_frequency_sec = 3; } // setup local , s3 paths for test FLAGS_rocksdb_dir = testDir(); @@ -919,6 +928,125 @@ TEST(AdminHandlerTest, InitS3Tmp) { EXPECT_FALSE(fs::exists(test_directory)); } +TEST_F(AdminHandlerTestBase, BackupDescriptorTest) { + int32_t range = 1000; + const string testdb = generateDBName(); + addDBWithRole(testdb, "LEADER"); + auto meta = handler_->getMetaData(testdb); + verifyMeta(meta, testdb, true, "", ""); + + // use incremental backup + handler_->writeMetaData(testdb, "fakes3bucket", "fakes3path"); + + AdminException e; + auto db_ = handler_->getDB(testdb, &e)->rocksdb(); + auto local_s3_util = common::S3Util::BuildS3Util(FLAGS_incre_backup_limit_mbs, FLAGS_s3_incre_backup_bucket); + + for (int32_t i = 0; i < range; ++i) { + std::string kv_tmp = folly::stringPrintf("%04d", i); + writeToDB(testdb, kv_tmp, kv_tmp + "_" + kv_tmp); + } + flushDB(testdb); + // wait for the first incremental backup + std::this_thread::sleep_for(std::chrono::seconds(FLAGS_async_incremental_backup_dbs_frequency_sec)); + + // some overlap between two batches of kv pairs for compaction test later + for (int32_t i = 0; i < range; ++i) { + std::string kv_tmp = folly::stringPrintf("%04d", i + range/2); + writeToDB(testdb, kv_tmp, kv_tmp + "_" + kv_tmp + "_" + kv_tmp); + } + flushDB(testdb); + // wait for the second incremental backup + std::this_thread::sleep_for(std::chrono::seconds(FLAGS_async_incremental_backup_dbs_frequency_sec)); + + std::string s3_path = FLAGS_s3_incre_backup_prefix + testdb + "/"; + std::string restore_path = "/restore_tmp/" + testdb + "/"; + + boost::system::error_code remove_err; + boost::system::error_code create_err; + fs::create_directories(restore_path, create_err); + SCOPE_EXIT { fs::remove_all(restore_path, remove_err); }; + + const std::string backupStarter = "backup_starter"; + auto starter_resp = local_s3_util->getObject(s3_path + backupStarter, + restore_path + backupStarter, FLAGS_s3_direct_io); + EXPECT_TRUE(starter_resp.Error().empty()); + + Json::Reader starter_reader; + Json::Value starter; + std::string starter_content; + common::FileUtil::readFileToString(restore_path + backupStarter, &starter_content); + EXPECT_TRUE(starter_reader.parse(starter_content, starter) || starter.isNumeric()); + int64_t last_ts = starter.asInt64(); + + // download the latest backup + auto responses = local_s3_util->getObjects(s3_path + std::to_string(last_ts) + "/", restore_path, "/", + FLAGS_s3_direct_io); + EXPECT_TRUE(responses.Error().empty() && responses.Body().size() > 0); + + std::string last_backup_desc_contents; + std::unordered_map file_to_ts; + const std::string backupDesc = "backup_descriptor"; + common::FileUtil::readFileToString(restore_path + backupDesc, &last_backup_desc_contents); + + Json::Reader reader; + Json::Value last_backup_desc; + EXPECT_TRUE(reader.parse(last_backup_desc_contents, last_backup_desc)); + EXPECT_TRUE(last_backup_desc.isObject()); + + const std::string fileToTs = "file_to_ts"; + const auto& file_map = last_backup_desc[fileToTs]; + + // download files from previous backup + for (Json::Value::const_iterator it = file_map.begin(); it != file_map.end(); ++it) { + auto key = it.key().asString(); + auto value = it->asInt64(); + if (value != last_ts) { + auto response = local_s3_util->getObject(s3_path + std::to_string(value) + "/" + key, restore_path + key, FLAGS_s3_direct_io); + EXPECT_TRUE(response.Error().empty()); + } + } + boost::system::error_code remove_backup_desc_err; + boost::filesystem::remove(restore_path + backupDesc, remove_backup_desc_err); + + boost::system::error_code remove_backup_starter_err; + boost::filesystem::remove(restore_path + backupStarter, remove_backup_starter_err); + + rocksdb::DB* restore_db_; + Options options_restore; + auto status = rocksdb::DB::Open(options_restore, restore_path, &restore_db_); + EXPECT_TRUE(status.ok()); + + ReadOptions db_readoptions = ReadOptions(); + auto it_db_ = db_->NewIterator(db_readoptions); + + ReadOptions restore_db_readoptions = ReadOptions(); + auto it_restore_db_ = restore_db_->NewIterator(restore_db_readoptions); + + // compare local db with restore db + for (it_db_->SeekToFirst(), it_restore_db_->SeekToFirst(); it_db_->Valid() && + it_restore_db_->Valid(); it_db_->Next(), it_restore_db_->Next()) { + EXPECT_TRUE(it_db_->key() == it_restore_db_->key()); + EXPECT_TRUE(it_db_->value() == it_restore_db_->value()); + } + EXPECT_FALSE(it_db_->Valid() || it_restore_db_->Valid()); + + // compact local db and then compare again + rocksdb::Slice key_begin(folly::stringPrintf("%04d", 0)); + rocksdb::Slice key_end(folly::stringPrintf("%04d", range - 1 + range/2)); + auto s = db_->CompactRange(CompactRangeOptions(), &key_begin, &key_end); + EXPECT_TRUE(s.ok()); + + it_db_ = db_->NewIterator(db_readoptions); + it_restore_db_ = restore_db_->NewIterator(restore_db_readoptions); + for (it_db_->SeekToFirst(), it_restore_db_->SeekToFirst(); it_db_->Valid() && + it_restore_db_->Valid(); it_db_->Next(), it_restore_db_->Next()) { + EXPECT_TRUE(it_db_->key() == it_restore_db_->key()); + EXPECT_TRUE(it_db_->value() == it_restore_db_->value()); + } + EXPECT_FALSE(it_db_->Valid() || it_restore_db_->Valid()); +} + } // namespace admin int main(int argc, char** argv) {