Skip to content

Commit

Permalink
refactor(project): Additional refactoring and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
breadrock1 committed Oct 27, 2024
1 parent c471df6 commit f8bf5b6
Show file tree
Hide file tree
Showing 29 changed files with 523 additions and 410 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ version = "^0.4"
features = ["rustc-serialize", "serde"]

[dependencies.redis]
optional = true
version = "0.27.3"
features = ["aio", "tokio-comp", "connection-manager", "serde_json", "json"]
optional = true

[dependencies.reqwest]
version = "0.12.8"
Expand Down
6 changes: 3 additions & 3 deletions config/development.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ allowed = "*"
max_age = 3600

[elastic]
address = "158.160.44.99:9200"
address = "localhost:9200"
enabled_tls = "true"
username = "elastic"
password = "elastic"

[cacher]
address = "localhost:6379"
username = "cacher"
password = "cacher"
username = "redis"
password = "redis"
expired = 3600

[embeddings]
Expand Down
2 changes: 1 addition & 1 deletion src/bin/doc-searcher-init.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
extern crate doc_search;

use doc_search::elastic::ElasticClient;
use doc_search::storage::folders::FolderService;
use doc_search::storage::forms::CreateFolderForm;
use doc_search::storage::models::FolderType;
use doc_search::storage::models::{DEFAULT_FOLDER_ID, INFO_FOLDER_ID};
use doc_search::storage::FolderService;
use doc_search::{config, Connectable};

#[tokio::main]
Expand Down
7 changes: 6 additions & 1 deletion src/bin/doc-searcher-run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use actix_web::{web, App, HttpServer};
use doc_search::metrics::endpoints::build_scope as build_metrics_scope;
use doc_search::searcher::endpoints::build_scope as build_searcher_scope;
use doc_search::searcher::{PaginatorService, SearcherService};
use doc_search::storage::documents::DocumentService;
use doc_search::storage::endpoints::build_scope as build_storage_scope;
use doc_search::storage::{DocumentService, FolderService};
use doc_search::storage::folders::FolderService;
use doc_search::{config, cors, elastic, logger, swagger, Connectable};

#[cfg(feature = "enable-cacher")]
Expand Down Expand Up @@ -56,10 +57,14 @@ async fn main() -> Result<(), anyhow::Error> {
let cacher_search_cxt: cacher::redis::SemanticParamsCached =
Box::new(cacher_service.clone());
#[cfg(feature = "enable-cacher")]
let cacher_fulltext_cxt: cacher::redis::FullTextParamsCached =
Box::new(cacher_service.clone());
#[cfg(feature = "enable-cacher")]
let cacher_paginate_cxt: cacher::redis::PaginatedCached = Box::new(cacher_service.clone());
#[cfg(feature = "enable-cacher")]
let app = app
.app_data(web::Data::new(cacher_search_cxt))
.app_data(web::Data::new(cacher_fulltext_cxt))
.app_data(web::Data::new(cacher_paginate_cxt));

app.wrap(logger)
Expand Down
16 changes: 15 additions & 1 deletion src/cacher/redis/models.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::searcher::forms::{ScrollNextForm, SemanticParams};
use crate::searcher::forms::{FulltextParams, ScrollNextForm, SemanticParams};
use crate::searcher::models::Paginated;
use crate::storage::models::Document;

Expand All @@ -19,6 +19,20 @@ impl redis::ToRedisArgs for ScrollNextForm {
}
}

impl redis::ToRedisArgs for FulltextParams {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + RedisWrite,
{
match serde_json::to_string(self) {
Ok(json_str) => out.write_arg_fmt(json_str),
Err(err) => {
tracing::error!("cacher: failed to serialize paginate form: {err:#?}");
}
}
}
}

