diff --git a/config/development.toml b/config/development.toml index c3df2d7..890a679 100644 --- a/config/development.toml +++ b/config/development.toml @@ -12,7 +12,7 @@ allowed = "*" max_age = 3600 [elastic] -address = "localhost:9200" +address = "158.160.44.99:9200" enabled_tls = "true" username = "elastic" password = "elastic" @@ -27,3 +27,4 @@ expired = 3600 address = "localhost:8085" is_truncate = "false" is_normalize = "false" +enabled_tls = "false" diff --git a/config/production.toml b/config/production.toml index 956dd61..eb08070 100644 --- a/config/production.toml +++ b/config/production.toml @@ -27,3 +27,4 @@ expired = 3600 address = "embeddings:8085" is_truncate = "false" is_normalize = "false" +enabled_tls = "false" diff --git a/src/elastic.rs b/src/elastic.rs index 4bea1bd..6495a90 100644 --- a/src/elastic.rs +++ b/src/elastic.rs @@ -1,14 +1,14 @@ use crate::errors::Successful; +use crate::searcher::errors::SearcherResult; use crate::Connectable; -use crate::searcher::errors::SearcherResult; use elasticsearch::auth::Credentials; use elasticsearch::cert::CertificateValidation; use elasticsearch::http::headers::HeaderMap; use elasticsearch::http::response::Response; use elasticsearch::http::transport::{BuildError, SingleNodeConnectionPool, TransportBuilder}; use elasticsearch::http::{Method, Url}; -use elasticsearch::{Elasticsearch, SearchParts}; +use elasticsearch::{Elasticsearch, Error, SearchParts}; use getset::{CopyGetters, Getters}; use serde_derive::Deserialize; use serde_json::Value; @@ -23,15 +23,14 @@ pub struct ElasticClient { } #[derive(Clone, Deserialize, CopyGetters, Getters)] +#[getset(get = "pub")] pub struct ElasticConfig { - #[getset(get = "pub")] address: String, - #[getset(get_copy = "pub")] - enabled_tls: bool, - #[getset(get = "pub")] username: String, - #[getset(get = "pub")] password: String, + #[getset(skip)] + #[getset(get_copy = "pub")] + enabled_tls: bool, } impl ElasticClient { @@ -39,12 +38,12 @@ impl ElasticClient { self.es_client.clone() } - pub async fn send_request( + pub async fn send_native_request( &self, method: Method, body: Option<&[u8]>, target_url: &str, - ) -> Result { + ) -> Result { let es_client = self.es_client(); let elastic = es_client.write().await; elastic @@ -62,6 +61,7 @@ impl ElasticClient { pub async fn search_request( es: EsCxt, query: &Value, + scroll: Option<&str>, indexes: &[&str], result: (i64, i64), ) -> SearcherResult { @@ -69,11 +69,12 @@ impl ElasticClient { let elastic = es.read().await; let response = elastic .search(SearchParts::Index(indexes)) + .allow_no_indices(true) + .pretty(true) .from(offset) .size(size) .body(query) - .pretty(true) - .allow_no_indices(true) + .scroll(scroll.unwrap_or("1m")) .send() .await?; @@ -81,9 +82,7 @@ impl ElasticClient { Ok(response) } - pub async fn extract_response_msg( - response: Response, - ) -> Result { + pub async fn extract_response_msg(response: Response) -> Result { let _ = response.error_for_status_code()?; Ok(Successful::new(200, "Done")) } diff --git a/src/embeddings/native/mod.rs b/src/embeddings/native/mod.rs index 7084e27..ee7465c 100644 --- a/src/embeddings/native/mod.rs +++ b/src/embeddings/native/mod.rs @@ -63,13 +63,9 @@ impl EmbeddingsService for EmbeddingsClient { "normalize": self.is_normalize(), })) .send() - .await - .map_err(EmbeddingsError::from)?; + .await?; - let embed_data = response - .json::>>() - .await - .map_err(EmbeddingsError::from)?; + let embed_data = response.json::>>().await?; let Some(tokens) = embed_data.first() else { let msg = "loaded empty tokens array"; diff --git a/src/searcher/elastic/mod.rs b/src/searcher/elastic/mod.rs index 67fcd4a..2c64aff 100644 --- a/src/searcher/elastic/mod.rs +++ b/src/searcher/elastic/mod.rs @@ -10,7 +10,6 @@ use crate::searcher::errors::{PaginatedResult, SearcherError, SearcherResult}; use crate::searcher::forms::DocumentType; use crate::searcher::forms::{DeletePaginatesForm, ScrollNextForm}; use crate::searcher::forms::{FulltextParams, SemanticParams}; -use crate::searcher::models::Paginated; use crate::searcher::{PaginatorService, SearcherService}; use crate::storage::models::{Document, DocumentVectors}; @@ -30,10 +29,7 @@ impl SearcherService for ElasticClient { Ok(converter::to_unified_paginated(founded, return_as)) } - async fn search_semantic( - &self, - params: &SemanticParams, - ) -> PaginatedResult { + async fn search_semantic(&self, params: &SemanticParams) -> PaginatedResult { let es = self.es_client(); let query = DocumentVectors::build_search_query(params).await; let founded = DocumentVectors::search(es, &query, params).await?; @@ -55,6 +51,7 @@ impl PaginatorService for ElasticClient { .iter() .map(String::as_str) .collect::>(); + let es_client = self.es_client(); let elastic = es_client.read().await; let response = elastic @@ -85,8 +82,7 @@ impl PaginatorService for ElasticClient { .send() .await?; - let paginated = response.json::>().await?; - let paginated = Paginated::new(paginated); + let paginated = search::extract_searcher_result::(response).await?; Ok(converter::to_unified_paginated(paginated, doc_type)) } } diff --git a/src/searcher/elastic/search.rs b/src/searcher/elastic/search.rs index c265e81..624887b 100644 --- a/src/searcher/elastic/search.rs +++ b/src/searcher/elastic/search.rs @@ -24,9 +24,11 @@ impl Searcher for Document { query: &Value, params: &Self::Params, ) -> PaginatedResult { + let scroll = params.scroll_lifetime(); let results = params.result_size(); let indexes = params.folder_ids().split(',').collect::>(); - let response = ElasticClient::search_request(es_cxt, query, &indexes, results).await?; + let response = + ElasticClient::search_request(es_cxt, query, Some(scroll), &indexes, results).await?; let documents = extract_searcher_result::(response).await?; Ok(documents) } @@ -41,15 +43,17 @@ impl Searcher for DocumentVectors { query: &Value, params: &Self::Params, ) -> PaginatedResult { + let scroll = params.scroll_lifetime(); let results = params.result_size(); let indexes = params.folder_ids().split(',').collect::>(); - let response = ElasticClient::search_request(es_cxt, query, &indexes, results).await?; + let response = + ElasticClient::search_request(es_cxt, query, Some(scroll), &indexes, results).await?; let documents = extract_searcher_result::(response).await?; Ok(documents) } } -async fn extract_searcher_result(response: Response) -> PaginatedResult +pub(super) async fn extract_searcher_result(response: Response) -> PaginatedResult where T: SearchQueryBuilder + DocumentsTrait + serde::Serialize, { diff --git a/src/searcher/endpoints.rs b/src/searcher/endpoints.rs index 70dafac..87d647d 100644 --- a/src/searcher/endpoints.rs +++ b/src/searcher/endpoints.rs @@ -2,8 +2,7 @@ use crate::cacher::CacherService; use crate::embeddings::EmbeddingsService; -use crate::errors::{ErrorResponse, JsonResponse, PaginateResponse, Successful, WebError}; -use crate::searcher::forms::DocumentType; +use crate::errors::{ErrorResponse, JsonResponse, PaginateResponse, Successful}; use crate::searcher::forms::{DeletePaginatesForm, DocumentTypeQuery, ScrollNextForm}; use crate::searcher::forms::{FulltextParams, SemanticParams}; use crate::searcher::models::Paginated; @@ -146,10 +145,7 @@ async fn search_semantic( let client = cxt.get_ref(); let mut search_form = form.0; - let query_tokens = embed - .load_from_text(search_form.query()) - .await - .map_err(WebError::from)?; + let query_tokens = embed.load_from_text(search_form.query()).await?; search_form.set_tokens(query_tokens); @@ -219,7 +215,7 @@ async fn delete_paginate_sessions( ), ), request_body( - content = PaginateNextForm, + content = ScrollNextForm, example = json!(ScrollNextForm::test_example(None)) ), responses( @@ -248,7 +244,7 @@ async fn paginate_next( cxt: PaginateContext, #[cfg(feature = "enable-cacher")] cacher: CacherPaginateContext, form: Json, - document_type: Query, + document_type: Query, ) -> PaginateResponse> { let client = cxt.get_ref(); let pag_form = form.0; @@ -259,7 +255,8 @@ async fn paginate_next( return Ok(Json(docs)); } - let documents = client.paginate(&pag_form, &document_type).await?; + let doc_type = document_type.0.get_type(); + let documents = client.paginate(&pag_form, &doc_type).await?; #[cfg(feature = "enable-cacher")] cacher.insert(&pag_form, &documents).await; diff --git a/src/searcher/forms.rs b/src/searcher/forms.rs index 1cbfbb9..77c2884 100644 --- a/src/searcher/forms.rs +++ b/src/searcher/forms.rs @@ -29,17 +29,12 @@ impl DocumentType { #[derive(Default, Deserialize, IntoParams, ToSchema)] pub struct DocumentTypeQuery { document_type: Option, - is_grouped: bool, } impl DocumentTypeQuery { pub fn get_type(&self) -> DocumentType { self.document_type.clone().unwrap_or(DocumentType::Document) } - - pub fn is_grouped(&self) -> bool { - self.is_grouped - } } #[derive(Builder, Debug, Deserialize, Serialize, Getters, CopyGetters, IntoParams, ToSchema)] @@ -110,6 +105,7 @@ pub struct SemanticParams { query: String, #[getset(skip)] + #[serde(skip_serializing_if = "Option::is_none")] query_tokens: Option>, #[schema(example = "test-folder")] diff --git a/src/searcher/mod.rs b/src/searcher/mod.rs index 802d8b1..1154302 100644 --- a/src/searcher/mod.rs +++ b/src/searcher/mod.rs @@ -20,10 +20,7 @@ pub trait SearcherService { return_as: &DocumentType, ) -> PaginatedResult; - async fn search_semantic( - &self, - params: &SemanticParams, - ) -> PaginatedResult; + async fn search_semantic(&self, params: &SemanticParams) -> PaginatedResult; } #[async_trait::async_trait] diff --git a/src/storage/elastic/helper.rs b/src/storage/elastic/helper.rs index 3aa92a3..f8359d2 100644 --- a/src/storage/elastic/helper.rs +++ b/src/storage/elastic/helper.rs @@ -150,7 +150,7 @@ pub async fn filter_folders( let indexes = &[INFO_FOLDER_ID]; let results = (params.result_size(), params.result_offset()); let query = DocumentVectors::build_retrieve_query(¶ms).await; - let response = ElasticClient::search_request(es_cxt, &query, indexes, results).await?; + let response = ElasticClient::search_request(es_cxt, &query, None, indexes, results).await?; let value = response.json::().await?; let Some(founded) = &value[&"hits"][&"hits"].as_array() else { diff --git a/src/storage/elastic/mod.rs b/src/storage/elastic/mod.rs index 21a019f..ac337c4 100644 --- a/src/storage/elastic/mod.rs +++ b/src/storage/elastic/mod.rs @@ -10,7 +10,7 @@ use crate::errors::Successful; use crate::storage::elastic::retrieve::Retrieve; use crate::storage::elastic::store::StoreTrait; use crate::storage::elastic::update::UpdateTrait; -use crate::storage::errors::{StorageError, StorageResult}; +use crate::storage::errors::StorageResult; use crate::storage::forms::{CreateFolderForm, RetrieveParams}; use crate::storage::models::INFO_FOLDER_ID; use crate::storage::models::{Document, DocumentVectors, Folder, FolderType, InfoFolder}; @@ -32,7 +32,7 @@ const CAT_INDICES_URL: &str = "/_cat/indices?format=json"; impl FolderService for ElasticClient { async fn get_folders(&self, show_all: bool) -> StorageResult> { let response = self - .send_request(Method::Get, None, CAT_INDICES_URL) + .send_native_request(Method::Get, None, CAT_INDICES_URL) .await?; let folders = response.json::>().await?; @@ -42,10 +42,9 @@ impl FolderService for ElasticClient { async fn get_folder(&self, folder_id: &str) -> StorageResult { let target_url = format!("/{folder_id}/_stats"); let response = self - .send_request(Method::Get, None, &target_url) + .send_native_request(Method::Get, None, &target_url) .await? - .error_for_status_code() - .map_err(StorageError::from)?; + .error_for_status_code()?; let value = response.json::().await?; let mut folder = Folder::from_value(value).await?; @@ -113,13 +112,15 @@ impl DocumentService for ElasticClient { match folder_type { FolderType::Vectors => { let query = DocumentVectors::build_retrieve_query(params).await; - let response = ElasticClient::search_request(es, &query, &folders, results).await?; + let response = + ElasticClient::search_request(es, &query, None, &folders, results).await?; let value = helper::extract_from_response::(response).await?; Ok(value) } _ => { let query = Document::build_retrieve_query(params).await; - let response = ElasticClient::search_request(es, &query, &folders, results).await?; + let response = + ElasticClient::search_request(es, &query, None, &folders, results).await?; let value = helper::extract_from_response::(response).await?; Ok(value) } @@ -134,10 +135,9 @@ impl DocumentService for ElasticClient { ) -> StorageResult { let s_doc_path = format!("/{}/_doc/{}", folder_id, doc_id); let response = self - .send_request(Method::Get, None, &s_doc_path) + .send_native_request(Method::Get, None, &s_doc_path) .await? - .error_for_status_code() - .map_err(StorageError::from)?; + .error_for_status_code()?; match folder_type { FolderType::Vectors => {