Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize celldb #7

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions validator/db/celldb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,12 @@ void CellDbBase::execute_sync(std::function<void()> f) {
}

CellDbIn::CellDbIn(td::actor::ActorId<RootDb> root_db, td::actor::ActorId<CellDb> parent, std::string path,
td::Ref<ValidatorManagerOptions> opts)
: root_db_(root_db), parent_(parent), path_(std::move(path)), opts_(opts) {
td::Ref<ValidatorManagerOptions> opts, std::shared_ptr<vm::KeyValue> cell_db)
: root_db_(root_db), parent_(parent), path_(std::move(path)), opts_(opts){
if (cell_db != NULL) {
LOG(INFO) << "Using provided cell_db";
cell_db_ = cell_db;
}
}

void CellDbIn::start_up() {
Expand Down Expand Up @@ -97,8 +101,10 @@ void CellDbIn::start_up() {
LOG(WARNING) << "Set CellDb block cache size to " << td::format::as_size(opts_->get_celldb_cache_size().value());
}
db_options.use_direct_reads = opts_->get_celldb_direct_io();
cell_db_ = std::make_shared<td::RocksDb>(td::RocksDb::open(path_, std::move(db_options)).move_as_ok());

if (cell_db_ == NULL){
LOG(INFO) << "CellDbIn using a new rocksdb instance";
cell_db_ = std::make_shared<td::RocksDb>(td::RocksDb::open(path_, std::move(db_options)).move_as_ok());
}

boc_ = vm::DynamicBagOfCellsDb::create();
boc_->set_celldb_compress_depth(opts_->get_celldb_compress_depth());
Expand Down Expand Up @@ -439,11 +445,19 @@ void CellDbIn::migrate_cells() {
}

void CellDb::load_cell(RootHash hash, td::Promise<td::Ref<vm::DataCell>> promise) {
int ranNum = GetDBRandomNum();
static int64_t ranCount = 0;
ranCount++;
if (ranCount % 1000 == 0) {
LOG(ERROR) << "CellDb mailbox: " << this->get_name() << " " << this->get_actor_info_ptr()->mailbox().reader().calc_size() << ", ranNum: " << ranNum;
// LOG(ERROR) << "yus " << this->get_name() << " " << this->get_actor_info_ptr()->mailbox().reader().calc_size();
ranCount = 0;
}
if (!started_) {
td::actor::send_closure(cell_db_, &CellDbIn::load_cell, hash, std::move(promise));
td::actor::send_closure(cell_db_read_[ranNum], &CellDbIn::load_cell, hash, std::move(promise));
} else {
auto P = td::PromiseCreator::lambda(
[cell_db_in = cell_db_.get(), hash, promise = std::move(promise)](td::Result<td::Ref<vm::DataCell>> R) mutable {
[cell_db_in = cell_db_read_[ranNum].get(), hash, promise = std::move(promise)](td::Result<td::Ref<vm::DataCell>> R) mutable {
if (R.is_error()) {
td::actor::send_closure(cell_db_in, &CellDbIn::load_cell, hash, std::move(promise));
} else {
Expand All @@ -470,7 +484,12 @@ void CellDb::start_up() {
CellDbBase::start_up();
boc_ = vm::DynamicBagOfCellsDb::create();
boc_->set_celldb_compress_depth(opts_->get_celldb_compress_depth());
cell_db_ = td::actor::create_actor<CellDbIn>("celldbin", root_db_, actor_id(this), path_, opts_);
cell_db_ = td::actor::create_actor<CellDbIn>("celldbin", root_db_, actor_id(this), path_, opts_, rocks_db_);

// cell_db_ = td::actor::create_actor<CellDbIn>("celldbin", root_db_, actor_id(this), path_, opts_, rocks_db_);
for (int i = 0; i < THREAD_COUNTS; i++) {
cell_db_read_[i] = td::actor::create_actor<CellDbIn>("celldbin", root_db_, actor_id(this), path_, opts_, rocks_db_);
}
on_load_callback_ = [actor = std::make_shared<td::actor::ActorOwn<CellDbIn::MigrationProxy>>(
td::actor::create_actor<CellDbIn::MigrationProxy>("celldbmigration", cell_db_.get())),
compress_depth = opts_->get_celldb_compress_depth()](const vm::CellLoader::LoadResult& res) {
Expand Down
9 changes: 5 additions & 4 deletions validator/db/celldb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ class Statistics;
}

namespace ton {

namespace validator {

class RootDb;
Expand Down Expand Up @@ -68,7 +67,7 @@ class CellDbIn : public CellDbBase {
void flush_db_stats();

CellDbIn(td::actor::ActorId<RootDb> root_db, td::actor::ActorId<CellDb> parent, std::string path,
td::Ref<ValidatorManagerOptions> opts);
td::Ref<ValidatorManagerOptions> opts, std::shared_ptr<vm::KeyValue> cell_db);

void start_up() override;
void alarm() override;
Expand Down Expand Up @@ -171,8 +170,8 @@ class CellDb : public CellDbBase {
void get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> promise);
void get_last_deleted_mc_state(td::Promise<BlockSeqno> promise);

CellDb(td::actor::ActorId<RootDb> root_db, std::string path, td::Ref<ValidatorManagerOptions> opts)
: root_db_(root_db), path_(path), opts_(opts) {
CellDb(td::actor::ActorId<RootDb> root_db, std::string path, td::Ref<ValidatorManagerOptions> opts, std::shared_ptr<vm::KeyValue> rocks_db)
: root_db_(root_db), path_(path), opts_(opts), rocks_db_(rocks_db) {
}

void start_up() override;
Expand All @@ -183,6 +182,8 @@ class CellDb : public CellDbBase {
td::Ref<ValidatorManagerOptions> opts_;

td::actor::ActorOwn<CellDbIn> cell_db_;
td::actor::ActorOwn<CellDbIn> cell_db_read_[THREAD_COUNTS];
std::shared_ptr<vm::KeyValue> rocks_db_;

std::unique_ptr<vm::DynamicBagOfCellsDb> boc_;
bool started_ = false;
Expand Down
10 changes: 10 additions & 0 deletions validator/db/db-utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include "td/utils/logging.h"

#include <cmath>
#include <iostream>
#include <random>

namespace ton::validator {

Expand Down Expand Up @@ -51,4 +53,12 @@ void PercentileStats::clear() {
values_.clear();
}

int GetDBRandomNum(){
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> distr(0, THREAD_COUNTS -1);
int random_number = distr(gen);
return random_number;
}

} // namespace ton::validator
4 changes: 4 additions & 0 deletions validator/db/db-utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,8 @@ class PercentileStats {
std::multiset<double> values_;
};

const int THREAD_COUNTS = 10;
int GetDBRandomNum();


} // namespace ton::validator
31 changes: 29 additions & 2 deletions validator/db/rootdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,16 @@ void RootDb::get_block_state(ConstBlockHandle handle, td::Promise<td::Ref<ShardS
promise.set_value(S.move_as_ok());
}
});
td::actor::send_closure(cell_db_, &CellDb::load_cell, handle->state(), std::move(P));

int ranNum = GetDBRandomNum();
static int64_t ranCount = 0;
ranCount++;
if (ranCount % 1000 == 0) {
LOG(ERROR) << "RootDb mailbox: " << this->get_name() << " " << this->get_actor_info_ptr()->mailbox().reader().calc_size() << ", ranNum: " << ranNum;
ranCount = 0;
}

td::actor::send_closure(cell_db_read_[ranNum], &CellDb::load_cell, handle->state(), std::move(P));
} else {
promise.set_error(td::Status::Error(ErrorCode::notready, "state not in db"));
}
Expand Down Expand Up @@ -407,7 +416,25 @@ void RootDb::get_hardforks(td::Promise<std::vector<BlockIdExt>> promise) {
}

void RootDb::start_up() {
cell_db_ = td::actor::create_actor<CellDb>("celldb", actor_id(this), root_path_ + "/celldb/", opts_);
td::RocksDbOptions db_options;
auto statistics_ = td::RocksDb::create_statistics();
if (!opts_->get_disable_rocksdb_stats()) {
db_options.snapshot_statistics = std::make_shared<td::RocksDbSnapshotStatistics>();
}
db_options.statistics = statistics_;
if (opts_->get_celldb_cache_size()) {
db_options.block_cache = td::RocksDb::create_cache(opts_->get_celldb_cache_size().value());
LOG(WARNING) << "Set CellDb block cache size to " << td::format::as_size(opts_->get_celldb_cache_size().value());
}
db_options.use_direct_reads = opts_->get_celldb_direct_io();
auto path = root_path_ + "/celldb/";
auto rock_db = std::make_shared<td::RocksDb>(td::RocksDb::open(path, std::move(db_options)).move_as_ok());

cell_db_ = td::actor::create_actor<CellDb>("celldb", actor_id(this), path, opts_, rock_db);
for (int i = 0; i < THREAD_COUNTS; i++){
cell_db_read_[i] = td::actor::create_actor<CellDb>("celldb", actor_id(this), path, opts_, rock_db);
}
// cell_db_ = td::actor::create_actor<CellDb>("celldb", actor_id(this), root_path_ + "/celldb/", opts_);
state_db_ = td::actor::create_actor<StateDb>("statedb", actor_id(this), root_path_ + "/state/");
static_files_db_ = td::actor::create_actor<StaticFilesDb>("staticfilesdb", actor_id(this), root_path_ + "/static/");
archive_db_ = td::actor::create_actor<ArchiveManager>("archive", actor_id(this), root_path_, opts_);
Expand Down
1 change: 1 addition & 0 deletions validator/db/rootdb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ class RootDb : public Db {
td::Ref<ValidatorManagerOptions> opts_;

td::actor::ActorOwn<CellDb> cell_db_;
td::actor::ActorOwn<CellDb> cell_db_read_[THREAD_COUNTS];
td::actor::ActorOwn<StateDb> state_db_;
td::actor::ActorOwn<StaticFilesDb> static_files_db_;
td::actor::ActorOwn<ArchiveManager> archive_db_;
Expand Down
Loading