-
Notifications
You must be signed in to change notification settings - Fork 118
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
implement backup descriptor for incremental backup #632
base: master
Are you sure you want to change the base?
Changes from 12 commits
3f46dde
13804c0
61c8112
292e719
189c6a0
a8e4fda
a799e57
1f678a7
f1c22ce
3ec6dae
c4340a2
64cdf4c
018a465
d3af48c
2c694fe
e49626f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -23,7 +23,6 @@ | |||
#include "common/timer.h" | ||||
#include "common/timeutil.h" | ||||
#include "rocksdb/utilities/checkpoint.h" | ||||
|
||||
#ifdef PINTEREST_INTERNAL | ||||
// NEVER SET THIS UNLESS PINTEREST INTERNAL USAGE. | ||||
#include "schemas/gen-cpp2/rocksdb_admin_types.h" | ||||
|
@@ -51,12 +50,20 @@ DEFINE_string(s3_incre_backup_prefix, "tmp/backup_test/incremental_backup_test/" | |||
DEFINE_int32(incre_backup_limit_mbs, 100, "the rate limit for s3 client"); | ||||
|
||||
DEFINE_bool(incre_backup_include_meta, false, "whether to backup meta data on s3"); | ||||
namespace admin { | ||||
|
||||
DECLARE_bool(s3_direct_io); | ||||
|
||||
namespace { | ||||
|
||||
const std::string kMetaFilename = "dbmeta"; | ||||
const std::string kS3BackupMs = "s3_backup_ms"; | ||||
const std::string kS3BackupFailure = "s3_backup_failure"; | ||||
const int kS3UtilRecheckSec = 5; | ||||
const std::string backupDesc = "backup_descriptor"; | ||||
|
||||
} | ||||
|
||||
namespace admin { | ||||
|
||||
ApplicationDBBackupManager::ApplicationDBBackupManager( | ||||
ApplicationDBManager* db_manager, | ||||
|
@@ -104,6 +111,27 @@ inline std::string ensure_ends_with_pathsep(const std::string& s) { | |||
return s; | ||||
} | ||||
|
||||
// Returns true iff `str` ends with `suffix`. An empty suffix matches any string.
inline bool ends_with(const std::string& str, const std::string& suffix) {
  if (suffix.size() > str.size()) {
    return false;
  }
  // Compare the two tails back-to-front via reverse iterators.
  return std::equal(suffix.rbegin(), suffix.rend(), str.rbegin());
}
|
||||
// Strips leading '0' characters from `num` in place and returns its numeric
// value. An all-zero input is reduced to a single "0".
// Fixes vs. previous version:
//  - empty input: `num.size() - 1` underflowed (size_t wrap) and std::stol("")
//    then threw; now treated as 0.
//  - std::stoll instead of std::stol: stol returns `long`, which is 32 bits on
//    some platforms and could overflow large ids; the declared result is int64_t.
// NOTE: still throws std::invalid_argument for non-numeric input (callers
// such as parseBackupDesc are expected to pass digit strings).
inline int64_t remove_leading_zero(std::string& num) {
  if (num.empty()) {
    num = "0";
    return 0;
  }
  size_t nonzero_pos = num.find_first_not_of('0');
  if (nonzero_pos == std::string::npos) {
    // All zeros: keep exactly one '0'.
    num.erase(0, num.size() - 1);
  } else {
    num.erase(0, nonzero_pos);
  }
  return std::stoll(num);
}
|
||||
// No checksum for the current implementation; this needs to be re-implemented
// if checksums are ever embedded in sst file names.
inline std::string parse_sst_id(const std::string& sst_name) {
  // "123.sst" (w/o checksum) -> "123"; a name without '.' is returned whole.
  const size_t dot_pos = sst_name.find_first_of('.');
  return (dot_pos == std::string::npos) ? sst_name : sst_name.substr(0, dot_pos);
}
|
||||
// copied from admin_handler.cpp | ||||
inline bool should_new_s3_client( | ||||
const common::S3Util& s3_util, const uint32_t limit_mbs, const std::string& s3_bucket) { | ||||
|
@@ -142,20 +170,59 @@ std::shared_ptr<common::S3Util> ApplicationDBBackupManager::createLocalS3Util( | |||
return local_s3_util; | ||||
} | ||||
|
||||
// convert the string to a backup_descriptor map, if any error happens, it will return an empty map | ||||
// TODO: add sst_id_min and sst_id_max parsing | ||||
void ApplicationDBBackupManager::parseBackupDesc(const std::string& contents, | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of writing our own parser we can just write the backup descriptor in json and parse the json
This will also be very easy to read manually. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||
std::unordered_map<std::string, int64_t>* file_to_ts) { | ||||
// Example: | ||||
// contents = "file1=1;file2=2;file3=3;sst_id_min,sstd_id_max;" | ||||
size_t pos = 0; | ||||
|
||||
while (pos < contents.size()) { | ||||
size_t eq_pos = contents.find("=", pos); | ||||
// finish parsing | ||||
if (eq_pos == std::string::npos) return; | ||||
|
||||
std::string key = contents.substr(pos, eq_pos - pos); | ||||
if (key.empty()) return; | ||||
|
||||
size_t semi_pos = contents.find(";", eq_pos); | ||||
if (semi_pos == std::string::npos) return; | ||||
|
||||
std::string value_string = contents.substr(eq_pos + 1, semi_pos - eq_pos - 1); | ||||
if (value_string.empty()) return; | ||||
|
||||
int64_t value = remove_leading_zero(value_string); | ||||
file_to_ts->emplace(key, value); | ||||
|
||||
pos = semi_pos+1; | ||||
} | ||||
} | ||||
|
||||
// Serialize the file -> timestamp map plus the [sst_id_min, sst_id_max]
// range into the backup descriptor string format:
//   "file1=ts1;file2=ts2;...;min-max;"
// The inverse operation is parseBackupDesc.
void ApplicationDBBackupManager::makeUpBackupDescString(const std::unordered_map<std::string, int64_t>&
    file_to_ts, int64_t sst_id_min, int64_t sst_id_max, std::string& contents) {
  contents.clear();
  for (const auto& entry : file_to_ts) {
    contents.append(entry.first);
    contents.push_back('=');
    contents.append(std::to_string(entry.second));
    contents.push_back(';');
  }
  // Trailing segment carries the sst id range.
  contents.append(std::to_string(sst_id_min));
  contents.push_back('-');
  contents.append(std::to_string(sst_id_max));
  contents.push_back(';');
}
|
||||
bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptr<ApplicationDB>& db) { | ||||
// ::admin::AdminException e; | ||||
|
||||
common::Timer timer(kS3BackupMs); | ||||
LOG(INFO) << "S3 Backup " << db->db_name() << " to " << ensure_ends_with_pathsep(FLAGS_s3_incre_backup_prefix) << db->db_name(); | ||||
auto ts = common::timeutil::GetCurrentTimestamp(); | ||||
auto local_path = folly::stringPrintf("%ss3_tmp/%s%d/", rocksdb_dir_.c_str(), db->db_name().c_str(), ts); | ||||
auto local_path = folly::stringPrintf("%ss3_tmp/%s%ld/", rocksdb_dir_.c_str(), db->db_name().c_str(), ts); | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add directory prefix incremental to distinguish regular backup from incremental backup |
||||
boost::system::error_code remove_err; | ||||
boost::system::error_code create_err; | ||||
boost::filesystem::remove_all(local_path, remove_err); | ||||
boost::filesystem::create_directories(local_path, create_err); | ||||
SCOPE_EXIT { boost::filesystem::remove_all(local_path, remove_err); }; | ||||
if (remove_err || create_err) { | ||||
// SetException("Cannot remove/create dir for backup: " + local_path, AdminErrorCode::DB_ADMIN_ERROR, &callback); | ||||
LOG(ERROR) << "Cannot remove/create dir for backup: " << local_path; | ||||
common::Stats::get()->Incr(kS3BackupFailure); | ||||
return false; | ||||
} | ||||
|
@@ -166,7 +233,6 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptr<ApplicationD | |||
rocksdb::Checkpoint* checkpoint; | ||||
auto status = rocksdb::Checkpoint::Create(db->rocksdb(), &checkpoint); | ||||
if (!status.ok()) { | ||||
// OKOrSetException(status, AdminErrorCode::DB_ADMIN_ERROR, &callback); | ||||
LOG(ERROR) << "Error happened when trying to initialize checkpoint: " << status.ToString(); | ||||
common::Stats::get()->Incr(kS3BackupFailure); | ||||
return false; | ||||
|
@@ -175,7 +241,6 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptr<ApplicationD | |||
auto checkpoint_local_path = local_path + "checkpoint"; | ||||
status = checkpoint->CreateCheckpoint(checkpoint_local_path); | ||||
if (!status.ok()) { | ||||
// OKOrSetException(status, AdminErrorCode::DB_ADMIN_ERROR, &callback); | ||||
LOG(ERROR) << "Error happened when trying to create checkpoint: " << status.ToString(); | ||||
common::Stats::get()->Incr(kS3BackupFailure); | ||||
return false; | ||||
|
@@ -185,17 +250,86 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptr<ApplicationD | |||
std::vector<std::string> checkpoint_files; | ||||
status = rocksdb::Env::Default()->GetChildren(checkpoint_local_path, &checkpoint_files); | ||||
if (!status.ok()) { | ||||
// OKOrSetException(status, AdminErrorCode::DB_ADMIN_ERROR, &callback); | ||||
LOG(ERROR) << "Error happened when trying to list files in the checkpoint: " << status.ToString(); | ||||
common::Stats::get()->Incr(kS3BackupFailure); | ||||
return false; | ||||
} | ||||
|
||||
// Upload checkpoint to s3 | ||||
// find the timestamp of last backup | ||||
int64_t last_ts = -1; | ||||
auto itor = db_backups_.find(db->db_name()); | ||||
if (itor != db_backups_.end()) { | ||||
last_ts = itor->second[(int)itor->second.size()-1]; | ||||
} | ||||
|
||||
// if there exists last backup, we need to get the backup_descriptor | ||||
auto local_s3_util = createLocalS3Util(FLAGS_incre_backup_limit_mbs, FLAGS_s3_incre_backup_bucket); | ||||
std::string formatted_s3_dir_path = folly::stringPrintf("%s%s/%d/", | ||||
std::string formatted_s3_dir_path_upload = folly::stringPrintf("%s%s/%ld/", | ||||
ensure_ends_with_pathsep(FLAGS_s3_incre_backup_prefix).c_str(), db->db_name().c_str(), ts); | ||||
std::string formatted_checkpoint_local_path = ensure_ends_with_pathsep(checkpoint_local_path); | ||||
|
||||
std::unordered_map<std::string, int64_t> file_to_ts; | ||||
const string sst_suffix = ".sst"; | ||||
|
||||
if (last_ts >= 0) { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this case if the machine restarts we wouldn't have last_ts. Can you leave a comment noting that this code doesn't handle that case well and that we still need to handle it? Thanks! |
||||
LOG(INFO) << "find the last backup on " << last_ts; | ||||
std::string local_path_last_backup = folly::stringPrintf("%ss3_tmp/%s%ld/", rocksdb_dir_.c_str(), db->db_name().c_str(), last_ts) + "checkpoint/"; | ||||
std::string formatted_s3_last_backup_dir_path = folly::stringPrintf("%s%s/%ld/", | ||||
ensure_ends_with_pathsep(FLAGS_s3_incre_backup_prefix).c_str(), db->db_name().c_str(), last_ts); | ||||
|
||||
boost::system::error_code remove_last_backup_err; | ||||
boost::system::error_code create_last_backup_err; | ||||
// if we have not removed the last backup, we do not need to download it | ||||
bool need_download = !boost::filesystem::exists(local_path_last_backup + backupDesc); | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good idea to
Generally having two sources of data means we have to deal with inconsistencies between them that's why I am suggesting only use s3. |
||||
boost::filesystem::create_directories(local_path_last_backup, create_last_backup_err); | ||||
SCOPE_EXIT { boost::filesystem::remove_all(local_path_last_backup, remove_last_backup_err); }; | ||||
if (create_err) { | ||||
// SetException("Cannot remove/create dir for backup: " + local_path, AdminErrorCode::DB_ADMIN_ERROR, &callback); | ||||
common::Stats::get()->Incr(kS3BackupFailure); | ||||
return false; | ||||
} | ||||
|
||||
if (need_download) { | ||||
auto resp = local_s3_util->getObject(formatted_s3_last_backup_dir_path + backupDesc, | ||||
local_path_last_backup + backupDesc, FLAGS_s3_direct_io); | ||||
} else { | ||||
assert(boost::filesystem::exists(local_path_last_backup + backupDesc)); | ||||
} | ||||
|
||||
std::string last_backup_desc_contents; | ||||
common::FileUtil::readFileToString(local_path_last_backup + backupDesc, &last_backup_desc_contents); | ||||
|
||||
parseBackupDesc(last_backup_desc_contents, &file_to_ts); | ||||
if (file_to_ts.empty()) { | ||||
LOG(INFO) << "The last backup of " << db->db_name() << " on timestamp " << last_ts << " is broken"; | ||||
} | ||||
|
||||
// Remove all the duplicate files | ||||
checkpoint_files.erase(std::remove_if(checkpoint_files.begin(), checkpoint_files.end(), [&](string file){ | ||||
return file_to_ts.find(file) != file_to_ts.end() && ends_with(file, sst_suffix);}), checkpoint_files.end()); | ||||
|
||||
LOG(INFO) << "finish parsing last backup"; | ||||
} | ||||
|
||||
// Get the min_id and max_id for backup descriptor | ||||
int64_t sst_id_min = INT32_MAX; | ||||
int64_t sst_id_max = 0; | ||||
for (auto& file : checkpoint_files) { | ||||
if (ends_with(file, sst_suffix)) { | ||||
std::string file_id = parse_sst_id(file); | ||||
int64_t file_num = remove_leading_zero(file_id); | ||||
file_to_ts.emplace(file, ts); | ||||
sst_id_max = std::max(file_num, sst_id_max); | ||||
sst_id_min = std::min(file_num, sst_id_min); | ||||
} | ||||
} | ||||
|
||||
// Create the new backup descriptor file | ||||
std::string backup_desc_contents; | ||||
makeUpBackupDescString(file_to_ts, sst_id_min, sst_id_max, backup_desc_contents); | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need to put min_id and max_id is it an optimization or some sort of check? |
||||
common::FileUtil::createFileWithContent(formatted_checkpoint_local_path, backupDesc, backup_desc_contents); | ||||
checkpoint_files.emplace_back(backupDesc); | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no reason to use emplace_back here, just use push_back There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this guarantee that backup descriptor will only be uploaded after all files get uploaded? |
||||
|
||||
auto upload_func = [&](const std::string& dest, const std::string& source) { | ||||
LOG(INFO) << "Copying " << source << " to " << dest; | ||||
auto copy_resp = local_s3_util->putObject(dest, source); | ||||
|
@@ -227,7 +361,7 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptr<ApplicationD | |||
executor_->add( | ||||
[&, files = std::move(files), p = std::move(p)]() mutable { | ||||
for (const auto& file : files) { | ||||
if (!upload_func(formatted_s3_dir_path + file, formatted_checkpoint_local_path + file)) { | ||||
if (!upload_func(formatted_s3_dir_path_upload + file, formatted_checkpoint_local_path + file)) { | ||||
p.setValue(false); | ||||
return; | ||||
} | ||||
|
@@ -239,9 +373,7 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptr<ApplicationD | |||
for (auto& f : futures) { | ||||
auto res = std::move(f).get(); | ||||
if (!res) { | ||||
// SetException("Error happened when uploading files from checkpoint to S3", | ||||
// AdminErrorCode::DB_ADMIN_ERROR, | ||||
// &callback); | ||||
LOG(ERROR) << "Error happened when uploading files from checkpoint to S3"; | ||||
common::Stats::get()->Incr(kS3BackupFailure); | ||||
return false; | ||||
} | ||||
|
@@ -251,11 +383,9 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptr<ApplicationD | |||
if (file == "." || file == "..") { | ||||
continue; | ||||
} | ||||
if (!upload_func(formatted_s3_dir_path + file, formatted_checkpoint_local_path + file)) { | ||||
if (!upload_func(formatted_s3_dir_path_upload + file, formatted_checkpoint_local_path + file)) { | ||||
// If there is error in one file uploading, then we fail the whole backup process | ||||
// SetException("Error happened when uploading files from checkpoint to S3", | ||||
// AdminErrorCode::DB_ADMIN_ERROR, | ||||
// &callback); | ||||
LOG(ERROR) << "Error happened when uploading files from checkpoint to S3"; | ||||
common::Stats::get()->Incr(kS3BackupFailure); | ||||
return false; | ||||
} | ||||
|
@@ -279,21 +409,25 @@ bool ApplicationDBBackupManager::backupDBToS3(const std::shared_ptr<ApplicationD | |||
dbmeta_path = common::FileUtil::createFileWithContent( | ||||
formatted_checkpoint_local_path, kMetaFilename, encoded_meta); | ||||
} catch (std::exception& e) { | ||||
// SetException("Failed to create meta file, " + std::string(e.what()), | ||||
// AdminErrorCode::DB_ADMIN_ERROR, &callback); | ||||
LOG(ERROR) << "Failed to create meta file, " << std::string(e.what()); | ||||
common::Stats::get()->Incr(kS3BackupFailure); | ||||
return false; | ||||
} | ||||
if (!upload_func(formatted_s3_dir_path + kMetaFilename, dbmeta_path)) { | ||||
// SetException("Error happened when upload meta from checkpoint to S3", | ||||
// AdminErrorCode::DB_ADMIN_ERROR, &callback); | ||||
if (!upload_func(formatted_s3_dir_path_upload + kMetaFilename, dbmeta_path)) { | ||||
LOG(ERROR) << "Error happened when upload meta from checkpoint to S3"; | ||||
common::Stats::get()->Incr(kS3BackupFailure); | ||||
return false; | ||||
} | ||||
} | ||||
|
||||
// Delete the directory to remove the snapshot. | ||||
boost::filesystem::remove_all(local_path); | ||||
auto it = db_backups_.find(db->db_name()); | ||||
if (it != db_backups_.end()) { | ||||
it->second.emplace_back(ts); | ||||
} else { | ||||
db_backups_.emplace(db->db_name(), std::vector<int64_t>(1, ts)); | ||||
} | ||||
|
||||
return true; | ||||
} | ||||
|
||||
bool ApplicationDBBackupManager::backupAllDBsToS3() { | ||||
|
@@ -316,4 +450,9 @@ bool ApplicationDBBackupManager::backupAllDBsToS3() { | |||
return ret; | ||||
} | ||||
|
||||
std::vector<int64_t> ApplicationDBBackupManager::getTimeStamp(const std::string& db) { | ||||
if (db_backups_.find(db) != db_backups_.end()) return db_backups_[db]; | ||||
else return std::vector<int64_t>(); | ||||
} | ||||
|
||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the function is not expensive, prefer functions that don't modify their input, to avoid accidental bugs.
For example I'd just return the new string and pass num as constant.