From 6490d242529e4ae142b2012a5b988db1a8773765 Mon Sep 17 00:00:00 2001 From: Binglin Chang Date: Thu, 19 Sep 2024 15:18:40 +0800 Subject: [PATCH] [Feature] Partial support of TDE in non-cloud-native rowset read/write Signed-off-by: Binglin Chang --- be/src/storage/local_primary_key_recover.cpp | 8 +- .../horizontal_update_rowset_writer.cpp | 14 ++- be/src/storage/rowset/rowset.cpp | 52 ++++++----- be/src/storage/rowset/rowset.h | 4 + be/src/storage/rowset/rowset_meta.cpp | 20 +++++ be/src/storage/rowset/rowset_meta.h | 4 + be/src/storage/rowset/rowset_writer.cpp | 90 ++++++++++++++++++- be/src/storage/rowset/rowset_writer.h | 3 + be/src/storage/rowset_update_state.cpp | 8 +- .../engine_storage_migration_task_test.cpp | 16 ++++ gensrc/proto/data.proto | 3 + gensrc/proto/olap_file.proto | 3 + 12 files changed, 191 insertions(+), 34 deletions(-) diff --git a/be/src/storage/local_primary_key_recover.cpp b/be/src/storage/local_primary_key_recover.cpp index 4958e545b8835..93d92c0348756 100644 --- a/be/src/storage/local_primary_key_recover.cpp +++ b/be/src/storage/local_primary_key_recover.cpp @@ -14,6 +14,7 @@ #include "storage/local_primary_key_recover.h" +#include "fs/key_cache.h" #include "storage/chunk_helper.h" #include "storage/tablet_meta_manager.h" #include "storage/update_manager.h" @@ -91,7 +92,12 @@ Status LocalPrimaryKeyRecover::rowset_iterator( std::vector delidxs; for (int idx = 0; idx < rowset->num_delete_files(); idx++) { auto path = Rowset::segment_del_file_path(rowset->rowset_path(), rowset->rowset_id(), idx); - ASSIGN_OR_RETURN(auto read_file, fs->new_random_access_file(path)); + RandomAccessFileOptions opts; + auto& encryption_meta = rowset->rowset_meta()->get_delfile_encryption_meta(idx); + if (!encryption_meta.empty()) { + ASSIGN_OR_RETURN(opts.encryption_info, KeyCache::instance().unwrap_encryption_meta(encryption_meta)); + } + ASSIGN_OR_RETURN(auto read_file, fs->new_random_access_file(opts, path)); del_rfs.push_back(std::move(read_file)); delidxs.push_back(rowset->rowset_meta()->get_meta_pb_without_schema().delfile_idxes(idx)); } diff --git a/be/src/storage/rowset/horizontal_update_rowset_writer.cpp b/be/src/storage/rowset/horizontal_update_rowset_writer.cpp index 525b28fabadd7..c8f9539f41c25 100644 --- a/be/src/storage/rowset/horizontal_update_rowset_writer.cpp +++ b/be/src/storage/rowset/horizontal_update_rowset_writer.cpp @@ -14,6 +14,7 @@ #include "storage/rowset/horizontal_update_rowset_writer.h" +#include "fs/key_cache.h" #include "storage/rowset/rowset.h" #include "storage/rowset/rowset_factory.h" #include "storage/storage_engine.h" @@ -41,7 +42,13 @@ HorizontalUpdateRowsetWriter::~HorizontalUpdateRowsetWriter() { StatusOr> HorizontalUpdateRowsetWriter::_create_update_file_writer() { std::lock_guard l(_lock); std::string path = Rowset::segment_upt_file_path(_context.rowset_path_prefix, _context.rowset_id, _num_uptfile); - ASSIGN_OR_RETURN(auto wfile, _fs->new_writable_file(path)); + WritableFileOptions wopts; + if (config::enable_transparent_data_encryption) { + ASSIGN_OR_RETURN(auto pair, KeyCache::instance().create_encryption_meta_pair_using_current_kek()); + wopts.encryption_info = pair.info; + _writer_options.encryption_meta = std::move(pair.encryption_meta); + } + ASSIGN_OR_RETURN(auto wfile, _fs->new_writable_file(wopts, path)); const auto schema = _context.tablet_schema; auto segment_writer = std::make_unique(std::move(wfile), _num_uptfile, schema, _writer_options); RETURN_IF_ERROR(segment_writer->init()); @@ -58,6 +65,8 @@ Status HorizontalUpdateRowsetWriter::add_chunk(const Chunk& chunk) { RETURN_IF_ERROR(_update_file_writer->finalize(&segment_size, &index_size, &footer_position)); { std::lock_guard l(_lock); + DCHECK(_updatefile_encryption_metas.size() == _num_uptfile); + _updatefile_encryption_metas.emplace_back(_writer_options.encryption_meta); _num_uptfile++; } ASSIGN_OR_RETURN(_update_file_writer, _create_update_file_writer()); @@ -87,11 +96,14 @@ Status HorizontalUpdateRowsetWriter::flush_chunk(const Chunk& chunk, SegmentPB* seg_info->set_update_id(_num_uptfile); seg_info->set_update_data_size(segment_size); seg_info->set_update_path((*segment_writer)->segment_path()); + seg_info->set_update_encryption_meta((*segment_writer)->encryption_meta()); seg_info->set_update_row_size(static_cast(chunk.bytes_usage())); } { std::lock_guard l(_lock); _num_rows_upt += chunk.num_rows(); + DCHECK(_updatefile_encryption_metas.size() == _num_uptfile); + _updatefile_encryption_metas.emplace_back(_writer_options.encryption_meta); _num_uptfile++; _total_update_row_size += static_cast(chunk.bytes_usage()); } diff --git a/be/src/storage/rowset/rowset.cpp b/be/src/storage/rowset/rowset.cpp index 8d39783e1bd6e..85d62b83027f7 100644 --- a/be/src/storage/rowset/rowset.cpp +++ b/be/src/storage/rowset/rowset.cpp @@ -167,24 +167,33 @@ Status Rowset::init() { return Status::OK(); } +StatusOr> Rowset::_load_segment(int32_t idx, const TabletSchemaCSPtr& schema, + std::shared_ptr& fs, + const FooterPointerPB* partial_rowset_footer) { + size_t footer_size_hint = 16 * 1024; + std::string seg_path = segment_file_path(_rowset_path, rowset_id(), idx); + FileInfo seg_info{.path = seg_path, .encryption_meta = rowset_meta()->get_segment_encryption_meta(idx)}; + auto res = Segment::open(fs, seg_info, idx, schema, &footer_size_hint, partial_rowset_footer); + if (!res.ok()) { + auto st = res.status().clone_and_prepend(fmt::format( + "Load segment failed tablet:{} rowset:{} rssid:{} seg:{} path:{}", _rowset_meta->tablet_id(), + rowset_id().to_string(), _rowset_meta->get_rowset_seg_id(), idx, seg_path)); + LOG(WARNING) << st.message(); + return st; + } + return res; +} + // use partial_rowset_footer to indicate the segment footer position and size // if partial_rowset_footer is nullptr, the segment_footer is at the end of the segment_file Status Rowset::do_load() { ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(_rowset_path)); _segments.clear(); - size_t footer_size_hint = 16 * 1024; for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { - std::string seg_path = segment_file_path(_rowset_path, rowset_id(), seg_id); - FileInfo seg_info{seg_path}; - auto res = Segment::open(fs, seg_info, seg_id, _schema, &footer_size_hint, - rowset_meta()->partial_rowset_footer(seg_id)); + auto res = _load_segment(seg_id, _schema, fs, rowset_meta()->partial_rowset_footer(seg_id)); if (!res.ok()) { - auto st = res.status().clone_and_prepend(fmt::format( - "Load rowset failed tablet:{} rowset:{} rssid:{} seg:{} path:{}", _rowset_meta->tablet_id(), - rowset_id().to_string(), _rowset_meta->get_rowset_seg_id(), seg_id, seg_path)); - LOG(WARNING) << st.message(); _segments.clear(); - return st; + return res.status(); } _segments.push_back(std::move(res).value()); } @@ -213,13 +222,9 @@ void Rowset::warmup_lrucache() { Status Rowset::reload() { ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(_rowset_path)); _segments.clear(); - size_t footer_size_hint = 16 * 1024; for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { - std::string seg_path = segment_file_path(_rowset_path, rowset_id(), seg_id); - FileInfo seg_info{.path = seg_path}; - auto res = Segment::open(fs, seg_info, seg_id, _schema, &footer_size_hint); + auto res = _load_segment(seg_id, _schema, fs, nullptr); if (!res.ok()) { - LOG(WARNING) << "Fail to open " << seg_path << ": " << res.status(); _segments.clear(); return res.status(); } @@ -235,12 +240,8 @@ Status Rowset::reload_segment(int32_t segment_id) { return Status::InternalError("Error segment id"); } ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(_rowset_path)); - size_t footer_size_hint = 16 * 1024; - std::string seg_path = segment_file_path(_rowset_path, rowset_id(), segment_id); - FileInfo seg_info{.path = seg_path}; - auto res = Segment::open(fs, seg_info, segment_id, _schema, &footer_size_hint); + auto res = _load_segment(segment_id, _schema, fs, nullptr); if (!res.ok()) { - LOG(WARNING) << "Fail to open " << seg_path << ": " << res.status(); return res.status(); } _segments[segment_id] = std::move(res).value(); @@ -254,12 +255,9 @@ Status Rowset::reload_segment_with_schema(int32_t segment_id, TabletSchemaCSPtr& return Status::InternalError("Error segment id"); } ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(_rowset_path)); - size_t footer_size_hint = 16 * 1024; - std::string seg_path = segment_file_path(_rowset_path, rowset_id(), segment_id); - FileInfo seg_info{.path = seg_path}; - auto res = Segment::open(fs, seg_info, segment_id, schema, &footer_size_hint); + auto res = _load_segment(segment_id, schema, fs, nullptr); if (!res.ok()) { - LOG(WARNING) << "Fail to open " << seg_path << ": " << res.status(); + _segments.clear(); return res.status(); } _segments[segment_id] = std::move(res).value(); @@ -872,7 +870,7 @@ StatusOr> Rowset::get_update_file_iterators(const for (int64_t i = 0; i < num_update_files(); i++) { // open update file std::string seg_path = segment_upt_file_path(_rowset_path, rowset_id(), i); - FileInfo seg_info{.path = seg_path}; + FileInfo seg_info{.path = seg_path, .encryption_meta = rowset_meta()->get_uptfile_encryption_meta(i)}; ASSIGN_OR_RETURN(auto seg_ptr, Segment::open(seg_options.fs, seg_info, i, _schema)); if (seg_ptr->num_rows() == 0) { seg_iterators[i] = new_empty_iterator(schema, config::vector_chunk_size); @@ -904,7 +902,7 @@ StatusOr Rowset::get_update_file_iterator(const Schema& schema // open update file DCHECK(update_file_id < num_update_files()); std::string seg_path = segment_upt_file_path(_rowset_path, rowset_id(), update_file_id); - FileInfo seg_info{.path = seg_path}; + FileInfo seg_info{.path = seg_path, .encryption_meta = rowset_meta()->get_uptfile_encryption_meta(update_file_id)}; ASSIGN_OR_RETURN(auto seg_ptr, Segment::open(seg_options.fs, seg_info, update_file_id, _schema)); if (seg_ptr->num_rows() == 0) { return new_empty_iterator(schema, config::vector_chunk_size); diff --git a/be/src/storage/rowset/rowset.h b/be/src/storage/rowset/rowset.h index d50ecf120c9d0..65b2c768ad1d4 100644 --- a/be/src/storage/rowset/rowset.h +++ b/be/src/storage/rowset/rowset.h @@ -417,6 +417,10 @@ class Rowset : public std::enable_shared_from_this, public BaseRowset { Status _copy_delta_column_group_files(KVStore* kvstore, const std::string& dir, int64_t version); + StatusOr> _load_segment(int32_t idx, const TabletSchemaCSPtr& schema, + std::shared_ptr& fs, + const FooterPointerPB* partial_rowset_footer); + std::vector _segments; std::atomic is_compacting{false}; diff --git a/be/src/storage/rowset/rowset_meta.cpp b/be/src/storage/rowset/rowset_meta.cpp index 165d7605b4cea..d521b47334243 100644 --- a/be/src/storage/rowset/rowset_meta.cpp +++ b/be/src/storage/rowset/rowset_meta.cpp @@ -46,4 +46,24 @@ RowsetMeta::~RowsetMeta() { MEM_TRACKER_SAFE_RELEASE(GlobalEnv::GetInstance()->rowset_metadata_mem_tracker(), _mem_usage); } +static string empty_encryption_meta; + +const string& RowsetMeta::get_segment_encryption_meta(int segment_id) const { + const auto size = _rowset_meta_pb->segment_encryption_metas_size(); + DCHECK(segment_id >= 0 && (segment_id < size || size == 0)); + return segment_id < size ? _rowset_meta_pb->segment_encryption_metas(segment_id) : empty_encryption_meta; +} + +const string& RowsetMeta::get_uptfile_encryption_meta(int upt_file_id) const { + const auto size = _rowset_meta_pb->updatefile_encryption_metas_size(); + DCHECK(upt_file_id >= 0 && (upt_file_id < size || size == 0)); + return upt_file_id < size ? _rowset_meta_pb->updatefile_encryption_metas(upt_file_id) : empty_encryption_meta; +} + +const string& RowsetMeta::get_delfile_encryption_meta(int del_file_id) const { + const auto size = _rowset_meta_pb->delfile_encryption_metas_size(); + DCHECK(del_file_id >= 0 && (del_file_id < size || size == 0)); + return del_file_id < size ? _rowset_meta_pb->delfile_encryption_metas(del_file_id) : empty_encryption_meta; +} + } // namespace starrocks diff --git a/be/src/storage/rowset/rowset_meta.h b/be/src/storage/rowset/rowset_meta.h index b5a16c610dd73..41243f15a576e 100644 --- a/be/src/storage/rowset/rowset_meta.h +++ b/be/src/storage/rowset/rowset_meta.h @@ -289,6 +289,10 @@ class RowsetMeta { return false; } + const string& get_segment_encryption_meta(int segment_id) const; + const string& get_uptfile_encryption_meta(int upt_file_id) const; + const string& get_delfile_encryption_meta(int del_file_id) const; + private: bool _deserialize_from_pb(std::string_view value) { return _rowset_meta_pb->ParseFromArray(value.data(), value.size()); diff --git a/be/src/storage/rowset/rowset_writer.cpp b/be/src/storage/rowset/rowset_writer.cpp index ba7a4b56a11c3..63905fe1e467f 100644 --- a/be/src/storage/rowset/rowset_writer.cpp +++ b/be/src/storage/rowset/rowset_writer.cpp @@ -46,6 +46,7 @@ #include "common/logging.h" #include "common/tracer.h" #include "fs/fs.h" +#include "fs/key_cache.h" #include "io/io_error.h" #include "runtime/exec_env.h" #include "segment_options.h" @@ -154,6 +155,13 @@ StatusOr RowsetWriter::build() { _rowset_meta_pb->set_empty(_num_rows_written == 0); _rowset_meta_pb->set_creation_time(time(nullptr)); _rowset_meta_pb->set_num_segments(_num_segment); + DCHECK(_segment_encryption_metas.size() == _num_segment); + RETURN_IF_UNLIKELY(_segment_encryption_metas.size() != _num_segment, + Status::InternalError(fmt::format("encryption_metas size {} != num segments {}", + _segment_encryption_metas.size(), _num_segment))); + for (auto& encryption_meta : _segment_encryption_metas) { + _rowset_meta_pb->add_segment_encryption_metas(encryption_meta); + } // newly created rowset do not have rowset_id yet, use 0 instead _rowset_meta_pb->set_rowset_seg_id(0); _rowset_meta_pb->set_gtid(_context.gtid); @@ -163,7 +171,15 @@ StatusOr RowsetWriter::build() { if (!_delfile_idxes.empty()) { _rowset_meta_pb->mutable_delfile_idxes()->Add(_delfile_idxes.begin(), _delfile_idxes.end()); } + DCHECK(_delfile_encryption_metas.size() == _num_delfile); + for (auto& encryption_meta : _delfile_encryption_metas) { + _rowset_meta_pb->add_delfile_encryption_metas(encryption_meta); + } _rowset_meta_pb->set_num_delete_files(_num_delfile); + DCHECK(_updatefile_encryption_metas.size() == _num_uptfile); + for (auto& encryption_meta : _updatefile_encryption_metas) { + _rowset_meta_pb->add_updatefile_encryption_metas(encryption_meta); + } _rowset_meta_pb->set_num_update_files(_num_uptfile); _rowset_meta_pb->set_total_update_row_size(_total_update_row_size); _rowset_meta_pb->set_num_rows_upt(_num_rows_upt); @@ -294,6 +310,11 @@ Status RowsetWriter::_flush_segment(const SegmentPB& segment_pb, butil::IOBuf& d _total_index_size += segment_pb.index_size(); _num_rows_written += segment_pb.num_rows(); _total_row_size += segment_pb.row_size(); + DCHECK(_segment_encryption_metas.size() == _num_segment); + RETURN_IF_UNLIKELY(_segment_encryption_metas.size() != _num_segment, + Status::InternalError(fmt::format("encryption_metas size {} != num segments {}", + _segment_encryption_metas.size(), _num_segment))); + _segment_encryption_metas.emplace_back(segment_pb.encryption_meta()); _num_segment++; } @@ -391,6 +412,8 @@ Status RowsetWriter::_flush_delete_file(const SegmentPB& segment_pb, butil::IOBu // _delfile_idxes keep the idx for every delete file, so we need to add the idx to _delfile_idxes if we create a // new delete file _delfile_idxes.emplace_back(_num_segment + _num_delfile); + DCHECK(_delfile_encryption_metas.size() == _num_delfile); + _delfile_encryption_metas.emplace_back(segment_pb.delete_encryption_meta()); _num_delfile++; _num_rows_del += segment_pb.delete_num_rows(); @@ -434,6 +457,8 @@ Status RowsetWriter::_flush_update_file(const SegmentPB& segment_pb, butil::IOBu // 3. update statistic { std::lock_guard l(_lock); + DCHECK(_updatefile_encryption_metas.size() == _num_uptfile); + _updatefile_encryption_metas.emplace_back(segment_pb.update_encryption_meta()); _num_uptfile++; _num_rows_upt += segment_pb.update_num_rows(); _total_update_row_size += segment_pb.update_row_size(); @@ -545,10 +570,21 @@ StatusOr> HorizontalRowsetWriter::_create_segment // temporary segment files. path = Rowset::segment_file_path(_context.rowset_path_prefix, _context.rowset_id, _num_segment); } - ASSIGN_OR_RETURN(auto wfile, _fs->new_writable_file(path)); + WritableFileOptions wopts; + if (config::enable_transparent_data_encryption) { + ASSIGN_OR_RETURN(auto pair, KeyCache::instance().create_encryption_meta_pair_using_current_kek()); + wopts.encryption_info = pair.info; + _writer_options.encryption_meta = std::move(pair.encryption_meta); + } + ASSIGN_OR_RETURN(auto wfile, _fs->new_writable_file(wopts, path)); const auto schema = _context.tablet_schema; auto segment_writer = std::make_unique(std::move(wfile), _num_segment, schema, _writer_options); RETURN_IF_ERROR(segment_writer->init()); + DCHECK(_segment_encryption_metas.size() == _num_segment); + RETURN_IF_UNLIKELY(_segment_encryption_metas.size() != _num_segment, + Status::InternalError(fmt::format("encryption_metas size {} != num segments {}", + _segment_encryption_metas.size(), _num_segment))); + _segment_encryption_metas.emplace_back(_writer_options.encryption_meta); ++_num_segment; return std::move(segment_writer); } @@ -641,6 +677,13 @@ Status HorizontalRowsetWriter::_flush_chunk(const Chunk& chunk, SegmentPB* seg_i Status HorizontalRowsetWriter::flush_chunk_with_deletes(const Chunk& upserts, const Column& deletes, SegmentPB* seg_info) { auto flush_del_file = [&](const Column& deletes, SegmentPB* seg_info) { + WritableFileOptions wopts; + string encryption_meta; + if (config::enable_transparent_data_encryption) { + ASSIGN_OR_RETURN(auto pair, KeyCache::instance().create_encryption_meta_pair_using_current_kek()); + wopts.encryption_info = pair.info; + encryption_meta = std::move(pair.encryption_meta); + } ASSIGN_OR_RETURN(auto wfile, _fs->new_writable_file(Rowset::segment_del_file_path( _context.rowset_path_prefix, _context.rowset_id, _num_delfile))); size_t sz = serde::ColumnArraySerde::max_serialized_size(deletes); @@ -658,10 +701,13 @@ Status HorizontalRowsetWriter::flush_chunk_with_deletes(const Chunk& upserts, co seg_info->set_delete_id(_num_delfile); seg_info->set_delete_data_size(content.size()); seg_info->set_delete_path(wfile->filename()); + seg_info->set_delete_encryption_meta(encryption_meta); } // _delfile_idxes keep the idx for every delete file, so we need to add the idx to _delfile_idxes if we create a // new delete file _delfile_idxes.emplace_back(_num_segment + _num_delfile); + DCHECK(_delfile_encryption_metas.size() == _num_delfile); + _delfile_encryption_metas.emplace_back(encryption_meta); _num_delfile++; _num_rows_del += deletes.size(); return Status::OK(); @@ -715,6 +761,25 @@ Status HorizontalRowsetWriter::add_rowset(RowsetSharedPtr rowset) { _total_row_size += static_cast(rowset->total_row_size()); _total_data_size += static_cast(rowset->rowset_meta()->data_disk_size()); _total_index_size += static_cast(rowset->rowset_meta()->index_disk_size()); + DCHECK(_segment_encryption_metas.size() == _num_segment); + RETURN_IF_UNLIKELY(_segment_encryption_metas.size() != _num_segment, + Status::InternalError(fmt::format("encryption_metas size {} != num segments {}", + _segment_encryption_metas.size(), _num_segment))); + auto& meta_pb = rowset->rowset_meta()->get_meta_pb_without_schema(); + if (meta_pb.segment_encryption_metas_size() == 0) { + for (int i = 0; i < rowset->num_segments(); ++i) { + _segment_encryption_metas.emplace_back(string()); + } + } else { + DCHECK_EQ(meta_pb.segment_encryption_metas_size(), rowset->num_segments()); + RETURN_IF_UNLIKELY( + meta_pb.segment_encryption_metas_size() != rowset->num_segments(), + Status::InternalError(fmt::format("encryption_metas size {} != num segments {}", + meta_pb.segment_encryption_metas_size(), rowset->num_segments()))); + for (int i = 0; i < rowset->num_segments(); ++i) { + _segment_encryption_metas.emplace_back(meta_pb.segment_encryption_metas(i)); + } + } _num_segment += static_cast(rowset->num_segments()); // TODO update zonemap if (rowset->rowset_meta()->has_delete_predicate()) { @@ -773,7 +838,7 @@ Status HorizontalRowsetWriter::_final_merge() { } std::string tmp_segment_file = Rowset::segment_temp_file_path(_context.rowset_path_prefix, _context.rowset_id, seg_id); - FileInfo tmp_segment_info{.path = tmp_segment_file}; + FileInfo tmp_segment_info{.path = tmp_segment_file, .encryption_meta = _segment_encryption_metas[seg_id]}; auto segment_ptr = Segment::open(_fs, tmp_segment_info, seg_id, _context.tablet_schema); if (!segment_ptr.ok()) { LOG(WARNING) << "Fail to open " << tmp_segment_file << ": " << segment_ptr.status(); @@ -848,6 +913,8 @@ Status HorizontalRowsetWriter::_final_merge() { auto chunk_shared_ptr = ChunkHelper::new_chunk(schema, config::vector_chunk_size); auto chunk = chunk_shared_ptr.get(); + _segment_encryption_metas.clear(); + _delfile_encryption_metas.clear(); _num_segment = 0; _num_delfile = 0; _num_rows_written = 0; @@ -1017,6 +1084,8 @@ Status HorizontalRowsetWriter::_final_merge() { auto chunk_shared_ptr = ChunkHelper::new_chunk(schema, config::vector_chunk_size); auto chunk = chunk_shared_ptr.get(); + _segment_encryption_metas.clear(); + _delfile_encryption_metas.clear(); _num_segment = 0; _num_delfile = 0; _num_rows_written = 0; @@ -1125,6 +1194,7 @@ Status HorizontalRowsetWriter::_flush_segment_writer(std::unique_ptrset_index_size(index_size); seg_info->set_segment_id((*segment_writer)->segment_id()); seg_info->set_path((*segment_writer)->segment_path()); + seg_info->set_encryption_meta((*segment_writer)->encryption_meta()); if (_context.tablet_schema && !_context.tablet_schema->indexes()->empty()) { auto mutable_indexes = seg_info->mutable_seg_indexes(); for (const auto& index : *(_context.tablet_schema->indexes())) { @@ -1311,11 +1381,23 @@ Status VerticalRowsetWriter::final_flush() { StatusOr> VerticalRowsetWriter::_create_segment_writer( const std::vector& column_indexes, bool is_key) { std::lock_guard l(_lock); - ASSIGN_OR_RETURN(auto wfile, _fs->new_writable_file(Rowset::segment_file_path(_context.rowset_path_prefix, - _context.rowset_id, _num_segment))); + WritableFileOptions wopts; + if (config::enable_transparent_data_encryption) { + ASSIGN_OR_RETURN(auto pair, KeyCache::instance().create_encryption_meta_pair_using_current_kek()); + wopts.encryption_info = pair.info; + _writer_options.encryption_meta = std::move(pair.encryption_meta); + } + ASSIGN_OR_RETURN(auto wfile, + _fs->new_writable_file(wopts, Rowset::segment_file_path(_context.rowset_path_prefix, + _context.rowset_id, _num_segment))); const auto schema = _context.tablet_schema; auto segment_writer = std::make_unique(std::move(wfile), _num_segment, schema, _writer_options); RETURN_IF_ERROR(segment_writer->init(column_indexes, is_key)); + DCHECK(_segment_encryption_metas.size() == _num_segment); + RETURN_IF_UNLIKELY(_segment_encryption_metas.size() != _num_segment, + Status::InternalError(fmt::format("encryption_metas size {} != num segments {}", + _segment_encryption_metas.size(), _num_segment))); + _segment_encryption_metas.emplace_back(_writer_options.encryption_meta); ++_num_segment; return std::move(segment_writer); } diff --git a/be/src/storage/rowset/rowset_writer.h b/be/src/storage/rowset/rowset_writer.h index 232d86afafbfa..f23c0c4d154b7 100644 --- a/be/src/storage/rowset/rowset_writer.h +++ b/be/src/storage/rowset/rowset_writer.h @@ -192,6 +192,9 @@ class RowsetWriter { int _num_indexfile = 0; vector _delfile_idxes; vector _tmp_segment_files; + std::vector _segment_encryption_metas; + std::vector _delfile_encryption_metas; + std::vector _updatefile_encryption_metas; // mutex lock for vectorized add chunk and flush std::mutex _lock; diff --git a/be/src/storage/rowset_update_state.cpp b/be/src/storage/rowset_update_state.cpp index 606388eb7eb53..347275713824f 100644 --- a/be/src/storage/rowset_update_state.cpp +++ b/be/src/storage/rowset_update_state.cpp @@ -17,6 +17,7 @@ #include "column/binary_column.h" #include "common/tracer.h" #include "fs/fs_util.h" +#include "fs/key_cache.h" #include "gutil/strings/substitute.h" #include "serde/column_array_serde.h" #include "storage/chunk_helper.h" @@ -73,7 +74,12 @@ Status RowsetUpdateState::_load_deletes(Rowset* rowset, uint32_t idx, Column* pk ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(rowset->rowset_path())); auto path = Rowset::segment_del_file_path(rowset->rowset_path(), rowset->rowset_id(), idx); - ASSIGN_OR_RETURN(auto read_file, fs->new_random_access_file(path)); + RandomAccessFileOptions opts; + auto& encryption_meta = rowset->rowset_meta()->get_delfile_encryption_meta(idx); + if (!encryption_meta.empty()) { + ASSIGN_OR_RETURN(opts.encryption_info, KeyCache::instance().unwrap_encryption_meta(encryption_meta)); + } + ASSIGN_OR_RETURN(auto read_file, fs->new_random_access_file(opts, path)); ASSIGN_OR_RETURN(auto file_size, read_file->get_size()); std::vector read_buffer; TRY_CATCH_BAD_ALLOC(read_buffer.resize(file_size)); diff --git a/be/test/storage/task/engine_storage_migration_task_test.cpp b/be/test/storage/task/engine_storage_migration_task_test.cpp index 78ff38624ac45..cdd82e3186d24 100644 --- a/be/test/storage/task/engine_storage_migration_task_test.cpp +++ b/be/test/storage/task/engine_storage_migration_task_test.cpp @@ -20,6 +20,7 @@ #include "common/config.h" #include "exec/pipeline/query_context.h" #include "fs/fs_util.h" +#include "fs/key_cache.h" #include "runtime/current_thread.h" #include "runtime/descriptor_helper.h" #include "runtime/exec_env.h" @@ -128,6 +129,21 @@ class EngineStorageMigrationTaskTest : public testing::Test { static void init() { config::enable_event_based_compaction_framework = false; + config::enable_transparent_data_encryption = true; + // add encryption keys + EncryptionKeyPB pb; + pb.set_id(EncryptionKey::DEFAULT_MASTER_KYE_ID); + pb.set_type(EncryptionKeyTypePB::NORMAL_KEY); + pb.set_algorithm(EncryptionAlgorithmPB::AES_128); + pb.set_plain_key("0000000000000000"); + std::unique_ptr root_encryption_key = EncryptionKey::create_from_pb(pb).value(); + auto val_st = root_encryption_key->generate_key(); + EXPECT_TRUE(val_st.ok()); + std::unique_ptr encryption_key = std::move(val_st.value()); + encryption_key->set_id(2); + KeyCache::instance().add_key(root_encryption_key); + KeyCache::instance().add_key(encryption_key); + /* create duplicated key tablet */ diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto index 414031fcf4aff..1254b7ac54472 100644 --- a/gensrc/proto/data.proto +++ b/gensrc/proto/data.proto @@ -127,7 +127,10 @@ message SegmentPB { optional int64 update_num_rows = 16; optional string update_path = 17; optional int64 update_row_size = 18; + optional bytes encryption_meta = 19; + optional bytes delete_encryption_meta = 20; + optional bytes update_encryption_meta = 21; // for index optional int64 seg_index_data_size = 30; diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index f76d68e8edd0a..93863b5630a9b 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -179,6 +179,9 @@ message RowsetMetaPB { optional int64 gtid = 62; // total number of upt file's rows. optional int64 num_rows_upt = 63; + repeated bytes segment_encryption_metas = 64; + repeated bytes delfile_encryption_metas = 65; + repeated bytes updatefile_encryption_metas = 66; } enum DataFileType {