Skip to content

Commit

Permalink
[#24466] DocDB: Add UserFrontiers support to vector lsm
Browse files Browse the repository at this point in the history
Summary:
It could happen that Vector LSM was not flushed before TServer restart/crash.
In this case during bootstrap we should add understand what operations should be reapplied to it.
To address this issue we have UserFrontiers in rocksdb.
Could use the same approach for Vector LSM.

Also added ability to flush Vector LSM when necessary.

One important TODO added:
It could happen that older chunk not yet saved. So we should delay adding this file to metadata, until older chunk is saved.
Jira: DB-13376

Test Plan: *VectorLSMTest.Bootstrap*

Reviewers: mbautin, aleksandr.ponomarenko

Reviewed By: mbautin

Subscribers: ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D39065
  • Loading branch information
spolitov committed Oct 16, 2024
1 parent 509ddb3 commit 28df09d
Show file tree
Hide file tree
Showing 14 changed files with 412 additions and 122 deletions.
18 changes: 15 additions & 3 deletions src/yb/docdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ set(YB_PCH_PREFIX docdb)

YRPC_GENERATE(
DOCDB_PROTO_SRCS DOCDB_PROTO_HDRS DOCDB_PROTO_TGTS
MESSAGES TRUE
SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
NO_SERVICE_PROTO_FILES docdb.proto vector_lsm.proto)
NO_SERVICE_MESSAGES_PROTO_FILES docdb.proto
NO_SERVICE_PROTO_FILES vector_lsm.proto)

