Skip to content

Commit

Permalink
[Feature] Partial support of TDE in non-cloud-native rowset read/write
Browse files Browse the repository at this point in the history
Signed-off-by: Binglin Chang <decstery@gmail.com>
  • Loading branch information
decster committed Oct 24, 2024
1 parent 6080065 commit 6490d24
Show file tree
Hide file tree
Showing 12 changed files with 191 additions and 34 deletions.
8 changes: 7 additions & 1 deletion be/src/storage/local_primary_key_recover.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "storage/local_primary_key_recover.h"

#include "fs/key_cache.h"
#include "storage/chunk_helper.h"
#include "storage/tablet_meta_manager.h"
#include "storage/update_manager.h"
Expand Down Expand Up @@ -91,7 +92,12 @@ Status LocalPrimaryKeyRecover::rowset_iterator(
std::vector<uint32_t> delidxs;
for (int idx = 0; idx < rowset->num_delete_files(); idx++) {
auto path = Rowset::segment_del_file_path(rowset->rowset_path(), rowset->rowset_id(), idx);
ASSIGN_OR_RETURN(auto read_file, fs->new_random_access_file(path));
RandomAccessFileOptions opts;
auto& encryption_meta = rowset->rowset_meta()->get_delfile_encryption_meta(idx);
if (!encryption_meta.empty()) {
ASSIGN_OR_RETURN(opts.encryption_info, KeyCache::instance().unwrap_encryption_meta(encryption_meta));
}
ASSIGN_OR_RETURN(auto read_file, fs->new_random_access_file(opts, path));
del_rfs.push_back(std::move(read_file));
delidxs.push_back(rowset->rowset_meta()->get_meta_pb_without_schema().delfile_idxes(idx));
}
Expand Down
14 changes: 13 additions & 1 deletion be/src/storage/rowset/horizontal_update_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "storage/rowset/horizontal_update_rowset_writer.h"

#include "fs/key_cache.h"
#include "storage/rowset/rowset.h"
#include "storage/rowset/rowset_factory.h"
#include "storage/storage_engine.h"
Expand Down Expand Up @@ -41,7 +42,13 @@ HorizontalUpdateRowsetWriter::~HorizontalUpdateRowsetWriter() {
StatusOr<std::unique_ptr<SegmentWriter>> HorizontalUpdateRowsetWriter::_create_update_file_writer() {
std::lock_guard<std::mutex> l(_lock);
std::string path = Rowset::segment_upt_file_path(_context.rowset_path_prefix, _context.rowset_id, _num_uptfile);
ASSIGN_OR_RETURN(auto wfile, _fs->new_writable_file(path));
WritableFileOptions wopts;
if (config::enable_transparent_data_encryption) {
ASSIGN_OR_RETURN(auto pair, KeyCache::instance().create_encryption_meta_pair_using_current_kek());
wopts.encryption_info = pair.info;
_writer_options.encryption_meta = std::move(pair.encryption_meta);
}
ASSIGN_OR_RETURN(auto wfile, _fs->new_writable_file(wopts, path));
const auto schema = _context.tablet_schema;
auto segment_writer = std::make_unique<SegmentWriter>(std::move(wfile), _num_uptfile, schema, _writer_options);
RETURN_IF_ERROR(segment_writer->init());
Expand All @@ -58,6 +65,8 @@ Status HorizontalUpdateRowsetWriter::add_chunk(const Chunk& chunk) {
RETURN_IF_ERROR(_update_file_writer->finalize(&segment_size, &index_size, &footer_position));
{
std::lock_guard<std::mutex> l(_lock);
DCHECK(_updatefile_encryption_metas.size() == _num_uptfile);
_updatefile_encryption_metas.emplace_back(_writer_options.encryption_meta);
_num_uptfile++;
}
ASSIGN_OR_RETURN(_update_file_writer, _create_update_file_writer());
Expand Down Expand Up @@ -87,11 +96,14 @@ Status HorizontalUpdateRowsetWriter::flush_chunk(const Chunk& chunk, SegmentPB*
seg_info->set_update_id(_num_uptfile);
seg_info->set_update_data_size(segment_size);
seg_info->set_update_path((*segment_writer)->segment_path());
seg_info->set_update_encryption_meta((*segment_writer)->encryption_meta());
seg_info->set_update_row_size(static_cast<int64_t>(chunk.bytes_usage()));
}
{
std::lock_guard<std::mutex> l(_lock);
_num_rows_upt += chunk.num_rows();
DCHECK(_updatefile_encryption_metas.size() == _num_uptfile);
_updatefile_encryption_metas.emplace_back(_writer_options.encryption_meta);
_num_uptfile++;
_total_update_row_size += static_cast<int64_t>(chunk.bytes_usage());
}
Expand Down
52 changes: 25 additions & 27 deletions be/src/storage/rowset/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,24 +167,33 @@ Status Rowset::init() {
return Status::OK();
}

StatusOr<std::shared_ptr<Segment>> Rowset::_load_segment(int32_t idx, const TabletSchemaCSPtr& schema,
std::shared_ptr<FileSystem>& fs,
const FooterPointerPB* partial_rowset_footer) {
size_t footer_size_hint = 16 * 1024;
std::string seg_path = segment_file_path(_rowset_path, rowset_id(), idx);
FileInfo seg_info{.path = seg_path, .encryption_meta = rowset_meta()->get_segment_encryption_meta(idx)};
auto res = Segment::open(fs, seg_info, idx, schema, &footer_size_hint, partial_rowset_footer);
if (!res.ok()) {
auto st = res.status().clone_and_prepend(fmt::format(
"Load segment failed tablet:{} rowset:{} rssid:{} seg:{} path:{}", _rowset_meta->tablet_id(),
rowset_id().to_string(), _rowset_meta->get_rowset_seg_id(), idx, seg_path));
LOG(WARNING) << st.message();
return st;
}
return res;
}

// use partial_rowset_footer to indicate the segment footer position and size
// if partial_rowset_footer is nullptr, the segment_footer is at the end of the segment_file
Status Rowset::do_load() {
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(_rowset_path));
_segments.clear();
size_t footer_size_hint = 16 * 1024;
for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
std::string seg_path = segment_file_path(_rowset_path, rowset_id(), seg_id);
FileInfo seg_info{seg_path};
auto res = Segment::open(fs, seg_info, seg_id, _schema, &footer_size_hint,
rowset_meta()->partial_rowset_footer(seg_id));
auto res = _load_segment(seg_id, _schema, fs, rowset_meta()->partial_rowset_footer(seg_id));
if (!res.ok()) {
auto st = res.status().clone_and_prepend(fmt::format(
"Load rowset failed tablet:{} rowset:{} rssid:{} seg:{} path:{}", _rowset_meta->tablet_id(),
rowset_id().to_string(), _rowset_meta->get_rowset_seg_id(), seg_id, seg_path));
LOG(WARNING) << st.message();
_segments.clear();
return st;
return res.status();
}
_segments.push_back(std::move(res).value());
}
Expand Down Expand Up @@ -213,13 +222,9 @@ void Rowset::warmup_lrucache() {
Status Rowset::reload() {
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(_rowset_path));
_segments.clear();
size_t footer_size_hint = 16 * 1024;
for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
std::string seg_path = segment_file_path(_rowset_path, rowset_id(), seg_id);
FileInfo seg_info{.path = seg_path};
auto res = Segment::open(fs, seg_info, seg_id, _schema, &footer_size_hint);
auto res = _load_segment(seg_id, _schema, fs, nullptr);
if (!res.ok()) {
LOG(WARNING) << "Fail to open " << seg_path << ": " << res.status();
_segments.clear();
return res.status();
}
Expand All @@ -235,12 +240,8 @@ Status Rowset::reload_segment(int32_t segment_id) {
return Status::InternalError("Error segment id");
}
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(_rowset_path));
size_t footer_size_hint = 16 * 1024;
std::string seg_path = segment_file_path(_rowset_path, rowset_id(), segment_id);
FileInfo seg_info{.path = seg_path};
auto res = Segment::open(fs, seg_info, segment_id, _schema, &footer_size_hint);
auto res = _load_segment(segment_id, _schema, fs, nullptr);
if (!res.ok()) {
LOG(WARNING) << "Fail to open " << seg_path << ": " << res.status();
return res.status();
}
_segments[segment_id] = std::move(res).value();
Expand All @@ -254,12 +255,9 @@ Status Rowset::reload_segment_with_schema(int32_t segment_id, TabletSchemaCSPtr&
return Status::InternalError("Error segment id");
}
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(_rowset_path));
size_t footer_size_hint = 16 * 1024;
std::string seg_path = segment_file_path(_rowset_path, rowset_id(), segment_id);
FileInfo seg_info{.path = seg_path};
auto res = Segment::open(fs, seg_info, segment_id, schema, &footer_size_hint);
auto res = _load_segment(segment_id, schema, fs, nullptr);
if (!res.ok()) {
LOG(WARNING) << "Fail to open " << seg_path << ": " << res.status();
_segments.clear();
return res.status();
}
_segments[segment_id] = std::move(res).value();
Expand Down Expand Up @@ -872,7 +870,7 @@ StatusOr<std::vector<ChunkIteratorPtr>> Rowset::get_update_file_iterators(const
for (int64_t i = 0; i < num_update_files(); i++) {
// open update file
std::string seg_path = segment_upt_file_path(_rowset_path, rowset_id(), i);
FileInfo seg_info{.path = seg_path};
FileInfo seg_info{.path = seg_path, .encryption_meta = rowset_meta()->get_uptfile_encryption_meta(i)};
ASSIGN_OR_RETURN(auto seg_ptr, Segment::open(seg_options.fs, seg_info, i, _schema));
if (seg_ptr->num_rows() == 0) {
seg_iterators[i] = new_empty_iterator(schema, config::vector_chunk_size);
Expand Down Expand Up @@ -904,7 +902,7 @@ StatusOr<ChunkIteratorPtr> Rowset::get_update_file_iterator(const Schema& schema
// open update file
DCHECK(update_file_id < num_update_files());
std::string seg_path = segment_upt_file_path(_rowset_path, rowset_id(), update_file_id);
FileInfo seg_info{.path = seg_path};
FileInfo seg_info{.path = seg_path, .encryption_meta = rowset_meta()->get_uptfile_encryption_meta(update_file_id)};
ASSIGN_OR_RETURN(auto seg_ptr, Segment::open(seg_options.fs, seg_info, update_file_id, _schema));
if (seg_ptr->num_rows() == 0) {
return new_empty_iterator(schema, config::vector_chunk_size);
Expand Down
4 changes: 4 additions & 0 deletions be/src/storage/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,10 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public BaseRowset {

Status _copy_delta_column_group_files(KVStore* kvstore, const std::string& dir, int64_t version);

StatusOr<std::shared_ptr<Segment>> _load_segment(int32_t idx, const TabletSchemaCSPtr& schema,
std::shared_ptr<FileSystem>& fs,
const FooterPointerPB* partial_rowset_footer);

std::vector<SegmentSharedPtr> _segments;

std::atomic<bool> is_compacting{false};
Expand Down
20 changes: 20 additions & 0 deletions be/src/storage/rowset/rowset_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,24 @@ RowsetMeta::~RowsetMeta() {
MEM_TRACKER_SAFE_RELEASE(GlobalEnv::GetInstance()->rowset_metadata_mem_tracker(), _mem_usage);
}

static string empty_encryption_meta;

const string& RowsetMeta::get_segment_encryption_meta(int segment_id) const {
const auto size = _rowset_meta_pb->segment_encryption_metas_size();
DCHECK(segment_id >= 0 && (segment_id < size || size == 0));
return segment_id < size ? _rowset_meta_pb->segment_encryption_metas(segment_id) : empty_encryption_meta;
}

const string& RowsetMeta::get_uptfile_encryption_meta(int upt_file_id) const {
const auto size = _rowset_meta_pb->updatefile_encryption_metas_size();
DCHECK(upt_file_id >= 0 && (upt_file_id < size || size == 0));
return upt_file_id < size ? _rowset_meta_pb->updatefile_encryption_metas(upt_file_id) : empty_encryption_meta;
}

const string& RowsetMeta::get_delfile_encryption_meta(int del_file_id) const {
const auto size = _rowset_meta_pb->delfile_encryption_metas_size();
DCHECK(del_file_id >= 0 && (del_file_id < size || size == 0));
return del_file_id < size ? _rowset_meta_pb->delfile_encryption_metas(del_file_id) : empty_encryption_meta;
}

} // namespace starrocks
4 changes: 4 additions & 0 deletions be/src/storage/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ class RowsetMeta {
return false;
}

const string& get_segment_encryption_meta(int segment_id) const;
const string& get_uptfile_encryption_meta(int upt_file_id) const;
const string& get_delfile_encryption_meta(int del_file_id) const;

private:
bool _deserialize_from_pb(std::string_view value) {
return _rowset_meta_pb->ParseFromArray(value.data(), value.size());
Expand Down
Loading

0 comments on commit 6490d24

Please sign in to comment.