impl redis::ToRedisArgs for SemanticParams {
fn write_redis_args<W>(&self, out: &mut W)
where
Expand Down
3 changes: 2 additions & 1 deletion src/elastic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,16 @@ impl ElasticClient {
result: (i64, i64),
) -> SearcherResult<Response> {
let (size, offset) = result;
let scroll = scroll.unwrap_or("1m");
let elastic = es.read().await;
let response = elastic
.search(SearchParts::Index(indexes))
.allow_no_indices(true)
.pretty(true)
.scroll(scroll)
.from(offset)
.size(size)
.body(query)
.scroll(scroll.unwrap_or("1m"))
.send()
.await?;

Expand Down
4 changes: 2 additions & 2 deletions src/searcher/elastic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::searcher::elastic::extractor::SearchQueryBuilder;
use crate::searcher::elastic::search::Searcher;
use crate::searcher::errors::{PaginatedResult, SearcherError, SearcherResult};
use crate::searcher::forms::DocumentType;
use crate::searcher::forms::{DeletePaginatesForm, ScrollNextForm};
use crate::searcher::forms::{DeleteScrollsForm, ScrollNextForm};
use crate::searcher::forms::{FulltextParams, SemanticParams};
use crate::searcher::{PaginatorService, SearcherService};
use crate::storage::models::{Document, DocumentVectors};
Expand Down Expand Up @@ -45,7 +45,7 @@ impl SearcherService for ElasticClient {

#[async_trait::async_trait]
impl PaginatorService for ElasticClient {
async fn delete_session(&self, form: &DeletePaginatesForm) -> SearcherResult<Successful> {
async fn delete_session(&self, form: &DeleteScrollsForm) -> SearcherResult<Successful> {
let ids = form
.sessions()
.iter()
Expand Down
12 changes: 6 additions & 6 deletions src/searcher/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::cacher::CacherService;

use crate::embeddings::EmbeddingsService;
use crate::errors::{ErrorResponse, JsonResponse, PaginateResponse, Successful};
use crate::searcher::forms::{DeletePaginatesForm, DocumentTypeQuery, ScrollNextForm};
use crate::searcher::forms::{DeleteScrollsForm, DocumentTypeQuery, ScrollNextForm};
use crate::searcher::forms::{FulltextParams, SemanticParams};
use crate::searcher::models::Paginated;
use crate::searcher::{PaginatorService, SearcherService};
Expand All @@ -28,7 +28,7 @@ type CacherPaginateContext = Data<Box<dyn CacherService<ScrollNextForm, Paginate
pub fn build_scope() -> Scope {
let scope = web::scope("/search")
.service(search_fulltext)
.service(delete_paginate_sessions)
.service(delete_scrolls)
.service(paginate_next);

#[cfg(feature = "enable-semantic")]
Expand Down Expand Up @@ -168,8 +168,8 @@ async fn search_semantic(
path = "/search/paginate/sessions",
tag = "Search",
request_body(
content = DeletePaginationsForm,
example = json!(DeletePaginatesForm::test_example(None)),
content = DeleteScrollsForm,
example = json!(DeleteScrollsForm::test_example(None)),
),
responses(
(
Expand All @@ -193,9 +193,9 @@ async fn search_semantic(
)
)]
#[delete("/paginate/sessions")]
async fn delete_paginate_sessions(
async fn delete_scrolls(
cxt: PaginateContext,
form: Json<DeletePaginatesForm>,
form: Json<DeleteScrollsForm>,
) -> JsonResponse<Successful> {
let client = cxt.get_ref();
let pagination_form = form.0;
Expand Down
8 changes: 4 additions & 4 deletions src/searcher/forms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,13 @@ impl ScrollNextForm {

#[derive(Builder, Deserialize, Serialize, Getters, IntoParams, ToSchema)]
#[getset(get = "pub")]
pub struct DeletePaginatesForm {
pub struct DeleteScrollsForm {
#[schema(example = "FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFmOSWhk")]
sessions: Vec<String>,
}

impl DeletePaginatesForm {
pub fn builder() -> DeletePaginatesFormBuilder {
DeletePaginatesFormBuilder::default()
impl DeleteScrollsForm {
pub fn builder() -> DeleteScrollsFormBuilder {
DeleteScrollsFormBuilder::default()
}
}
4 changes: 2 additions & 2 deletions src/searcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub mod models;
use crate::errors::Successful;
use crate::searcher::errors::{PaginatedResult, SearcherResult};
use crate::searcher::forms::DocumentType;
use crate::searcher::forms::{DeletePaginatesForm, ScrollNextForm};
use crate::searcher::forms::{DeleteScrollsForm, ScrollNextForm};
use crate::searcher::forms::{FulltextParams, SemanticParams};

use serde_json::Value;
Expand All @@ -25,7 +25,7 @@ pub trait SearcherService {

#[async_trait::async_trait]
pub trait PaginatorService {
async fn delete_session(&self, form: &DeletePaginatesForm) -> SearcherResult<Successful>;
async fn delete_session(&self, form: &DeleteScrollsForm) -> SearcherResult<Successful>;

async fn paginate(
&self,
Expand Down
39 changes: 39 additions & 0 deletions src/storage/documents.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use crate::errors::Successful;
use crate::storage::errors::StorageResult;
use crate::storage::forms::RetrieveParams;
use crate::storage::models::{Document, FolderType};

use serde_json::Value;

/// Asynchronous interface for reading and writing documents addressed by a folder id.
///
/// Only signatures are declared here; the behaviour is defined by implementors,
/// which are not visible in this file. Every method reports failure through
/// `StorageResult`.
#[async_trait::async_trait]
pub trait DocumentService {
/// Returns a list of JSON values for the folder `folder_id`.
///
/// NOTE(review): `folder_type` presumably selects the document schema and
/// `params` the filtering/paging of the result — confirm against the implementor.
async fn get_documents(
&self,
folder_id: &str,
folder_type: &FolderType,
params: &RetrieveParams,
) -> StorageResult<Vec<Value>>;

/// Returns a single JSON value identified by `doc_id` within `folder_id`.
async fn get_document(
&self,
folder_id: &str,
doc_id: &str,
folder_type: &FolderType,
) -> StorageResult<Value>;

/// Takes a typed `Document` to be created in `folder_id`; returns `Successful` on success.
async fn create_document(
&self,
folder_id: &str,
doc: &Document,
folder_type: &FolderType,
) -> StorageResult<Successful>;

/// Takes the document as raw JSON (`Value`) rather than a typed `Document`.
///
/// NOTE(review): presumably the concrete type is chosen from `folder_type` by
/// the implementor — confirm.
async fn update_document(
&self,
folder_id: &str,
doc: &Value,
folder_type: &FolderType,
) -> StorageResult<Successful>;

/// Takes only the folder and document ids; unlike the other methods there is
/// no `folder_type` parameter.
async fn delete_document(&self, folder_id: &str, doc_id: &str) -> StorageResult<Successful>;
}
132 changes: 132 additions & 0 deletions src/storage/elastic/documents/helper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use crate::elastic::ElasticClient;
use crate::errors::Successful;
use crate::storage::elastic::documents::retrieve::Retrieve;
use crate::storage::elastic::documents::store::StoreTrait;
use crate::storage::elastic::documents::update::UpdateTrait;
use crate::storage::elastic::EsCxt;
use crate::storage::errors::{StorageError, StorageResult};
use crate::storage::forms::RetrieveParams;
use crate::storage::models::{Document, DocumentVectors};
use crate::storage::models::{DocumentsTrait, FolderType};

use elasticsearch::http::response::Response;
use serde::Deserialize;
use serde_json::Value;

pub async fn extract_document<'de, T>(response: Response) -> StorageResult<T>
where
T: DocumentsTrait + serde::Deserialize<'de>,
{
let common_object = response.json::<Value>().await?;
let document_json = &common_object[&"_source"];
let document = T::deserialize(document_json.to_owned())?;
Ok(document)
}

/// Collects every entry of `hits.hits` from an Elasticsearch `response` as JSON.
///
/// Each hit is converted with `T::extract_from_response` and then re-serialized
/// to a `Value`. Hits that fail extraction are logged and skipped; hits that
/// fail re-serialization are skipped silently.
///
/// # Errors
///
/// Returns an error if the body cannot be read as JSON, or
/// `StorageError::SerdeError` if `hits.hits` is not an array.
pub async fn extract_all_documents<'de, T>(response: Response) -> StorageResult<Vec<Value>>
where
    T: Retrieve<'de, T> + DocumentsTrait + serde::Serialize + serde::Deserialize<'de>,
{
    let body = response.json::<Value>().await?;
    let hits = match body[&"hits"][&"hits"].as_array() {
        Some(hits) => hits,
        None => {
            let msg = "returned empty data to get all documents";
            tracing::warn!(msg);
            return Err(StorageError::SerdeError(msg.to_string()));
        }
    };

    let mut documents: Vec<Value> = Vec::new();
    for hit in hits {
        match T::extract_from_response(hit) {
            Ok(doc) => {
                // A document that cannot be turned back into JSON is dropped.
                if let Ok(json) = serde_json::to_value(doc) {
                    documents.push(json);
                }
            }
            Err(err) => {
                tracing::error!("failed to extract documents: {err:#?}");
            }
        }
    }

    Ok(documents)
}

impl FolderType {
pub async fn get_document(&self, response: Response) -> StorageResult<Value> {
let common_object = response.json::<Value>().await?;
let document_json = &common_object[&"_source"];

match self {
FolderType::Vectors => {
let document = DocumentVectors::deserialize(document_json.to_owned())?;
let value = serde_json::to_value(document)?;
Ok(value)
}
_ => {
let document = Document::deserialize(document_json.to_owned())?;
let value = serde_json::to_value(document)?;
Ok(value)
}
}
}

pub async fn get_all_documents(
&self,
es: EsCxt,
indexes: &[&str],
params: &RetrieveParams,
) -> StorageResult<Vec<Value>> {
let results = (params.result_size(), params.result_offset());
match self {
FolderType::Vectors => {
let query = DocumentVectors::build_retrieve_query(params).await;
let response =
ElasticClient::search_request(es, &query, None, indexes, results).await?;
let value = extract_all_documents::<DocumentVectors>(response).await?;
Ok(value)
}
_ => {
let query = Document::build_retrieve_query(params).await;
let response =
ElasticClient::search_request(es, &query, None, indexes, results).await?;
let value = extract_all_documents::<Document>(response).await?;
Ok(value)
}
}
}

pub async fn create_document(
&self,
es: EsCxt,
index: &str,
doc: &Document,
) -> StorageResult<Successful> {
match self {
FolderType::Vectors => {
let docs_vec = DocumentVectors::from(doc);
DocumentVectors::store_all(es, index, &docs_vec).await
}
_ => {
let mut doc_cln = doc.to_owned();
doc_cln.exclude_tokens();
Document::store_all(es, index, &doc_cln).await
}
}
}

pub async fn update_document(
&self,
es: EsCxt,
index: &str,
doc: &Value,
) -> StorageResult<Successful> {
match self {
FolderType::Vectors => {
let doc = DocumentVectors::deserialize(doc)?;
DocumentVectors::update(es, index, &doc).await
}
_ => {
let doc = Document::deserialize(doc)?;
Document::update(es, index, &doc).await
}
}
}
}
Loading

0 comments on commit f8bf5b6

Please sign in to comment.