ADD_YB_LIBRARY(docdb_proto
SRCS ${DOCDB_PROTO_SRCS}
Expand Down Expand Up @@ -130,7 +130,19 @@ target_link_libraries(yb_docdb_test_common
yb_test_util
yb_dockv_test_util)

set(YB_TEST_LINK_LIBS yb_common_test_util yb_docdb_test_common ${YB_MIN_TEST_LIBS})
YRPC_GENERATE(
DOCDB_TEST_PROTO_SRCS DOCDB_TEST_PROTO_HDRS DOCDB_TEST_PROTO_TGTS
MESSAGES TRUE
SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
NO_SERVICE_PROTO_FILES vector_lsm-test.proto)

ADD_YB_LIBRARY(docdb_test_proto
SRCS ${DOCDB_TEST_PROTO_SRCS}
DEPS protobuf yb_common_proto
NONLINK_DEPS ${DOCDB_TEST_PROTO_TGTS})

set(YB_TEST_LINK_LIBS yb_common_test_util yb_docdb_test_common docdb_test_proto ${YB_MIN_TEST_LIBS})

ADD_YB_TEST(doc_operation-test)
ADD_YB_TEST(docdb_filter_policy-test)
Expand Down
199 changes: 170 additions & 29 deletions src/yb/docdb/vector_lsm-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@

#include "yb/docdb/docdb_test_base.h"

#include <google/protobuf/any.pb.h>

#include "yb/docdb/docdb_rocksdb_util.h"
#include "yb/docdb/vector_lsm.h"
#include "yb/docdb/vector_lsm-test.pb.h"

#include "yb/dockv/doc_key.h"

#include "yb/rocksdb/metadata.h"

#include "yb/rpc/thread_pool.h"

#include "yb/util/path_util.h"
Expand All @@ -31,8 +36,10 @@ using namespace std::literals;
namespace yb::docdb {

using FloatVectorLSM = VectorLSM<std::vector<float>, float>;
using UsearchIndexFactory = MakeChunkFactory<vectorindex::UsearchIndexFactory, FloatVectorLSM>;
using HnswlibIndexFactory = MakeChunkFactory<vectorindex::HnswlibIndexFactory, FloatVectorLSM>;
using UsearchIndexFactory = MakeVectorIndexFactory<
vectorindex::UsearchIndexFactory, FloatVectorLSM>;
using HnswlibIndexFactory = MakeVectorIndexFactory<
vectorindex::HnswlibIndexFactory, FloatVectorLSM>;

class VectorLSMKeyValueStorageRocksDbWrapper : public VectorLSMKeyValueStorage {
public:
Expand Down Expand Up @@ -96,6 +103,82 @@ class VectorLSMKeyValueStorageRocksDbWrapper : public VectorLSMKeyValueStorage {
const ColumnId column_id_;
};

class TestFrontier : public rocksdb::UserFrontier {
public:
std::unique_ptr<UserFrontier> Clone() const override {
return std::make_unique<TestFrontier>(*this);
}

std::string ToString() const override {
return YB_CLASS_TO_STRING(vertex_id);
}

void ToPB(google::protobuf::Any* any) const override {
VectorLSMTestFrontierPB pb;
pb.set_vertex_id(vertex_id_);
any->PackFrom(pb);
}

bool Equals(const UserFrontier& pre_rhs) const override {
const auto& lhs = *this;
const auto& rhs = down_cast<const TestFrontier&>(pre_rhs);
return YB_CLASS_EQUALS(vertex_id);
}

void Update(const UserFrontier& pre_rhs, rocksdb::UpdateUserValueType update_type) override {
const auto& rhs = down_cast<const TestFrontier&>(pre_rhs);
switch (update_type) {
case rocksdb::UpdateUserValueType::kLargest:
vertex_id_ = std::max(vertex_id_, rhs.vertex_id_);
return;
case rocksdb::UpdateUserValueType::kSmallest:
vertex_id_ = std::min(vertex_id_, rhs.vertex_id_);
return;
}
FATAL_INVALID_ENUM_VALUE(rocksdb::UpdateUserValueType, update_type);
}

bool IsUpdateValid(const UserFrontier& rhs, rocksdb::UpdateUserValueType type) const override {
return true;
}

Slice FilterAsSlice() override {
return Slice();
}

void ResetFilter() override {
}

void FromOpIdPBDeprecated(const OpIdPB& op_id) override {
}

Status FromPB(const google::protobuf::Any& any) override {
VectorLSMTestFrontierPB pb;
if (!any.UnpackTo(&pb)) {
return STATUS_FORMAT(Corruption, "Unpack test frontier failed");
}
vertex_id_ = pb.vertex_id();
return Status::OK();
}

uint64_t GetHybridTimeAsUInt64() const override {
return 0;
}

vectorindex::VertexId vertex_id() const {
return vertex_id_;
}

void SetVertexId(vectorindex::VertexId vertex_id) {
vertex_id_ = vertex_id;
}

private:
vectorindex::VertexId vertex_id_;
};

using TestFrontiers = rocksdb::UserFrontiersBase<TestFrontier>;

class VectorLSMTest : public DocDBTestBase,
public testing::WithParamInterface<vectorindex::ANNMethodKind> {
protected:
Expand All @@ -121,10 +204,17 @@ class VectorLSMTest : public DocDBTestBase,

Status InsertCube(
FloatVectorLSM& lsm, size_t dimensions,
size_t block_size = std::numeric_limits<size_t>::max());
size_t block_size = std::numeric_limits<size_t>::max(),
vectorindex::VertexId min_vertex_id = 0);

void VerifyVectorLSM(FloatVectorLSM& lsm, size_t dimensions);

void CheckQueryVector(
FloatVectorLSM& lsm, size_t dimensions, const FloatVectorLSM::Vector& query_vector,
size_t max_num_results);

void TestBootstrap(bool flush);

rpc::ThreadPool thread_pool_;
std::optional<VectorLSMKeyValueStorageRocksDbWrapper> key_value_storage_;
};
Expand All @@ -133,7 +223,7 @@ std::string VertexKey(vectorindex::VertexId vertex_id) {
return Format("vertex_$0", vertex_id);
}

auto ChunkFactory(vectorindex::ANNMethodKind ann_method) {
auto VectorIndexFactory(vectorindex::ANNMethodKind ann_method) {
switch (ann_method) {
case vectorindex::ANNMethodKind::kUsearch:
return UsearchIndexFactory::Create;
Expand All @@ -143,34 +233,52 @@ auto ChunkFactory(vectorindex::ANNMethodKind ann_method) {
return decltype(&UsearchIndexFactory::Create)(nullptr);
}

Status VectorLSMTest::InsertCube(FloatVectorLSM& lsm, size_t dimensions, size_t block_size) {
HybridTime write_time(1000, 0);
FloatVectorLSM::InsertEntries entries;
FloatVectorLSM::InsertEntries CubeInsertEntries(size_t dimensions) {
FloatVectorLSM::InsertEntries result;
for (vectorindex::VertexId i = 1; i <= (1ULL << dimensions); ++i) {
if (entries.size() >= block_size) {
RETURN_NOT_OK(lsm.Insert(entries, write_time));
entries.clear();
}

auto bits = i - 1;
FloatVector vector(dimensions);
for (size_t d = 0; d != dimensions; ++d) {
vector[d] = 1.f * ((bits >> d) & 1);
}
entries.emplace_back(FloatVectorLSM::InsertEntry {
result.emplace_back(FloatVectorLSM::InsertEntry {
.vertex_id = i,
.base_table_key = KeyBuffer(Slice(VertexKey(i))),
.vector = std::move(vector),
});
}
return lsm.Insert(entries, write_time);
return result;
}

Status VectorLSMTest::InsertCube(
FloatVectorLSM& lsm, size_t dimensions, size_t block_size,
vectorindex::VertexId min_vertex_id) {
HybridTime write_time(1000, 0);
auto entries = CubeInsertEntries(dimensions);
for (size_t i = 0; i < entries.size(); i += block_size) {
auto begin = entries.begin() + i;
auto end = entries.begin() + std::min(i + block_size, entries.size());
if (begin->vertex_id < min_vertex_id) {
ptrdiff_t delta = min_vertex_id - begin->vertex_id;
if (delta >= end - begin) {
continue;
}
begin += delta;
}
FloatVectorLSM::InsertEntries block_entries(begin, end);
TestFrontiers frontiers;
frontiers.Smallest().SetVertexId(block_entries.front().vertex_id);
frontiers.Largest().SetVertexId(block_entries.front().vertex_id);
RETURN_NOT_OK(lsm.Insert(block_entries, write_time, frontiers));
}
return Status::OK();
}

Status VectorLSMTest::OpenVectorLSM(
FloatVectorLSM& lsm, size_t dimensions, size_t points_per_chunk) {
FloatVectorLSM::Options options = {
.storage_dir = JoinPathSegments(rocksdb_dir_, "vector_lsm"),
.chunk_factory = [factory = ChunkFactory(GetParam()), dimensions]() {
.vector_index_factory = [factory = VectorIndexFactory(GetParam()), dimensions]() {
vectorindex::HNSWOptions hnsw_options = {
.dimensions = dimensions,
};
Expand All @@ -179,6 +287,7 @@ Status VectorLSMTest::OpenVectorLSM(
.points_per_chunk = points_per_chunk,
.key_value_storage = &*key_value_storage_,
.thread_pool = &thread_pool_,
.frontiers_factory = [] { return std::make_unique<TestFrontiers>(); },
};
return lsm.Open(std::move(options));
}
Expand All @@ -190,29 +299,46 @@ Status VectorLSMTest::InitVectorLSM(
}

void VectorLSMTest::VerifyVectorLSM(FloatVectorLSM& lsm, size_t dimensions) {
CheckQueryVector(lsm, dimensions, FloatVectorLSM::Vector(dimensions, 0.f), dimensions + 1);
CheckQueryVector(lsm, dimensions, FloatVectorLSM::Vector(dimensions, 1.f), dimensions + 1);
}

void VectorLSMTest::CheckQueryVector(
FloatVectorLSM& lsm, size_t dimensions, const FloatVectorLSM::Vector& query_vector,
size_t max_num_results) {
bool stop = false;
FloatVectorLSM::Vector query_vector(dimensions, 0.f);

FloatVectorLSM::SearchResults expected_results;
for (const auto& entry : CubeInsertEntries(dimensions)) {
expected_results.push_back({
.distance = lsm.TEST_Distance(query_vector, entry.vector),
.base_table_key = entry.base_table_key,
});
}
auto less_condition = [](const auto& lhs, const auto& rhs) {
return lhs.distance == rhs.distance ? lhs.base_table_key < rhs.base_table_key
: lhs.distance < rhs.distance;
};
std::sort(expected_results.begin(), expected_results.end(), less_condition);

expected_results.resize(std::min(expected_results.size(), max_num_results));

while (!stop) {
stop = !lsm.TEST_HasBackgroundInserts();

FloatVectorLSM::SearchOptions options = {
.max_num_results = dimensions + 1,
.max_num_results = max_num_results,
};
auto search_result = ASSERT_RESULT(lsm.Search(query_vector, options));
LOG(INFO) << "Search result: " << AsString(search_result);

ASSERT_EQ(search_result.size(), options.max_num_results);

ASSERT_EQ(search_result[0].distance, 0);
ASSERT_EQ(search_result[0].base_table_key.AsSlice().ToBuffer(), VertexKey(1));
ASSERT_EQ(search_result.size(), expected_results.size());

LOG(INFO) << "Search result: " << AsString(search_result);
std::sort(search_result.begin(), search_result.end(), less_condition);

std::sort(search_result.begin(), search_result.end(), [](const auto& lhs, const auto& rhs) {
return lhs.base_table_key < rhs.base_table_key;
});
for (size_t d = 0; d != dimensions; ++d) {
ASSERT_EQ(search_result[d + 1].distance, 1);
ASSERT_EQ(search_result[d + 1].base_table_key.AsSlice().ToBuffer(), VertexKey(1 + (1 << d)));
for (size_t i = 0; i != expected_results.size(); ++i) {
ASSERT_EQ(search_result[i].distance, expected_results[i].distance);
ASSERT_EQ(search_result[i].base_table_key, expected_results[i].base_table_key);
}
}
}
Expand All @@ -236,22 +362,37 @@ TEST_P(VectorLSMTest, MultipleChunks) {
VerifyVectorLSM(lsm, kDimensions);
}

TEST_P(VectorLSMTest, Bootstrap) {
void VectorLSMTest::TestBootstrap(bool flush) {
constexpr size_t kDimensions = 4;
constexpr size_t kChunkSize = 4;

{
FloatVectorLSM lsm;
ASSERT_OK(InitVectorLSM(lsm, kDimensions, kChunkSize));
if (flush) {
ASSERT_OK(lsm.Flush());
}
}

{
FloatVectorLSM lsm;
ASSERT_OK(OpenVectorLSM(lsm, kDimensions, kChunkSize));
auto frontier_ptr = lsm.GetFlushedFrontier();
auto& frontier = *down_cast<TestFrontier*>(frontier_ptr.get());
ASSERT_OK(InsertCube(lsm, kDimensions, kChunkSize, frontier.vertex_id()));

VerifyVectorLSM(lsm, kDimensions);
}
}

TEST_P(VectorLSMTest, Bootstrap) {
TestBootstrap(false);
}

TEST_P(VectorLSMTest, BootstrapWithFlush) {
TestBootstrap(true);
}

TEST_F(VectorLSMTest, MergeChunkResults) {
using ChunkResults = std::vector<vectorindex::VertexWithDistance<float>>;
ChunkResults a_src = {{5, 1}, {3, 3}, {1, 5}, {7, 7}};
Expand Down
22 changes: 22 additions & 0 deletions src/yb/docdb/vector_lsm-test.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//
syntax = "proto3";

package yb.docdb;

option java_package = "org.yb.docdb";

message VectorLSMTestFrontierPB {
uint64 vertex_id = 1;
}

Loading

0 comments on commit 28df09d

Please sign in to comment.