From 0a0dab4fe1011b0c13b20b283df001b9adf0d7ea Mon Sep 17 00:00:00 2001 From: adalpane Date: Fri, 12 Jul 2024 11:15:21 +0200 Subject: [PATCH 1/8] don't panic --- .gitignore | 1 + src/blocking/client.rs | 140 +++++++++++++++++++++------------------- src/client.rs | 141 ++++++++++++++++++++++------------------- 3 files changed, 151 insertions(+), 131 deletions(-) diff --git a/.gitignore b/.gitignore index 4ad9711..a6bd56b 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ local-* /target Cargo.lock +.vscode \ No newline at end of file diff --git a/src/blocking/client.rs b/src/blocking/client.rs index a558767..2735472 100644 --- a/src/blocking/client.rs +++ b/src/blocking/client.rs @@ -32,10 +32,10 @@ impl Client { data_root: Option, ) -> Result { if provider_config.share_credentials_version > CREDENTIALS_VERSION { - panic!("'share_credentials_version' in the provider configuration is {}, which is newer than the \ + return Err(anyhow::anyhow!("'share_credentials_version' in the provider configuration is {}, which is newer than the \ version {} supported by the current release. Please upgrade to a newer release.", provider_config.share_credentials_version, - CREDENTIALS_VERSION); + CREDENTIALS_VERSION)); } let cache: HashMap = HashMap::new(); Ok(Self { @@ -46,29 +46,32 @@ impl Client { .as_path() .join("delta_sharing") .to_str() - .unwrap() + .ok_or(anyhow::anyhow!("Error selecting data root folder"))? .to_string(), ), cache: cache, }) } - fn get_client(config: &ProviderConfig) -> Result { + fn get_client(config: &ProviderConfig) -> Result { let rust_version: &str = &format!("{}", rustc_version_runtime::version()); let user_agent: &str = &format!("Delta-Sharing-Rust/{VERSION} Rust/{rust_version}"); let bearer_token = &format!("Bearer {}", config.bearer_token); let mut headers = header::HeaderMap::new(); headers.insert( header::AUTHORIZATION, - header::HeaderValue::from_str(bearer_token).unwrap(), + header::HeaderValue::from_str(bearer_token) + .map_err(|e| anyhow::anyhow!("Error setting authorization header:{e}"))?, ); headers.insert( header::USER_AGENT, - header::HeaderValue::from_str(user_agent).unwrap(), + header::HeaderValue::from_str(user_agent) + .map_err(|e| anyhow::anyhow!("Error setting user agent header:{e}"))?, ); reqwest::blocking::Client::builder() .default_headers(headers) .build() + .map_err(|e| anyhow::anyhow!("Error building Http client: {e}")) } fn build_base_url(endpoint: &String) -> Result { @@ -77,8 +80,9 @@ impl Client { Url::parse(&root_path) } - fn get(&self, target: &str) -> Result { - let url = self.base_url.join(target).unwrap(); + fn get(&self, target: &str) -> Result { + let url = self.base_url.join(target) + .map_err(|e| anyhow::anyhow!("Error creating GET url: {e}"))?; debug!("--> HTTP GET to: {}", &url); let resp = self.http_client.get(url.as_str()).send()?; let resp_text = resp.text()?; @@ -86,23 +90,25 @@ impl Client { return Ok(resp_text); } - fn head(&self, target: &str, key: &str) -> Option { - let url = self.base_url.join(target).unwrap(); + fn head(&self, target: &str, key: &str) -> Result, anyhow::Error> { + let url = self.base_url.join(target) + .map_err(|e| anyhow::anyhow!("Error creating HEAD url: {e}"))?; debug!("HTTP HEAD to: {}", &url); let resp = self .http_client .head(url.as_str()) .send() - .expect("Invalid request"); + .map_err(|e| anyhow::anyhow!("Invalid HEAD request: {e}"))?; let version = resp.headers().get(key); match version { - Some(h) => Some(h.clone()), - None => None, + Some(h) => Ok(Some(h.clone())), + None => Ok(None), } } - 
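// A minimal, self-contained sketch of the error-mapping pattern applied throughout this
// patch: unwrap()/expect()/panic!() replaced by anyhow errors that carry context. Only the
// anyhow crate is assumed; the function and names below are illustrative, not part of the crate.
use anyhow::anyhow;
use std::path::Path;

fn example_data_root(dir: &Path) -> Result<String, anyhow::Error> {
    // Option -> Result: a non-UTF-8 path becomes a descriptive error instead of a panic.
    let root = dir
        .join("delta_sharing")
        .to_str()
        .ok_or(anyhow!("Error selecting data root folder"))?
        .to_string();
    // Result -> Result: a foreign error type is wrapped with context via map_err.
    let version: i32 = "1"
        .parse()
        .map_err(|e| anyhow!("Invalid version number: {e}"))?;
    Ok(format!("{root}/v{version}"))
}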
fn post(&self, target: &str, json: &Map) -> Result { - let url = self.base_url.join(target).unwrap(); + fn post(&self, target: &str, json: &Map) -> Result { + let url = self.base_url.join(target) + .map_err(|e| anyhow::anyhow!("Error creating POST url: {e}"))?; debug!("--> HTTP POST to: {}", &url); let resp = self.http_client.post(url.as_str()).json(json).send()?; let resp_text = resp.text()?; @@ -110,24 +116,27 @@ impl Client { return Ok(resp_text); } - fn download(&self, url: String, dest_path: &Path) { + fn download(&self, url: String, dest_path: &Path) -> Result { debug!("--> Download {} to {}", &url, dest_path.display()); - let resp = reqwest::blocking::get(url).unwrap(); - let mut out = fs::File::create(dest_path).expect("Failed to create an output file"); - let content = resp.bytes().unwrap(); + let resp = reqwest::blocking::get(url) + .map_err(|e| anyhow::anyhow!("Error creating POST url: {e}"))?; + let mut out = fs::File::create(dest_path) + .map_err(|e| anyhow::anyhow!("Failed to create an output file: {e}"))?; + let content = resp.bytes() + .map_err(|e| anyhow::anyhow!("Failed to read download bytes: {e}"))?; io::copy(&mut content.as_bytes(), &mut out) - .expect("Failed to save the content to output file"); + .map_err(|e| anyhow::anyhow!("Failed to save the content to output file: {e}")) } pub fn list_shares(&self) -> Result, anyhow::Error> { let shares = self.get("shares")?; - let parsed: ShareResponse = serde_json::from_str(&shares).expect("Invalid response"); + let parsed: ShareResponse = serde_json::from_str(&shares).map_err(|e| anyhow::anyhow!("Invalid list shares response: {e}"))?; return Ok(parsed.items.clone()); } pub fn list_schemas(&self, share: &Share) -> Result, anyhow::Error> { let schemas = self.get(&format!("shares/{}/schemas", share.name))?; - let parsed: SchemaResponse = serde_json::from_str(&schemas).expect("Invalid response"); + let parsed: SchemaResponse = serde_json::from_str(&schemas).map_err(|e| anyhow::anyhow!("Invalid list schemas response: {e}"))?; return Ok(parsed.items.clone()); } @@ -136,13 +145,13 @@ impl Client { "shares/{}/schemas/{}/tables", schema.share, schema.name ))?; - let parsed: TableResponse = serde_json::from_str(&tables).expect("Invalid response"); + let parsed: TableResponse = serde_json::from_str(&tables).map_err(|e| anyhow::anyhow!("Invalid list tables response: {e}"))?; return Ok(parsed.items.clone()); } pub fn list_all_tables(&self, share: &Share) -> Result, anyhow::Error> { let tables = self.get(&format!("shares/{}/all-tables", share.name))?; - let parsed: TableResponse = serde_json::from_str(&tables).expect("Invalid response"); + let parsed: TableResponse = serde_json::from_str(&tables).map_err(|e| anyhow::anyhow!("Invalid list all tables response: {e}"))?; return Ok(parsed.items.clone()); } @@ -153,11 +162,13 @@ impl Client { ))?; let mut meta_lines = meta.lines(); let protocol: ProtocolResponse = - serde_json::from_str(meta_lines.next().expect("Invalid response")) - .expect("Invalid protocol"); + meta_lines.next().map(|lines| serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}"))) + .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; let metadata: MetadataResponse = - serde_json::from_str(meta_lines.next().expect("Invalid response")) - .expect("Invalid metadata"); + meta_lines.next().map(|lines| serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}"))) + .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; 
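// The metadata and query endpoints return newline-delimited JSON: the first line carries the
// protocol, the second the metadata, and any remaining lines are file entries. This is a
// standalone sketch of the parsing pattern used above; serde_json::Value stands in for the
// crate's ProtocolResponse/MetadataResponse/FileResponse types, so the names here are illustrative.
use anyhow::anyhow;
use serde_json::Value;

fn parse_query_response(body: &str) -> Result<(Value, Value, Vec<Value>), anyhow::Error> {
    let mut lines = body.lines();
    let protocol: Value = lines
        .next()
        .map(|l| serde_json::from_str::<Value>(l).map_err(|e| anyhow!("Invalid protocol response - {l}: {e}")))
        .unwrap_or(Err(anyhow!("Empty protocol response")))?;
    let metadata: Value = lines
        .next()
        .map(|l| serde_json::from_str::<Value>(l).map_err(|e| anyhow!("Invalid metadata response - {l}: {e}")))
        .unwrap_or(Err(anyhow!("Empty metadata response")))?;
    // Every remaining line must parse; the first malformed entry aborts with an error.
    let files = lines
        .map(|l| serde_json::from_str::<Value>(l).map_err(|e| anyhow!("Invalid file info: {e}")))
        .collect::<Result<Vec<Value>, anyhow::Error>>()?;
    Ok((protocol, metadata, files))
}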
Ok(TableMetadata { protocol: protocol.protocol, metadata: metadata.metadata, @@ -173,12 +184,9 @@ impl Client { "delta-table-version", ); match version { - Some(v) => v - .to_str() - .expect("Invalid version number") - .parse::() - .expect("Invalid version number"), - None => -1, + Ok(Some(v)) => v + .to_str().ok().and_then(|value| value.parse::().ok()).unwrap_or(-1), + _ => -1, } } @@ -195,23 +203,22 @@ impl Client { "predicateHints".to_string(), Value::Array( predicate_hints - .unwrap() - .iter() - .map(|s| Value::String(s.to_string())) - .collect::>(), + .map(|hints| hints.iter().map(|s| Value::String(s.to_string())) + .collect::>()) + .unwrap_or_default() ), ); } - if limit_hint.is_some() { + if let Some(limit_hint) = limit_hint { map.insert( "limitHint".to_string(), - Value::Number(Number::from(limit_hint.unwrap())), + Value::Number(Number::from(limit_hint)), ); } - if version.is_some() { + if let Some(version) = version { map.insert( "version".to_string(), - Value::Number(Number::from(version.unwrap())), + Value::Number(Number::from(version)), ); } let response = self.post( @@ -223,14 +230,16 @@ impl Client { )?; let mut lines = response.lines(); let protocol: ProtocolResponse = - serde_json::from_str(lines.next().expect("Invalid response")) - .expect("Invalid protocol"); + lines.next().map(|lines| serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}"))) + .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; let metadata: MetadataResponse = - serde_json::from_str(lines.next().expect("Invalid response")) - .expect("Invalid metadata"); + lines.next().map(|lines| serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}"))) + .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; let mut files: Vec = Vec::new(); for l in lines { - let file: FileResponse = serde_json::from_str(l).expect("Invalid file info"); + let file: FileResponse = serde_json::from_str(l).map_err(|e| anyhow::anyhow!("Invalid file info: {e}"))?; files.push(file.file.clone()); } Ok(TableFiles { @@ -242,29 +251,30 @@ impl Client { }) } - fn download_files(&self, table_path: &PathBuf, table_files: &TableFiles) -> Vec { + fn download_files(&self, table_path: &PathBuf, table_files: &TableFiles) -> Result, anyhow::Error> { if Path::exists(&table_path) { - fs::remove_dir_all(&table_path).unwrap(); + fs::remove_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error cleaning table path: {e}"))?; } - fs::create_dir_all(&table_path).unwrap(); + fs::create_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error creating table path: {e}"))?; let mut file_paths: Vec = Vec::new(); for file in table_files.files.clone() { let dst_path = &table_path.join(format!("{}.snappy.parquet", &file.id)); - self.download(file.url, &dst_path); + let _ = self.download(file.url, &dst_path); file_paths.push(dst_path.clone()); } - file_paths.clone() + Ok(file_paths.clone()) } - fn load_cached(&self, table_path: &PathBuf, table_files: &TableFiles) -> Option> { + fn load_cached(&self, table_path: &PathBuf, table_files: &TableFiles) -> Result>, anyhow::Error> { // Check if the files exist, load and compare the files. 
let metadata_path = &table_path.join(METADATA_FILE); if Path::exists(&metadata_path) { - let metadata_str = &fs::read_to_string(&metadata_path).unwrap(); - let metadata: TableMetadata = serde_json::from_str(&metadata_str).expect(&format!( - "Invalid configuration in {}", - metadata_path.display() - )); + let metadata_str = &fs::read_to_string(&metadata_path).map_err(|e| anyhow::anyhow!("Error reading file path {}: {}", metadata_path.display(), e))?; + let metadata: TableMetadata = serde_json::from_str(&metadata_str).map_err(|e| anyhow::anyhow!( + "Invalid configuration in {}: {}", + metadata_path.display(), + e + ))?; let mut download = metadata != table_files.metadata; if !download { @@ -274,27 +284,27 @@ impl Client { if !Path::exists(&file_path) { // File is missing, invalidate cache download = true; - fs::remove_dir(&table_path).unwrap(); + fs::remove_dir(&table_path).map_err(|e| anyhow::anyhow!("Error invalidating cache: {e}"))?; break; } file_paths.push(file_path.clone()); } if !download { - return Some(file_paths.clone()); + return Ok(Some(file_paths.clone())); } } } - None + Ok(None) } pub fn get_files(&mut self, table: &Table) -> Result, anyhow::Error> { let key = table.fully_qualified_name(); let mut download = true; let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); - let table_files = self.list_table_files(table, None, None, None).unwrap(); + let table_files = self.list_table_files(table, None, None, None)?; if let Some(cached) = self.cache.get(&key) { download = cached.table_files.metadata != table_files.metadata; - } else if let Some(cached) = self.load_cached(&table_path, &table_files) { + } else if let Some(cached) = self.load_cached(&table_path, &table_files)? { download = false; self.cache.insert( key.clone(), @@ -306,7 +316,7 @@ impl Client { } if download { info!("--> Downloading data files to {}", &table_path.display()); - let paths = self.download_files(&table_path, &table_files); + let paths = self.download_files(&table_path, &table_files)?; serde_json::to_writer( &fs::File::create(&table_path.join(METADATA_FILE))?, &table_files.metadata, @@ -319,7 +329,7 @@ impl Client { }, ); } - Ok(self.cache.get(&key).unwrap().file_paths.clone()) + Ok(self.cache.get(&key).ok_or(anyhow::anyhow!("Error reading {key} from cache"))?.file_paths.clone()) } pub fn get_dataframe(&mut self, table: &Table) -> PolarResult { diff --git a/src/client.rs b/src/client.rs index 3c0ba1a..95eb86f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -32,10 +32,10 @@ impl Client { data_root: Option, ) -> Result { if provider_config.share_credentials_version > CREDENTIALS_VERSION { - panic!("'share_credentials_version' in the provider configuration is {}, which is newer than the \ + return Err(anyhow::anyhow!("'share_credentials_version' in the provider configuration is {}, which is newer than the \ version {} supported by the current release. Please upgrade to a newer release.", provider_config.share_credentials_version, - CREDENTIALS_VERSION); + CREDENTIALS_VERSION)); } let cache: HashMap = HashMap::new(); Ok(Self { @@ -46,27 +46,30 @@ impl Client { .as_path() .join("delta_sharing") .to_str() - .unwrap() + .ok_or(anyhow::anyhow!("Error selecting data root folder"))? 
.to_string(), ), cache: cache, }) } - fn get_client(config: &ProviderConfig) -> Result { + fn get_client(config: &ProviderConfig) -> Result { let rust_version: &str = &format!("{}", rustc_version_runtime::version()); let user_agent: &str = &format!("Delta-Sharing-Rust/{VERSION} Rust/{rust_version}"); let bearer_token = &format!("Bearer {}", config.bearer_token); let mut headers = header::HeaderMap::new(); headers.insert( header::AUTHORIZATION, - header::HeaderValue::from_str(bearer_token).unwrap(), + header::HeaderValue::from_str(bearer_token) + .map_err(|e| anyhow::anyhow!("Error setting authorization header:{e}"))?, ); headers.insert( header::USER_AGENT, - header::HeaderValue::from_str(user_agent).unwrap(), + header::HeaderValue::from_str(user_agent) + .map_err(|e| anyhow::anyhow!("Error setting user agent header:{e}"))?, ); reqwest::Client::builder().default_headers(headers).build() + .map_err(|e| anyhow::anyhow!("Error building Http client: {e}")) } fn build_base_url(endpoint: &String) -> Result { @@ -75,8 +78,9 @@ impl Client { Url::parse(&root_path) } - async fn get(&self, target: &str) -> Result { - let url = self.base_url.join(target).unwrap(); + async fn get(&self, target: &str) -> Result { + let url = self.base_url.join(target) + .map_err(|e| anyhow::anyhow!("Error creating GET url: {e}"))?; debug!("--> HTTP GET to: {}", &url); let resp = self.http_client.get(url.as_str()).send().await?; let resp_text = resp.text().await?; @@ -84,19 +88,20 @@ impl Client { return Ok(resp_text); } - async fn head(&self, target: &str, key: &str) -> Option { - let url = self.base_url.join(target).unwrap(); + async fn head(&self, target: &str, key: &str) -> Result, anyhow::Error> { + let url = self.base_url.join(target) + .map_err(|e| anyhow::anyhow!("Error creating HEAD url: {e}"))?; debug!("HTTP HEAD to: {}", &url); let resp = self .http_client .head(url.as_str()) .send() .await - .expect("Invalid request"); + .map_err(|e| anyhow::anyhow!("Invalid HEAD request: {e}"))?; let version = resp.headers().get(key); match version { - Some(h) => Some(h.clone()), - None => None, + Some(h) => Ok(Some(h.clone())), + None => Ok(None), } } @@ -104,8 +109,9 @@ impl Client { &self, target: &str, json: &Map, - ) -> Result { - let url = self.base_url.join(target).unwrap(); + ) -> Result { + let url = self.base_url.join(target) + .map_err(|e| anyhow::anyhow!("Error creating POST url: {e}"))?; debug!("--> HTTP POST to: {}", &url); let resp = self .http_client @@ -118,24 +124,27 @@ impl Client { return Ok(resp_text); } - async fn download(&self, url: String, dest_path: &Path) { + async fn download(&self, url: String, dest_path: &Path) -> Result { debug!("--> Download {} to {}", &url, dest_path.display()); - let resp = reqwest::get(url).await.unwrap(); - let mut out = fs::File::create(dest_path).expect("Failed to create an output file"); - let content = resp.bytes().await.unwrap(); + let resp = reqwest::get(url).await + .map_err(|e| anyhow::anyhow!("Error creating POST url: {e}"))?; + let mut out = fs::File::create(dest_path) + .map_err(|e| anyhow::anyhow!("Failed to create an output file: {e}"))?; + let content = resp.bytes().await + .map_err(|e| anyhow::anyhow!("Failed to read download bytes: {e}"))?; io::copy(&mut content.as_bytes(), &mut out) - .expect("Failed to save the content to output file"); + .map_err(|e| anyhow::anyhow!("Failed to save the content to output file: {e}")) } pub async fn list_shares(&self) -> Result, anyhow::Error> { let shares = self.get("shares").await?; - let parsed: ShareResponse = 
serde_json::from_str(&shares).expect("Invalid response"); + let parsed: ShareResponse = serde_json::from_str(&shares).map_err(|e| anyhow::anyhow!("Invalid list shares response: {e}"))?; return Ok(parsed.items.clone()); } pub async fn list_schemas(&self, share: &Share) -> Result, anyhow::Error> { let schemas = self.get(&format!("shares/{}/schemas", share.name)).await?; - let parsed: SchemaResponse = serde_json::from_str(&schemas).expect("Invalid response"); + let parsed: SchemaResponse = serde_json::from_str(&schemas).map_err(|e| anyhow::anyhow!("Invalid list schemas response: {e}"))?; return Ok(parsed.items.clone()); } @@ -146,7 +155,7 @@ impl Client { schema.share, schema.name )) .await?; - let parsed: TableResponse = serde_json::from_str(&tables).expect("Invalid response"); + let parsed: TableResponse = serde_json::from_str(&tables).map_err(|e| anyhow::anyhow!("Invalid list tables response: {e}"))?; return Ok(parsed.items.clone()); } @@ -154,7 +163,7 @@ impl Client { let tables = self .get(&format!("shares/{}/all-tables", share.name)) .await?; - let parsed: TableResponse = serde_json::from_str(&tables).expect("Invalid response"); + let parsed: TableResponse = serde_json::from_str(&tables).map_err(|e| anyhow::anyhow!("Invalid list all tables response: {e}"))?; return Ok(parsed.items.clone()); } @@ -167,11 +176,13 @@ impl Client { .await?; let mut meta_lines = meta.lines(); let protocol: ProtocolResponse = - serde_json::from_str(meta_lines.next().expect("Invalid response")) - .expect("Invalid protocol"); + meta_lines.next().map(|lines| serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}"))) + .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; let metadata: MetadataResponse = - serde_json::from_str(meta_lines.next().expect("Invalid response")) - .expect("Invalid metadata"); + meta_lines.next().map(|lines| serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}"))) + .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; Ok(TableMetadata { protocol: protocol.protocol, metadata: metadata.metadata, @@ -189,12 +200,9 @@ impl Client { ) .await; match version { - Some(v) => v - .to_str() - .expect("Invalid version number") - .parse::() - .expect("Invalid version number"), - None => -1, + Ok(Some(v)) => v + .to_str().ok().and_then(|value| value.parse::().ok()).unwrap_or(-1), + _ => -1, } } @@ -211,23 +219,22 @@ impl Client { "predicateHints".to_string(), Value::Array( predicate_hints - .unwrap() - .iter() - .map(|s| Value::String(s.to_string())) - .collect::>(), + .map(|hints| hints.iter().map(|s| Value::String(s.to_string())) + .collect::>()) + .unwrap_or_default() ), ); } - if limit_hint.is_some() { + if let Some(limit_hint) = limit_hint { map.insert( "limitHint".to_string(), - Value::Number(Number::from(limit_hint.unwrap())), + Value::Number(Number::from(limit_hint)), ); } - if version.is_some() { + if let Some(version) = version { map.insert( "version".to_string(), - Value::Number(Number::from(version.unwrap())), + Value::Number(Number::from(version)), ); } let response = self @@ -241,14 +248,16 @@ impl Client { .await?; let mut lines = response.lines(); let protocol: ProtocolResponse = - serde_json::from_str(lines.next().expect("Invalid response")) - .expect("Invalid protocol"); + lines.next().map(|lines| serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}"))) + .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; let 
metadata: MetadataResponse = - serde_json::from_str(lines.next().expect("Invalid response")) - .expect("Invalid metadata"); + lines.next().map(|lines| serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}"))) + .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; let mut files: Vec = Vec::new(); for l in lines { - let file: FileResponse = serde_json::from_str(l).expect("Invalid file info"); + let file: FileResponse = serde_json::from_str(l).map_err(|e| anyhow::anyhow!("Invalid file info: {e}"))?; files.push(file.file.clone()); } Ok(TableFiles { @@ -260,33 +269,34 @@ impl Client { }) } - async fn download_files(&self, table_path: &PathBuf, table_files: &TableFiles) -> Vec { + async fn download_files(&self, table_path: &PathBuf, table_files: &TableFiles) -> Result, anyhow::Error> { if Path::exists(&table_path) { - fs::remove_dir_all(&table_path).unwrap(); + fs::remove_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error cleaning table path: {e}"))?; } - fs::create_dir_all(&table_path).unwrap(); + fs::create_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error creating table path: {e}"))?; let mut file_paths: Vec = Vec::new(); for file in table_files.files.clone() { let dst_path = &table_path.join(format!("{}.snappy.parquet", &file.id)); - self.download(file.url, &dst_path).await; + let _ = self.download(file.url, &dst_path).await; file_paths.push(dst_path.clone()); } - file_paths.clone() + Ok(file_paths.clone()) } async fn load_cached( &self, table_path: &PathBuf, table_files: &TableFiles, - ) -> Option> { + ) -> Result>, anyhow::Error> { // Check if the files exist, load and compare the files. let metadata_path = &table_path.join(METADATA_FILE); if Path::exists(&metadata_path) { - let metadata_str = &fs::read_to_string(&metadata_path).unwrap(); - let metadata: TableMetadata = serde_json::from_str(&metadata_str).expect(&format!( - "Invalid configuration in {}", - metadata_path.display() - )); + let metadata_str = &fs::read_to_string(&metadata_path).map_err(|e| anyhow::anyhow!("Error reading file path {}: {}", metadata_path.display(), e))?; + let metadata: TableMetadata = serde_json::from_str(&metadata_str).map_err(|e| anyhow::anyhow!( + "Invalid configuration in {}: {}", + metadata_path.display(), + e + ))?; let mut download = metadata != table_files.metadata; if !download { @@ -296,17 +306,17 @@ impl Client { if !Path::exists(&file_path) { // File is missing, invalidate cache download = true; - fs::remove_dir(&table_path).unwrap(); + fs::remove_dir(&table_path).map_err(|e| anyhow::anyhow!("Error invalidating cache: {e}"))?; break; } file_paths.push(file_path.clone()); } if !download { - return Some(file_paths.clone()); + return Ok(Some(file_paths.clone())); } } } - None + Ok(None) } pub async fn get_files(&mut self, table: &Table) -> Result, anyhow::Error> { @@ -316,7 +326,7 @@ impl Client { let table_files = self.list_table_files(table, None, None, None).await?; if let Some(cached) = self.cache.get(&key) { download = cached.table_files.metadata != table_files.metadata; - } else if let Some(cached) = self.load_cached(&table_path, &table_files).await { + } else if let Some(cached) = self.load_cached(&table_path, &table_files).await? 
{ download = false; self.cache.insert( key.clone(), @@ -328,7 +338,7 @@ impl Client { } if download { info!("--> Downloading data files to {}", &table_path.display()); - let paths = self.download_files(&table_path, &table_files).await; + let paths = self.download_files(&table_path, &table_files).await?; serde_json::to_writer( &fs::File::create(&table_path.join(METADATA_FILE))?, &table_files.metadata, @@ -341,7 +351,7 @@ impl Client { }, ); } - Ok(self.cache.get(&key).unwrap().file_paths.clone()) + Ok(self.cache.get(&key).ok_or(anyhow::anyhow!("Error reading {key} from cache"))?.file_paths.clone()) } pub async fn get_dataframe(&mut self, table: &Table) -> PolarResult { @@ -364,7 +374,6 @@ mod tests { endpoint: "https://sharing.delta.io/delta-sharing/".to_string(), bearer_token: "token".to_string(), }; - let c = super::Client::new(config, None).await; - drop(c); + let _ = super::Client::new(config, None).await.unwrap(); } } From 12573e71993138a5ece8e23edce9a355fee140cf Mon Sep 17 00:00:00 2001 From: adalpane Date: Fri, 12 Jul 2024 11:34:12 +0200 Subject: [PATCH 2/8] allow to specify file request in get dataframe --- examples/async.rs | 2 +- examples/blocking.rs | 2 +- src/blocking/client.rs | 23 ++++++++++------------- src/blocking/mod.rs | 2 +- src/client.rs | 23 ++++++++++------------- src/lib.rs | 2 +- src/protocol.rs | 6 ++++++ src/utils.rs | 7 ++----- tests/blocking.rs | 4 ++-- tests/client.rs | 8 ++++---- tests/common/mod.rs | 3 +++ 11 files changed, 41 insertions(+), 41 deletions(-) diff --git a/examples/async.rs b/examples/async.rs index 9f2f83a..e1b476b 100644 --- a/examples/async.rs +++ b/examples/async.rs @@ -42,7 +42,7 @@ async fn main() { ); } else { let res = app - .get_dataframe(&tables[0]) + .get_dataframe(&tables[0], None) .await .unwrap() .collect() diff --git a/examples/blocking.rs b/examples/blocking.rs index c994b52..0a23ff6 100644 --- a/examples/blocking.rs +++ b/examples/blocking.rs @@ -22,7 +22,7 @@ fn main() { shares[0].name ); } else { - let res = app.get_dataframe(&tables[0]).unwrap().collect().unwrap(); + let res = app.get_dataframe(&tables[0], None).unwrap().collect().unwrap(); println!("Dataframe:\n {}", res); } } diff --git a/src/blocking/client.rs b/src/blocking/client.rs index 2735472..57986be 100644 --- a/src/blocking/client.rs +++ b/src/blocking/client.rs @@ -193,29 +193,26 @@ impl Client { pub fn list_table_files( &self, table: &Table, - predicate_hints: Option>, - limit_hint: Option, - version: Option, + request: Option, ) -> Result { let mut map = Map::new(); - if predicate_hints.is_some() { + if let Some(predicate_hints) = request.as_ref().and_then(|r| r.predicate_hints.as_ref()) { map.insert( "predicateHints".to_string(), Value::Array( predicate_hints - .map(|hints| hints.iter().map(|s| Value::String(s.to_string())) - .collect::>()) - .unwrap_or_default() + .iter().map(|s| Value::String(s.to_string())) + .collect::>() ), ); } - if let Some(limit_hint) = limit_hint { + if let Some(limit_hint) = request.as_ref().and_then(|r| r.limit_hint) { map.insert( "limitHint".to_string(), Value::Number(Number::from(limit_hint)), ); } - if let Some(version) = version { + if let Some(version) = request.as_ref().and_then(|r| r.version) { map.insert( "version".to_string(), Value::Number(Number::from(version)), @@ -297,11 +294,11 @@ impl Client { Ok(None) } - pub fn get_files(&mut self, table: &Table) -> Result, anyhow::Error> { + pub fn get_files(&mut self, table: &Table, request: Option) -> Result, anyhow::Error> { let key = table.fully_qualified_name(); let mut 
download = true; let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); - let table_files = self.list_table_files(table, None, None, None)?; + let table_files = self.list_table_files(table, request)?; if let Some(cached) = self.cache.get(&key) { download = cached.table_files.metadata != table_files.metadata; } else if let Some(cached) = self.load_cached(&table_path, &table_files)? { @@ -332,8 +329,8 @@ impl Client { Ok(self.cache.get(&key).ok_or(anyhow::anyhow!("Error reading {key} from cache"))?.file_paths.clone()) } - pub fn get_dataframe(&mut self, table: &Table) -> PolarResult { - self.get_files(&table)?; + pub fn get_dataframe(&mut self, table: &Table, request: Option) -> PolarResult { + self.get_files(&table, request)?; let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); load_parquet_files_as_dataframe(&table_path) } diff --git a/src/blocking/mod.rs b/src/blocking/mod.rs index 1f3f67f..89ba623 100644 --- a/src/blocking/mod.rs +++ b/src/blocking/mod.rs @@ -38,7 +38,7 @@ //! shares[0].name //! ); //! } else { -//! let res = app.get_dataframe(&tables[0]).unwrap().collect().unwrap(); +//! let res = app.get_dataframe(&tables[0], None).unwrap().collect().unwrap(); //! println!("Dataframe:\n {}", res); //! } //! } diff --git a/src/client.rs b/src/client.rs index 95eb86f..9f8e5b7 100644 --- a/src/client.rs +++ b/src/client.rs @@ -209,29 +209,26 @@ impl Client { pub async fn list_table_files( &self, table: &Table, - predicate_hints: Option>, - limit_hint: Option, - version: Option, + request: Option ) -> Result { let mut map = Map::new(); - if predicate_hints.is_some() { + if let Some(predicate_hints) = request.as_ref().and_then(|r| r.predicate_hints.as_ref()) { map.insert( "predicateHints".to_string(), Value::Array( predicate_hints - .map(|hints| hints.iter().map(|s| Value::String(s.to_string())) - .collect::>()) - .unwrap_or_default() + .iter().map(|s| Value::String(s.to_string())) + .collect::>() ), ); } - if let Some(limit_hint) = limit_hint { + if let Some(limit_hint) = request.as_ref().and_then(|r| r.limit_hint) { map.insert( "limitHint".to_string(), Value::Number(Number::from(limit_hint)), ); } - if let Some(version) = version { + if let Some(version) = request.as_ref().and_then(|r| r.version) { map.insert( "version".to_string(), Value::Number(Number::from(version)), @@ -319,11 +316,11 @@ impl Client { Ok(None) } - pub async fn get_files(&mut self, table: &Table) -> Result, anyhow::Error> { + pub async fn get_files(&mut self, table: &Table, request: Option) -> Result, anyhow::Error> { let key = table.fully_qualified_name(); let mut download = true; let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); - let table_files = self.list_table_files(table, None, None, None).await?; + let table_files = self.list_table_files(table, request).await?; if let Some(cached) = self.cache.get(&key) { download = cached.table_files.metadata != table_files.metadata; } else if let Some(cached) = self.load_cached(&table_path, &table_files).await? 
{ @@ -354,8 +351,8 @@ impl Client { Ok(self.cache.get(&key).ok_or(anyhow::anyhow!("Error reading {key} from cache"))?.file_paths.clone()) } - pub async fn get_dataframe(&mut self, table: &Table) -> PolarResult { - self.get_files(&table).await?; + pub async fn get_dataframe(&mut self, table: &Table, request: Option) -> PolarResult { + self.get_files(&table, request).await?; let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); load_parquet_files_as_dataframe(&table_path) } diff --git a/src/lib.rs b/src/lib.rs index 5a37b9f..f7d2367 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -63,7 +63,7 @@ //! ); //! } else { //! let res = app -//! .get_dataframe(&tables[0]) +//! .get_dataframe(&tables[0], None) //! .await //! .unwrap() //! .collect() diff --git a/src/protocol.rs b/src/protocol.rs index d8e69ed..795fc20 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -81,3 +81,9 @@ pub struct TableFiles { pub metadata: TableMetadata, pub files: Vec, } + +pub struct FilesRequest { + pub predicate_hints: Option>, + pub limit_hint: Option, + pub version: Option, +} \ No newline at end of file diff --git a/src/utils.rs b/src/utils.rs index fbd3821..c98dc55 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -36,13 +36,10 @@ pub struct FileResponse { pub file: File, } -#[derive(Deserialize)] -pub struct FileActionResponse { - pub file: File, -} - #[derive(Deserialize, PartialEq, Serialize)] pub struct FileCache { pub table_files: TableFiles, pub file_paths: Vec, } + + diff --git a/tests/blocking.rs b/tests/blocking.rs index 3827211..028b5f0 100644 --- a/tests/blocking.rs +++ b/tests/blocking.rs @@ -117,11 +117,11 @@ fn get_dataframe() { .unwrap() .to_string(); - let df = c.get_dataframe(&table).unwrap().collect().unwrap(); + let df = c.get_dataframe(&table, None).unwrap().collect().unwrap(); assert_eq!(df.shape(), (5, 3), "Dataframe shape mismatch"); // Get the data again, this time it should be served from the local cache (enforced by Expections set on Mocks) - let df1 = c.get_dataframe(&table).unwrap().collect().unwrap(); + let df1 = c.get_dataframe(&table, None).unwrap().collect().unwrap(); assert_eq!(df1.shape(), (5, 3), "Dataframe shape mismatch"); assert_eq!( df1.get_row(0).0[1], diff --git a/tests/client.rs b/tests/client.rs index a8346ba..b0e11f6 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -193,7 +193,7 @@ async fn list_all_table_files() { let app = create_mocked_test_app(body, &url, method("POST")).await; let files = app .client - .list_table_files(&table, None, None, None) + .list_table_files(&table, None) .await .unwrap(); @@ -264,7 +264,7 @@ async fn get_files() { assert!(!Path::exists(&expected_path), "File should not exist"); - let files = c.get_files(&table).await.unwrap(); + let files = c.get_files(&table, None).await.unwrap(); assert_eq!(files.len(), 1, "File count mismatch"); assert_eq!(files[0], expected_path, "File path mismatch"); @@ -330,11 +330,11 @@ async fn get_dataframe() { .unwrap() .to_string(); - let df = c.get_dataframe(&table).await.unwrap().collect().unwrap(); + let df = c.get_dataframe(&table, None).await.unwrap().collect().unwrap(); assert_eq!(df.shape(), (5, 3), "Dataframe shape mismatch"); // Get the data again, this time it should be served from the local cache (enforced by Expections set on Mocks) - let df1 = c.get_dataframe(&table).await.unwrap().collect().unwrap(); + let df1 = c.get_dataframe(&table, None).await.unwrap().collect().unwrap(); assert_eq!(df1.shape(), (5, 3), "Dataframe shape mismatch"); assert_eq!( df1.get_row(0).0[1], 
diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 8331554..4cb535c 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -9,6 +9,7 @@ use wiremock::matchers::{path, MethodExactMatcher}; use delta_sharing::protocol::*; use delta_sharing::Client; +#[allow(dead_code)] pub struct TestApp { pub client: Client, pub server: MockServer, @@ -18,6 +19,7 @@ pub const TEST_PROTOCOL_RESPONSE: &str = r#"{ "minReaderVersion": 1 }"#; pub const TEST_METADATA_RESPONSE: &str = r#"{ "id": "cf9c9342-b773-4c7b-a217-037d02ffe5d8", "format": { "provider": "parquet" }, "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"int_field_1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"double_field_1\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]}", "partitionColumns": [], "configuration": {"conf_1_name": "conf_1_value"} }"#; pub const TEST_FILE_RESPONSE: &str = r#"{ "url": "", "id": "1", "partitionValues": {}, "size": 2350, "stats": "{\"numRecords\":1}" }"#; +#[allow(dead_code)] pub async fn create_test_app() -> TestApp { let _ = env_logger::try_init(); @@ -33,6 +35,7 @@ pub async fn create_test_app() -> TestApp { app } +#[allow(dead_code)] pub async fn create_mocked_test_app( body: &str, url: &str, From aa9e9e1d3c631d922c11244c90b8d37630a59993 Mon Sep 17 00:00:00 2001 From: adalpane Date: Fri, 12 Jul 2024 12:50:47 +0200 Subject: [PATCH 3/8] allow to specify capabilities --- examples/async.rs | 2 +- examples/blocking.rs | 2 +- src/blocking/client.rs | 10 ++++++++-- src/blocking/mod.rs | 2 +- src/client.rs | 12 +++++++++--- src/lib.rs | 2 +- tests/blocking.rs | 2 +- tests/common/mod.rs | 2 +- 8 files changed, 23 insertions(+), 11 deletions(-) diff --git a/examples/async.rs b/examples/async.rs index e1b476b..27a63b3 100644 --- a/examples/async.rs +++ b/examples/async.rs @@ -11,7 +11,7 @@ async fn main() { let conf_str = &fs::read_to_string("./config.json").unwrap(); let config: ProviderConfig = serde_json::from_str(conf_str).expect("Invalid configuration"); - let mut app = Client::new(config, None).await.unwrap(); + let mut app = Client::new(config, None, None).await.unwrap(); let shares = app.list_shares().await.unwrap(); if shares.len() == 0 { println!("At least 1 Delta Share is required"); diff --git a/examples/blocking.rs b/examples/blocking.rs index 0a23ff6..cd9de0e 100644 --- a/examples/blocking.rs +++ b/examples/blocking.rs @@ -10,7 +10,7 @@ fn main() { let conf_str = &fs::read_to_string("./config.json").unwrap(); let config: ProviderConfig = serde_json::from_str(conf_str).expect("Invalid configuration"); - let mut app = Client::new(config, None).unwrap(); + let mut app = Client::new(config, None, None).unwrap(); let shares = app.list_shares().unwrap(); if shares.len() == 0 { println!("At least 1 Delta Share is required"); diff --git a/src/blocking/client.rs b/src/blocking/client.rs index 57986be..21da9ac 100644 --- a/src/blocking/client.rs +++ b/src/blocking/client.rs @@ -30,6 +30,7 @@ impl Client { pub fn new( provider_config: ProviderConfig, data_root: Option, + capabilities: Option> ) -> Result { if provider_config.share_credentials_version > CREDENTIALS_VERSION { return Err(anyhow::anyhow!("'share_credentials_version' in the provider configuration is {}, which is newer than the \ @@ -39,7 +40,7 @@ impl Client { } let cache: HashMap = HashMap::new(); Ok(Self { - http_client: Self::get_client(&provider_config)?, + http_client: Self::get_client(&provider_config, capabilities.unwrap_or_default())?, base_url: 
Self::build_base_url(&provider_config.endpoint)?, data_root: data_root.unwrap_or( env::temp_dir() @@ -53,7 +54,7 @@ impl Client { }) } - fn get_client(config: &ProviderConfig) -> Result { + fn get_client(config: &ProviderConfig, capabilities: HashMap) -> Result { let rust_version: &str = &format!("{}", rustc_version_runtime::version()); let user_agent: &str = &format!("Delta-Sharing-Rust/{VERSION} Rust/{rust_version}"); let bearer_token = &format!("Bearer {}", config.bearer_token); @@ -68,6 +69,11 @@ impl Client { header::HeaderValue::from_str(user_agent) .map_err(|e| anyhow::anyhow!("Error setting user agent header:{e}"))?, ); + headers.insert( + header::HeaderName::from_static("delta-sharing-capabilities"), + header::HeaderValue::from_str(&capabilities.iter().map(|(k,v)| format!("{k}={v}")).collect::>().join(";")) + .map_err(|e| anyhow::anyhow!("Error setting delta-sharing-capabilities header:{e}"))?, + ); reqwest::blocking::Client::builder() .default_headers(headers) .build() diff --git a/src/blocking/mod.rs b/src/blocking/mod.rs index 89ba623..e855e48 100644 --- a/src/blocking/mod.rs +++ b/src/blocking/mod.rs @@ -26,7 +26,7 @@ //! endpoint: "".to_string(), //! bearer_token: "".to_string(), //! }; -//! let mut app = Client::new(config, None).unwrap(); +//! let mut app = Client::new(config, None, None).unwrap(); //! let shares = app.list_shares().unwrap(); //! if shares.len() == 0 { //! println!("At least 1 Delta Share is required"); diff --git a/src/client.rs b/src/client.rs index 9f8e5b7..2fd9580 100644 --- a/src/client.rs +++ b/src/client.rs @@ -30,6 +30,7 @@ impl Client { pub async fn new( provider_config: ProviderConfig, data_root: Option, + capabilities: Option> ) -> Result { if provider_config.share_credentials_version > CREDENTIALS_VERSION { return Err(anyhow::anyhow!("'share_credentials_version' in the provider configuration is {}, which is newer than the \ @@ -39,7 +40,7 @@ impl Client { } let cache: HashMap = HashMap::new(); Ok(Self { - http_client: Self::get_client(&provider_config)?, + http_client: Self::get_client(&provider_config, capabilities.unwrap_or_default())?, base_url: Self::build_base_url(&provider_config.endpoint)?, data_root: data_root.unwrap_or( env::temp_dir() @@ -53,7 +54,7 @@ impl Client { }) } - fn get_client(config: &ProviderConfig) -> Result { + fn get_client(config: &ProviderConfig, capabilities: HashMap) -> Result { let rust_version: &str = &format!("{}", rustc_version_runtime::version()); let user_agent: &str = &format!("Delta-Sharing-Rust/{VERSION} Rust/{rust_version}"); let bearer_token = &format!("Bearer {}", config.bearer_token); @@ -68,6 +69,11 @@ impl Client { header::HeaderValue::from_str(user_agent) .map_err(|e| anyhow::anyhow!("Error setting user agent header:{e}"))?, ); + headers.insert( + header::HeaderName::from_static("delta-sharing-capabilities"), + header::HeaderValue::from_str(&capabilities.iter().map(|(k,v)| format!("{k}={v}")).collect::>().join(";")) + .map_err(|e| anyhow::anyhow!("Error setting delta-sharing-capabilities header:{e}"))?, + ); reqwest::Client::builder().default_headers(headers).build() .map_err(|e| anyhow::anyhow!("Error building Http client: {e}")) } @@ -371,6 +377,6 @@ mod tests { endpoint: "https://sharing.delta.io/delta-sharing/".to_string(), bearer_token: "token".to_string(), }; - let _ = super::Client::new(config, None).await.unwrap(); + let _ = super::Client::new(config, None, None).await.unwrap(); } } diff --git a/src/lib.rs b/src/lib.rs index f7d2367..2bd2716 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,7 
+50,7 @@ //! endpoint: "".to_string(), //! bearer_token: "".to_string(), //! }; -//! let mut app = Client::new(config, None).await.unwrap(); +//! let mut app = Client::new(config, None, None).await.unwrap(); //! let shares = app.list_shares().await.unwrap(); //! if shares.len() == 0 { //! println!("At least 1 Delta Share is required"); diff --git a/tests/blocking.rs b/tests/blocking.rs index 028b5f0..5e397fe 100644 --- a/tests/blocking.rs +++ b/tests/blocking.rs @@ -21,7 +21,7 @@ fn create_blocking_test_app() -> BlockingTestApp { endpoint: server.uri(), bearer_token: Uuid::new_v4().to_string(), }; - let client = Client::new(config, None).unwrap(); + let client = Client::new(config, None, None).unwrap(); let test_app = BlockingTestApp { client, server }; test_app } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 4cb535c..3f1e2a9 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -30,7 +30,7 @@ pub async fn create_test_app() -> TestApp { endpoint: server.uri(), bearer_token: Uuid::new_v4().to_string(), }; - let client = Client::new(config, None).await.unwrap(); + let client = Client::new(config, None, None).await.unwrap(); let app = TestApp { client, server }; app } From 3e25443d39a98b339db3069a56898d295193a216 Mon Sep 17 00:00:00 2001 From: adalpane Date: Fri, 12 Jul 2024 14:09:33 +0200 Subject: [PATCH 4/8] deal with delta protocol response --- src/blocking/client.rs | 25 +++++++++++++--- src/client.rs | 25 +++++++++++++--- src/protocol.rs | 65 ++++++++++++++++++++++++++++++++++++++++-- tests/blocking.rs | 2 +- tests/client.rs | 61 +++++++++++++++++++++++---------------- 5 files changed, 141 insertions(+), 37 deletions(-) diff --git a/src/blocking/client.rs b/src/blocking/client.rs index 21da9ac..3e76ec3 100644 --- a/src/blocking/client.rs +++ b/src/blocking/client.rs @@ -261,9 +261,22 @@ impl Client { fs::create_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error creating table path: {e}"))?; let mut file_paths: Vec = Vec::new(); for file in table_files.files.clone() { - let dst_path = &table_path.join(format!("{}.snappy.parquet", &file.id)); - let _ = self.download(file.url, &dst_path); - file_paths.push(dst_path.clone()); + match file { + File::Parquet(ParquetFile { id, url, .. }) => { + let dst_path = &table_path.join(format!("{}.snappy.parquet", &id)); + let bytes = self.download(url, &dst_path)?; + debug!("Downloaded {} ({} bytes)", dst_path.display(), bytes); + file_paths.push(dst_path.clone()); + }, + File::Delta( DeltaFile { id, url, ..}) => { + if let Some(url) = url { + let dst_path = &table_path.join(format!("{}.snappy.parquet", &id)); + let bytes = self.download(url, &dst_path)?; + debug!("Downloaded {} ({} bytes)", dst_path.display(), bytes); + file_paths.push(dst_path.clone()); + } + }, + } } Ok(file_paths.clone()) } @@ -283,7 +296,11 @@ impl Client { if !download { let mut file_paths: Vec = Vec::new(); for file in &table_files.files { - let file_path = &table_path.join(format!("{}.snappy.parquet", &file.id)); + let file_id = match file { + File::Parquet(ParquetFile { id, ..}) => id, + File::Delta(DeltaFile { id, .. 
}) => id + }; + let file_path = &table_path.join(format!("{}.snappy.parquet", &file_id)); if !Path::exists(&file_path) { // File is missing, invalidate cache download = true; diff --git a/src/client.rs b/src/client.rs index 2fd9580..4a6b030 100644 --- a/src/client.rs +++ b/src/client.rs @@ -279,9 +279,22 @@ impl Client { fs::create_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error creating table path: {e}"))?; let mut file_paths: Vec = Vec::new(); for file in table_files.files.clone() { - let dst_path = &table_path.join(format!("{}.snappy.parquet", &file.id)); - let _ = self.download(file.url, &dst_path).await; - file_paths.push(dst_path.clone()); + match file { + File::Parquet(ParquetFile { id, url, .. }) => { + let dst_path = &table_path.join(format!("{}.snappy.parquet", &id)); + let bytes = self.download(url, &dst_path).await?; + debug!("Downloaded {} ({} bytes)", dst_path.display(), bytes); + file_paths.push(dst_path.clone()); + }, + File::Delta( delta_file) => { + if let Some(url) = delta_file.get_url() { + let dst_path = &table_path.join(format!("{}.snappy.parquet", &delta_file.id)); + let bytes = self.download(url, &dst_path).await?; + debug!("Downloaded {} ({} bytes)", dst_path.display(), bytes); + file_paths.push(dst_path.clone()) + } + }, + } } Ok(file_paths.clone()) } @@ -305,7 +318,11 @@ impl Client { if !download { let mut file_paths: Vec = Vec::new(); for file in &table_files.files { - let file_path = &table_path.join(format!("{}.snappy.parquet", &file.id)); + let file_id = match file { + File::Parquet(ParquetFile { id, ..}) => id, + File::Delta(DeltaFile { id, .. }) => id + }; + let file_path = &table_path.join(format!("{}.snappy.parquet", &file_id)); if !Path::exists(&file_path) { // File is missing, invalidate cache download = true; diff --git a/src/protocol.rs b/src/protocol.rs index 795fc20..37a2321 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -38,8 +38,24 @@ impl Table { #[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] -pub struct Protocol { +pub struct DeltaProtocol { pub min_reader_version: i32, + pub min_writer_version: i32, + pub reader_features: Vec, + pub writer_features: Vec +} + +#[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] +#[serde(untagged)] +pub enum Protocol { + Parquet { + #[serde(rename = "minReaderVersion")] + min_reader_version: i32 + }, + Delta { + #[serde(rename = "deltaProtocol")] + delta_protocol: DeltaProtocol + } } #[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] @@ -50,7 +66,7 @@ pub struct Format { #[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] -pub struct Metadata { +pub struct ParquetMetadata { pub id: String, pub name: Option, pub description: Option, @@ -60,6 +76,21 @@ pub struct Metadata { pub partition_columns: Vec, } +#[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct DeltaMetadata { + pub size: usize, + pub num_files: usize, + pub delta_metadata: ParquetMetadata +} + +#[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] +#[serde(untagged)] +pub enum Metadata { + Parquet(ParquetMetadata), + Delta(DeltaMetadata) +} + #[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] pub struct TableMetadata { pub protocol: Protocol, @@ -68,7 +99,7 @@ pub struct TableMetadata { #[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] -pub struct File { +pub struct ParquetFile { pub id: String, pub url: String, pub 
partition_values: Map, @@ -76,6 +107,34 @@ pub struct File { pub stats: Option, } +#[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct DeltaFile { + pub id: String, + pub deletion_vector_file_id: Option, + pub version: Option, + pub timestamp: Option, + pub expiration_timestamp: Option, + pub delta_single_action: Map, +} + +impl DeltaFile { + pub fn get_url(&self) -> Option { + if let Some(value) = self.delta_single_action.get("path") { + return value.as_str().map(|v| v.to_string()); + } else { + return None; + } + } +} + +#[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] +#[serde(untagged)] +pub enum File { + Parquet(ParquetFile), + Delta(DeltaFile) +} + #[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] pub struct TableFiles { pub metadata: TableMetadata, diff --git a/tests/blocking.rs b/tests/blocking.rs index 5e397fe..1f58dc3 100644 --- a/tests/blocking.rs +++ b/tests/blocking.rs @@ -74,7 +74,7 @@ fn get_dataframe() { "shares/{}/schemas/{}/tables/{}/query", table.share, table.schema, table.name ); - let mut file: File = + let mut file: ParquetFile = serde_json::from_str(common::TEST_FILE_RESPONSE).expect("Invalid file info"); let file_url_path = "/shares/test.parquet"; file.url = format!("{}{}", &app.server.uri(), &file_url_path); diff --git a/tests/client.rs b/tests/client.rs index b0e11f6..386283d 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -112,28 +112,36 @@ async fn get_table_metadata() { let app = create_mocked_test_app(body, &url, method("GET")).await; let meta = app.client.get_table_metadata(&table).await.unwrap(); - assert_eq!(meta.protocol.min_reader_version, 1, "Protocol mismatch"); - assert_eq!( - meta.metadata.id, "cf9c9342-b773-4c7b-a217-037d02ffe5d8", - "Metadata ID mismatch" - ); - assert_eq!( - meta.metadata.format.provider, "parquet", - "Metadata format provider mismatch" - ); - assert_eq!( - meta.metadata.name, None, - "Metadata name value should be missing" - ); - assert_eq!( - meta.metadata.partition_columns.len(), - 0, - "There should be no partitions" - ); - assert_eq!( - meta.metadata.configuration["conf_1_name"], "conf_1_value", - "Configuration value expected" - ); + match meta.protocol { + Protocol::Delta { .. } => assert!(false, "Wrong protocol deserialization"), + Protocol::Parquet { min_reader_version } => assert_eq!(min_reader_version, 1, "Protocol mismatch") + }; + match meta.metadata { + Metadata::Delta { .. } => assert!(false, "Wrong metadata deserialization"), + Metadata::Parquet ( ParquetMetadata { id, format, name, partition_columns, configuration, .. }) => { + assert_eq!( + id, "cf9c9342-b773-4c7b-a217-037d02ffe5d8", + "Metadata ID mismatch" + ); + assert_eq!( + format.provider, "parquet", + "Metadata format provider mismatch" + ); + assert_eq!( + name, None, + "Metadata name value should be missing" + ); + assert_eq!( + partition_columns.len(), + 0, + "There should be no partitions" + ); + assert_eq!( + configuration["conf_1_name"], "conf_1_value", + "Configuration value expected" + ); + } + }; } #[tokio::test] @@ -198,7 +206,10 @@ async fn list_all_table_files() { .unwrap(); assert_eq!(files.files.len(), 2, "File count mismatch"); - assert_eq!(files.files[1].id, "2", "File id mismatch"); + match &files.files[1] { + File::Parquet(ParquetFile { id, ..}) => assert_eq!(id, "2", "File id mismatch"), + File::Delta(DeltaFile { .. 
}) => assert!(false, "Wrong file deserialization") + }; } #[tokio::test] @@ -219,7 +230,7 @@ async fn get_files() { "shares/{}/schemas/{}/tables/{}/query", table.share, table.schema, table.name ); - let mut file: File = + let mut file: ParquetFile = serde_json::from_str(common::TEST_FILE_RESPONSE).expect("Invalid file info"); let file_url_path = "/shares/test.parquet"; file.url = format!("{}{}", &app.server.uri(), &file_url_path); @@ -287,7 +298,7 @@ async fn get_dataframe() { "shares/{}/schemas/{}/tables/{}/query", table.share, table.schema, table.name ); - let mut file: File = + let mut file: ParquetFile = serde_json::from_str(common::TEST_FILE_RESPONSE).expect("Invalid file info"); let file_url_path = "/shares/test.parquet"; file.url = format!("{}{}", &app.server.uri(), &file_url_path); From 42a50cc8a52dccd464d8f08b149bb1a35491e52a Mon Sep 17 00:00:00 2001 From: adalpane Date: Thu, 18 Jul 2024 16:12:45 +0200 Subject: [PATCH 5/8] upgrade dependencies --- Cargo.toml | 6 +++--- src/blocking/client.rs | 11 ++++++----- src/client.rs | 5 +++-- src/reader.rs | 5 ++--- tests/blocking.rs | 4 ++-- tests/client.rs | 4 ++-- 6 files changed, 18 insertions(+), 17 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0e89c90..a09fea0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,8 +23,8 @@ blocking = ["reqwest/blocking"] [dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread",] } -parquet = "14.0.0" -reqwest = { version = "0.11", features = ["json"] } +parquet = "52.1.0" +reqwest = { version = "0.12", features = ["json"] } url = "2.2" rustc_version_runtime = "0.1" serde = { version = "1.0", features = ["derive"] } @@ -32,7 +32,7 @@ serde_json = "1.0" anyhow = "1.0" log = "0.4" env_logger = "0.9" -polars = { version = "0.22.8", features = ["lazy", "parquet"] } +polars = { version = "0.41.3", features = ["lazy", "parquet"] } [dev-dependencies] wiremock = "0.5" diff --git a/src/blocking/client.rs b/src/blocking/client.rs index 3e76ec3..0cf15e3 100644 --- a/src/blocking/client.rs +++ b/src/blocking/client.rs @@ -2,7 +2,7 @@ use crate::protocol::*; use crate::reader::*; use crate::utils::*; use parquet::data_type::AsBytes; -use polars::prelude::{LazyFrame, Result as PolarResult}; +use polars::prelude::LazyFrame; use reqwest::{header, header::HeaderValue}; use serde_json::{Map, Number, Value}; use std::collections::HashMap; @@ -268,9 +268,9 @@ impl Client { debug!("Downloaded {} ({} bytes)", dst_path.display(), bytes); file_paths.push(dst_path.clone()); }, - File::Delta( DeltaFile { id, url, ..}) => { - if let Some(url) = url { - let dst_path = &table_path.join(format!("{}.snappy.parquet", &id)); + File::Delta( delta_file) => { + if let Some(url) = delta_file.get_url() { + let dst_path = &table_path.join(format!("{}.snappy.parquet", &delta_file.id)); let bytes = self.download(url, &dst_path)?; debug!("Downloaded {} ({} bytes)", dst_path.display(), bytes); file_paths.push(dst_path.clone()); @@ -352,9 +352,10 @@ impl Client { Ok(self.cache.get(&key).ok_or(anyhow::anyhow!("Error reading {key} from cache"))?.file_paths.clone()) } - pub fn get_dataframe(&mut self, table: &Table, request: Option) -> PolarResult { + pub fn get_dataframe(&mut self, table: &Table, request: Option) -> Result { self.get_files(&table, request)?; let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); load_parquet_files_as_dataframe(&table_path) + .map_err(|e| anyhow::anyhow!("Error loading parquet files: {e}")) } } diff --git a/src/client.rs b/src/client.rs index 4a6b030..faf9e91 
100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,7 +2,7 @@ use crate::protocol::*; use crate::reader::*; use crate::utils::*; use parquet::data_type::AsBytes; -use polars::prelude::{LazyFrame, Result as PolarResult}; +use polars::prelude::LazyFrame; use reqwest::{header, header::HeaderValue}; use serde_json::{Map, Number, Value}; use std::collections::HashMap; @@ -374,10 +374,11 @@ impl Client { Ok(self.cache.get(&key).ok_or(anyhow::anyhow!("Error reading {key} from cache"))?.file_paths.clone()) } - pub async fn get_dataframe(&mut self, table: &Table, request: Option) -> PolarResult { + pub async fn get_dataframe(&mut self, table: &Table, request: Option) -> Result { self.get_files(&table, request).await?; let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); load_parquet_files_as_dataframe(&table_path) + .map_err(|e| anyhow::anyhow!("Error loading parquet files: {e}")) } } diff --git a/src/reader.rs b/src/reader.rs index 6adbdb2..3434269 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -1,12 +1,11 @@ -use polars::prelude::Result as PolarResult; use polars::prelude::*; use std::path::PathBuf; -pub fn load_parquet_files_as_dataframe(parquet_root_dir_path: &PathBuf) -> PolarResult { +pub fn load_parquet_files_as_dataframe(parquet_root_dir_path: &PathBuf) -> Result { let search_pattern = parquet_root_dir_path .join("*.parquet") .display() .to_string(); - let res = LazyFrame::scan_parquet(search_pattern.into(), Default::default()); + let res = LazyFrame::scan_parquet(search_pattern, Default::default()); res } diff --git a/tests/blocking.rs b/tests/blocking.rs index 1f58dc3..99e699a 100644 --- a/tests/blocking.rs +++ b/tests/blocking.rs @@ -124,8 +124,8 @@ fn get_dataframe() { let df1 = c.get_dataframe(&table, None).unwrap().collect().unwrap(); assert_eq!(df1.shape(), (5, 3), "Dataframe shape mismatch"); assert_eq!( - df1.get_row(0).0[1], - polars::datatypes::AnyValue::Utf8("One"), + df1.get_row(0).unwrap().0[1], + polars::datatypes::AnyValue::String("One"), "Row value mismatch" ); } diff --git a/tests/client.rs b/tests/client.rs index 386283d..b1891fb 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -348,8 +348,8 @@ async fn get_dataframe() { let df1 = c.get_dataframe(&table, None).await.unwrap().collect().unwrap(); assert_eq!(df1.shape(), (5, 3), "Dataframe shape mismatch"); assert_eq!( - df1.get_row(0).0[1], - polars::datatypes::AnyValue::Utf8("One"), + df1.get_row(0).unwrap().0[1], + polars::datatypes::AnyValue::String("One"), "Row value mismatch" ); } From 0105d08a7375781ce9e16580161830afe4759f03 Mon Sep 17 00:00:00 2001 From: adalpane Date: Mon, 29 Jul 2024 13:53:41 +0200 Subject: [PATCH 6/8] fix error in invalidate cache --- src/blocking/client.rs | 2 +- src/client.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/blocking/client.rs b/src/blocking/client.rs index 0cf15e3..3ff6214 100644 --- a/src/blocking/client.rs +++ b/src/blocking/client.rs @@ -304,7 +304,7 @@ impl Client { if !Path::exists(&file_path) { // File is missing, invalidate cache download = true; - fs::remove_dir(&table_path).map_err(|e| anyhow::anyhow!("Error invalidating cache: {e}"))?; + fs::remove_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error invalidating cache: {e}"))?; break; } file_paths.push(file_path.clone()); diff --git a/src/client.rs b/src/client.rs index faf9e91..4afc491 100644 --- a/src/client.rs +++ b/src/client.rs @@ -326,7 +326,7 @@ impl Client { if !Path::exists(&file_path) { // File is missing, invalidate cache download = true; - 
fs::remove_dir(&table_path).map_err(|e| anyhow::anyhow!("Error invalidating cache: {e}"))?; + fs::remove_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error invalidating cache: {e}"))?; break; } file_paths.push(file_path.clone()); From 1514c2525511badf175f936c9637e429f76b36a7 Mon Sep 17 00:00:00 2001 From: adalpane Date: Fri, 2 Aug 2024 14:28:07 +0200 Subject: [PATCH 7/8] finalize implementation --- src/blocking/client.rs | 9 +++++---- src/client.rs | 7 ++++--- src/protocol.rs | 2 +- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/blocking/client.rs b/src/blocking/client.rs index 3ff6214..4ad2797 100644 --- a/src/blocking/client.rs +++ b/src/blocking/client.rs @@ -260,19 +260,20 @@ impl Client { } fs::create_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error creating table path: {e}"))?; let mut file_paths: Vec = Vec::new(); - for file in table_files.files.clone() { + let count = table_files.files.len(); + for (index, file) in table_files.files.clone().into_iter().enumerate() { match file { File::Parquet(ParquetFile { id, url, .. }) => { let dst_path = &table_path.join(format!("{}.snappy.parquet", &id)); let bytes = self.download(url, &dst_path)?; - debug!("Downloaded {} ({} bytes)", dst_path.display(), bytes); + debug!("Downloaded {}/{} {} ({} bytes)", index+1, count, dst_path.display(), bytes); file_paths.push(dst_path.clone()); }, File::Delta( delta_file) => { if let Some(url) = delta_file.get_url() { let dst_path = &table_path.join(format!("{}.snappy.parquet", &delta_file.id)); let bytes = self.download(url, &dst_path)?; - debug!("Downloaded {} ({} bytes)", dst_path.display(), bytes); + debug!("Downloaded {}/{} {} ({} bytes)", index+1, count, dst_path.display(), bytes); file_paths.push(dst_path.clone()); } }, @@ -358,4 +359,4 @@ impl Client { load_parquet_files_as_dataframe(&table_path) .map_err(|e| anyhow::anyhow!("Error loading parquet files: {e}")) } -} +} \ No newline at end of file diff --git a/src/client.rs b/src/client.rs index 4afc491..b70e64e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -278,19 +278,20 @@ impl Client { } fs::create_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error creating table path: {e}"))?; let mut file_paths: Vec = Vec::new(); - for file in table_files.files.clone() { + let count = table_files.files.len(); + for (index, file) in table_files.files.clone().into_iter().enumerate() { match file { File::Parquet(ParquetFile { id, url, .. 
}) => { let dst_path = &table_path.join(format!("{}.snappy.parquet", &id)); let bytes = self.download(url, &dst_path).await?; - debug!("Downloaded {} ({} bytes)", dst_path.display(), bytes); + debug!("Downloaded {}/{} {} ({} bytes)", index+1, count, dst_path.display(), bytes); file_paths.push(dst_path.clone()); }, File::Delta( delta_file) => { if let Some(url) = delta_file.get_url() { let dst_path = &table_path.join(format!("{}.snappy.parquet", &delta_file.id)); let bytes = self.download(url, &dst_path).await?; - debug!("Downloaded {} ({} bytes)", dst_path.display(), bytes); + debug!("Downloaded {}/{} {} ({} bytes)", index+1, count, dst_path.display(), bytes); file_paths.push(dst_path.clone()) } }, diff --git a/src/protocol.rs b/src/protocol.rs index 37a2321..af07e8e 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -120,7 +120,7 @@ pub struct DeltaFile { impl DeltaFile { pub fn get_url(&self) -> Option { - if let Some(value) = self.delta_single_action.get("path") { + if let Some(value) = self.delta_single_action.get("add").and_then(|add| add.get("path")) { return value.as_str().map(|v| v.to_string()); } else { return None; From b6a8f18dd97b327111bf6691b64b42ca0199c66f Mon Sep 17 00:00:00 2001 From: adalpane Date: Mon, 26 Aug 2024 12:19:01 +0200 Subject: [PATCH 8/8] fix format --- examples/blocking.rs | 6 +- src/blocking/client.rs | 197 +++++++++++++++++++++++++++------------- src/client.rs | 201 ++++++++++++++++++++++++++++------------- src/protocol.rs | 26 +++--- src/reader.rs | 4 +- src/utils.rs | 2 - tests/client.rs | 48 ++++++---- 7 files changed, 323 insertions(+), 161 deletions(-) diff --git a/examples/blocking.rs b/examples/blocking.rs index cd9de0e..b6bd757 100644 --- a/examples/blocking.rs +++ b/examples/blocking.rs @@ -22,7 +22,11 @@ fn main() { shares[0].name ); } else { - let res = app.get_dataframe(&tables[0], None).unwrap().collect().unwrap(); + let res = app + .get_dataframe(&tables[0], None) + .unwrap() + .collect() + .unwrap(); println!("Dataframe:\n {}", res); } } diff --git a/src/blocking/client.rs b/src/blocking/client.rs index 4ad2797..36be608 100644 --- a/src/blocking/client.rs +++ b/src/blocking/client.rs @@ -30,7 +30,7 @@ impl Client { pub fn new( provider_config: ProviderConfig, data_root: Option, - capabilities: Option> + capabilities: Option>, ) -> Result { if provider_config.share_credentials_version > CREDENTIALS_VERSION { return Err(anyhow::anyhow!("'share_credentials_version' in the provider configuration is {}, which is newer than the \ @@ -54,7 +54,10 @@ impl Client { }) } - fn get_client(config: &ProviderConfig, capabilities: HashMap) -> Result { + fn get_client( + config: &ProviderConfig, + capabilities: HashMap, + ) -> Result { let rust_version: &str = &format!("{}", rustc_version_runtime::version()); let user_agent: &str = &format!("Delta-Sharing-Rust/{VERSION} Rust/{rust_version}"); let bearer_token = &format!("Bearer {}", config.bearer_token); @@ -69,10 +72,16 @@ impl Client { header::HeaderValue::from_str(user_agent) .map_err(|e| anyhow::anyhow!("Error setting user agent header:{e}"))?, ); - headers.insert( + headers.insert( header::HeaderName::from_static("delta-sharing-capabilities"), - header::HeaderValue::from_str(&capabilities.iter().map(|(k,v)| format!("{k}={v}")).collect::>().join(";")) - .map_err(|e| anyhow::anyhow!("Error setting delta-sharing-capabilities header:{e}"))?, + header::HeaderValue::from_str( + &capabilities + .iter() + .map(|(k, v)| format!("{k}={v}")) + .collect::>() + .join(";"), + ) + .map_err(|e| anyhow::anyhow!("Error 
setting delta-sharing-capabilities header:{e}"))?, ); reqwest::blocking::Client::builder() .default_headers(headers) @@ -87,8 +96,10 @@ impl Client { } fn get(&self, target: &str) -> Result { - let url = self.base_url.join(target) - .map_err(|e| anyhow::anyhow!("Error creating GET url: {e}"))?; + let url = self + .base_url + .join(target) + .map_err(|e| anyhow::anyhow!("Error creating GET url: {e}"))?; debug!("--> HTTP GET to: {}", &url); let resp = self.http_client.get(url.as_str()).send()?; let resp_text = resp.text()?; @@ -97,7 +108,9 @@ impl Client { } fn head(&self, target: &str, key: &str) -> Result, anyhow::Error> { - let url = self.base_url.join(target) + let url = self + .base_url + .join(target) .map_err(|e| anyhow::anyhow!("Error creating HEAD url: {e}"))?; debug!("HTTP HEAD to: {}", &url); let resp = self @@ -113,7 +126,9 @@ impl Client { } fn post(&self, target: &str, json: &Map) -> Result { - let url = self.base_url.join(target) + let url = self + .base_url + .join(target) .map_err(|e| anyhow::anyhow!("Error creating POST url: {e}"))?; debug!("--> HTTP POST to: {}", &url); let resp = self.http_client.post(url.as_str()).json(json).send()?; @@ -128,7 +143,8 @@ impl Client { .map_err(|e| anyhow::anyhow!("Error creating POST url: {e}"))?; let mut out = fs::File::create(dest_path) .map_err(|e| anyhow::anyhow!("Failed to create an output file: {e}"))?; - let content = resp.bytes() + let content = resp + .bytes() .map_err(|e| anyhow::anyhow!("Failed to read download bytes: {e}"))?; io::copy(&mut content.as_bytes(), &mut out) .map_err(|e| anyhow::anyhow!("Failed to save the content to output file: {e}")) @@ -136,13 +152,15 @@ impl Client { pub fn list_shares(&self) -> Result, anyhow::Error> { let shares = self.get("shares")?; - let parsed: ShareResponse = serde_json::from_str(&shares).map_err(|e| anyhow::anyhow!("Invalid list shares response: {e}"))?; + let parsed: ShareResponse = serde_json::from_str(&shares) + .map_err(|e| anyhow::anyhow!("Invalid list shares response: {e}"))?; return Ok(parsed.items.clone()); } pub fn list_schemas(&self, share: &Share) -> Result, anyhow::Error> { let schemas = self.get(&format!("shares/{}/schemas", share.name))?; - let parsed: SchemaResponse = serde_json::from_str(&schemas).map_err(|e| anyhow::anyhow!("Invalid list schemas response: {e}"))?; + let parsed: SchemaResponse = serde_json::from_str(&schemas) + .map_err(|e| anyhow::anyhow!("Invalid list schemas response: {e}"))?; return Ok(parsed.items.clone()); } @@ -151,13 +169,15 @@ impl Client { "shares/{}/schemas/{}/tables", schema.share, schema.name ))?; - let parsed: TableResponse = serde_json::from_str(&tables).map_err(|e| anyhow::anyhow!("Invalid list tables response: {e}"))?; + let parsed: TableResponse = serde_json::from_str(&tables) + .map_err(|e| anyhow::anyhow!("Invalid list tables response: {e}"))?; return Ok(parsed.items.clone()); } pub fn list_all_tables(&self, share: &Share) -> Result, anyhow::Error> { let tables = self.get(&format!("shares/{}/all-tables", share.name))?; - let parsed: TableResponse = serde_json::from_str(&tables).map_err(|e| anyhow::anyhow!("Invalid list all tables response: {e}"))?; + let parsed: TableResponse = serde_json::from_str(&tables) + .map_err(|e| anyhow::anyhow!("Invalid list all tables response: {e}"))?; return Ok(parsed.items.clone()); } @@ -167,14 +187,20 @@ impl Client { table.share, table.schema, table.name ))?; let mut meta_lines = meta.lines(); - let protocol: ProtocolResponse = - meta_lines.next().map(|lines| serde_json::from_str::(lines) - 
.map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}"))) - .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; - let metadata: MetadataResponse = - meta_lines.next().map(|lines| serde_json::from_str::(lines) - .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}"))) - .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; + let protocol: ProtocolResponse = meta_lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; + let metadata: MetadataResponse = meta_lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; Ok(TableMetadata { protocol: protocol.protocol, metadata: metadata.metadata, @@ -191,7 +217,10 @@ impl Client { ); match version { Ok(Some(v)) => v - .to_str().ok().and_then(|value| value.parse::().ok()).unwrap_or(-1), + .to_str() + .ok() + .and_then(|value| value.parse::().ok()) + .unwrap_or(-1), _ => -1, } } @@ -207,8 +236,9 @@ impl Client { "predicateHints".to_string(), Value::Array( predicate_hints - .iter().map(|s| Value::String(s.to_string())) - .collect::>() + .iter() + .map(|s| Value::String(s.to_string())) + .collect::>(), ), ); } @@ -219,10 +249,7 @@ impl Client { ); } if let Some(version) = request.as_ref().and_then(|r| r.version) { - map.insert( - "version".to_string(), - Value::Number(Number::from(version)), - ); + map.insert("version".to_string(), Value::Number(Number::from(version))); } let response = self.post( &format!( @@ -232,17 +259,24 @@ impl Client { &map, )?; let mut lines = response.lines(); - let protocol: ProtocolResponse = - lines.next().map(|lines| serde_json::from_str::(lines) - .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}"))) - .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; - let metadata: MetadataResponse = - lines.next().map(|lines| serde_json::from_str::(lines) - .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}"))) - .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; + let protocol: ProtocolResponse = lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; + let metadata: MetadataResponse = lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; let mut files: Vec = Vec::new(); for l in lines { - let file: FileResponse = serde_json::from_str(l).map_err(|e| anyhow::anyhow!("Invalid file info: {e}"))?; + let file: FileResponse = + serde_json::from_str(l).map_err(|e| anyhow::anyhow!("Invalid file info: {e}"))?; files.push(file.file.clone()); } Ok(TableFiles { @@ -254,11 +288,17 @@ impl Client { }) } - fn download_files(&self, table_path: &PathBuf, table_files: &TableFiles) -> Result, anyhow::Error> { + fn download_files( + &self, + table_path: &PathBuf, + table_files: &TableFiles, + ) -> Result, anyhow::Error> { if Path::exists(&table_path) { - fs::remove_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error cleaning table path: {e}"))?; + fs::remove_dir_all(&table_path) + .map_err(|e| anyhow::anyhow!("Error cleaning table 
path: {e}"))?; } - fs::create_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error creating table path: {e}"))?; + fs::create_dir_all(&table_path) + .map_err(|e| anyhow::anyhow!("Error creating table path: {e}"))?; let mut file_paths: Vec = Vec::new(); let count = table_files.files.len(); for (index, file) in table_files.files.clone().into_iter().enumerate() { @@ -266,46 +306,68 @@ impl Client { File::Parquet(ParquetFile { id, url, .. }) => { let dst_path = &table_path.join(format!("{}.snappy.parquet", &id)); let bytes = self.download(url, &dst_path)?; - debug!("Downloaded {}/{} {} ({} bytes)", index+1, count, dst_path.display(), bytes); + debug!( + "Downloaded {}/{} {} ({} bytes)", + index + 1, + count, + dst_path.display(), + bytes + ); file_paths.push(dst_path.clone()); - }, - File::Delta( delta_file) => { + } + File::Delta(delta_file) => { if let Some(url) = delta_file.get_url() { - let dst_path = &table_path.join(format!("{}.snappy.parquet", &delta_file.id)); + let dst_path = + &table_path.join(format!("{}.snappy.parquet", &delta_file.id)); let bytes = self.download(url, &dst_path)?; - debug!("Downloaded {}/{} {} ({} bytes)", index+1, count, dst_path.display(), bytes); + debug!( + "Downloaded {}/{} {} ({} bytes)", + index + 1, + count, + dst_path.display(), + bytes + ); file_paths.push(dst_path.clone()); } - }, + } } } Ok(file_paths.clone()) } - fn load_cached(&self, table_path: &PathBuf, table_files: &TableFiles) -> Result>, anyhow::Error> { + fn load_cached( + &self, + table_path: &PathBuf, + table_files: &TableFiles, + ) -> Result>, anyhow::Error> { // Check if the files exist, load and compare the files. let metadata_path = &table_path.join(METADATA_FILE); if Path::exists(&metadata_path) { - let metadata_str = &fs::read_to_string(&metadata_path).map_err(|e| anyhow::anyhow!("Error reading file path {}: {}", metadata_path.display(), e))?; - let metadata: TableMetadata = serde_json::from_str(&metadata_str).map_err(|e| anyhow::anyhow!( - "Invalid configuration in {}: {}", - metadata_path.display(), - e - ))?; + let metadata_str = &fs::read_to_string(&metadata_path).map_err(|e| { + anyhow::anyhow!("Error reading file path {}: {}", metadata_path.display(), e) + })?; + let metadata: TableMetadata = serde_json::from_str(&metadata_str).map_err(|e| { + anyhow::anyhow!( + "Invalid configuration in {}: {}", + metadata_path.display(), + e + ) + })?; let mut download = metadata != table_files.metadata; if !download { let mut file_paths: Vec = Vec::new(); for file in &table_files.files { let file_id = match file { - File::Parquet(ParquetFile { id, ..}) => id, - File::Delta(DeltaFile { id, .. }) => id + File::Parquet(ParquetFile { id, .. }) => id, + File::Delta(DeltaFile { id, .. 
}) => id, }; let file_path = &table_path.join(format!("{}.snappy.parquet", &file_id)); if !Path::exists(&file_path) { // File is missing, invalidate cache download = true; - fs::remove_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error invalidating cache: {e}"))?; + fs::remove_dir_all(&table_path) + .map_err(|e| anyhow::anyhow!("Error invalidating cache: {e}"))?; break; } file_paths.push(file_path.clone()); @@ -318,7 +380,11 @@ impl Client { Ok(None) } - pub fn get_files(&mut self, table: &Table, request: Option) -> Result, anyhow::Error> { + pub fn get_files( + &mut self, + table: &Table, + request: Option, + ) -> Result, anyhow::Error> { let key = table.fully_qualified_name(); let mut download = true; let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); @@ -350,13 +416,22 @@ impl Client { }, ); } - Ok(self.cache.get(&key).ok_or(anyhow::anyhow!("Error reading {key} from cache"))?.file_paths.clone()) + Ok(self + .cache + .get(&key) + .ok_or(anyhow::anyhow!("Error reading {key} from cache"))? + .file_paths + .clone()) } - pub fn get_dataframe(&mut self, table: &Table, request: Option) -> Result { + pub fn get_dataframe( + &mut self, + table: &Table, + request: Option, + ) -> Result { self.get_files(&table, request)?; let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); load_parquet_files_as_dataframe(&table_path) .map_err(|e| anyhow::anyhow!("Error loading parquet files: {e}")) } -} \ No newline at end of file +} diff --git a/src/client.rs b/src/client.rs index b70e64e..6e14bea 100644 --- a/src/client.rs +++ b/src/client.rs @@ -30,7 +30,7 @@ impl Client { pub async fn new( provider_config: ProviderConfig, data_root: Option, - capabilities: Option> + capabilities: Option>, ) -> Result { if provider_config.share_credentials_version > CREDENTIALS_VERSION { return Err(anyhow::anyhow!("'share_credentials_version' in the provider configuration is {}, which is newer than the \ @@ -54,7 +54,10 @@ impl Client { }) } - fn get_client(config: &ProviderConfig, capabilities: HashMap) -> Result { + fn get_client( + config: &ProviderConfig, + capabilities: HashMap, + ) -> Result { let rust_version: &str = &format!("{}", rustc_version_runtime::version()); let user_agent: &str = &format!("Delta-Sharing-Rust/{VERSION} Rust/{rust_version}"); let bearer_token = &format!("Bearer {}", config.bearer_token); @@ -71,10 +74,18 @@ impl Client { ); headers.insert( header::HeaderName::from_static("delta-sharing-capabilities"), - header::HeaderValue::from_str(&capabilities.iter().map(|(k,v)| format!("{k}={v}")).collect::>().join(";")) - .map_err(|e| anyhow::anyhow!("Error setting delta-sharing-capabilities header:{e}"))?, + header::HeaderValue::from_str( + &capabilities + .iter() + .map(|(k, v)| format!("{k}={v}")) + .collect::>() + .join(";"), + ) + .map_err(|e| anyhow::anyhow!("Error setting delta-sharing-capabilities header:{e}"))?, ); - reqwest::Client::builder().default_headers(headers).build() + reqwest::Client::builder() + .default_headers(headers) + .build() .map_err(|e| anyhow::anyhow!("Error building Http client: {e}")) } @@ -85,7 +96,9 @@ impl Client { } async fn get(&self, target: &str) -> Result { - let url = self.base_url.join(target) + let url = self + .base_url + .join(target) .map_err(|e| anyhow::anyhow!("Error creating GET url: {e}"))?; debug!("--> HTTP GET to: {}", &url); let resp = self.http_client.get(url.as_str()).send().await?; @@ -95,7 +108,9 @@ impl Client { } async fn head(&self, target: &str, key: &str) -> Result, anyhow::Error> { - let 
url = self.base_url.join(target) + let url = self + .base_url + .join(target) .map_err(|e| anyhow::anyhow!("Error creating HEAD url: {e}"))?; debug!("HTTP HEAD to: {}", &url); let resp = self @@ -111,12 +126,10 @@ impl Client { } } - async fn post( - &self, - target: &str, - json: &Map, - ) -> Result { - let url = self.base_url.join(target) + async fn post(&self, target: &str, json: &Map) -> Result { + let url = self + .base_url + .join(target) .map_err(|e| anyhow::anyhow!("Error creating POST url: {e}"))?; debug!("--> HTTP POST to: {}", &url); let resp = self @@ -132,11 +145,14 @@ impl Client { async fn download(&self, url: String, dest_path: &Path) -> Result { debug!("--> Download {} to {}", &url, dest_path.display()); - let resp = reqwest::get(url).await + let resp = reqwest::get(url) + .await .map_err(|e| anyhow::anyhow!("Error creating POST url: {e}"))?; let mut out = fs::File::create(dest_path) .map_err(|e| anyhow::anyhow!("Failed to create an output file: {e}"))?; - let content = resp.bytes().await + let content = resp + .bytes() + .await .map_err(|e| anyhow::anyhow!("Failed to read download bytes: {e}"))?; io::copy(&mut content.as_bytes(), &mut out) .map_err(|e| anyhow::anyhow!("Failed to save the content to output file: {e}")) @@ -144,13 +160,15 @@ impl Client { pub async fn list_shares(&self) -> Result, anyhow::Error> { let shares = self.get("shares").await?; - let parsed: ShareResponse = serde_json::from_str(&shares).map_err(|e| anyhow::anyhow!("Invalid list shares response: {e}"))?; + let parsed: ShareResponse = serde_json::from_str(&shares) + .map_err(|e| anyhow::anyhow!("Invalid list shares response: {e}"))?; return Ok(parsed.items.clone()); } pub async fn list_schemas(&self, share: &Share) -> Result, anyhow::Error> { let schemas = self.get(&format!("shares/{}/schemas", share.name)).await?; - let parsed: SchemaResponse = serde_json::from_str(&schemas).map_err(|e| anyhow::anyhow!("Invalid list schemas response: {e}"))?; + let parsed: SchemaResponse = serde_json::from_str(&schemas) + .map_err(|e| anyhow::anyhow!("Invalid list schemas response: {e}"))?; return Ok(parsed.items.clone()); } @@ -161,7 +179,8 @@ impl Client { schema.share, schema.name )) .await?; - let parsed: TableResponse = serde_json::from_str(&tables).map_err(|e| anyhow::anyhow!("Invalid list tables response: {e}"))?; + let parsed: TableResponse = serde_json::from_str(&tables) + .map_err(|e| anyhow::anyhow!("Invalid list tables response: {e}"))?; return Ok(parsed.items.clone()); } @@ -169,7 +188,8 @@ impl Client { let tables = self .get(&format!("shares/{}/all-tables", share.name)) .await?; - let parsed: TableResponse = serde_json::from_str(&tables).map_err(|e| anyhow::anyhow!("Invalid list all tables response: {e}"))?; + let parsed: TableResponse = serde_json::from_str(&tables) + .map_err(|e| anyhow::anyhow!("Invalid list all tables response: {e}"))?; return Ok(parsed.items.clone()); } @@ -181,14 +201,20 @@ impl Client { )) .await?; let mut meta_lines = meta.lines(); - let protocol: ProtocolResponse = - meta_lines.next().map(|lines| serde_json::from_str::(lines) - .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}"))) - .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; - let metadata: MetadataResponse = - meta_lines.next().map(|lines| serde_json::from_str::(lines) - .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}"))) - .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; + let protocol: ProtocolResponse = meta_lines + .next() + .map(|lines| { + 
serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; + let metadata: MetadataResponse = meta_lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; Ok(TableMetadata { protocol: protocol.protocol, metadata: metadata.metadata, @@ -207,7 +233,10 @@ impl Client { .await; match version { Ok(Some(v)) => v - .to_str().ok().and_then(|value| value.parse::().ok()).unwrap_or(-1), + .to_str() + .ok() + .and_then(|value| value.parse::().ok()) + .unwrap_or(-1), _ => -1, } } @@ -215,7 +244,7 @@ impl Client { pub async fn list_table_files( &self, table: &Table, - request: Option + request: Option, ) -> Result { let mut map = Map::new(); if let Some(predicate_hints) = request.as_ref().and_then(|r| r.predicate_hints.as_ref()) { @@ -223,8 +252,9 @@ impl Client { "predicateHints".to_string(), Value::Array( predicate_hints - .iter().map(|s| Value::String(s.to_string())) - .collect::>() + .iter() + .map(|s| Value::String(s.to_string())) + .collect::>(), ), ); } @@ -235,10 +265,7 @@ impl Client { ); } if let Some(version) = request.as_ref().and_then(|r| r.version) { - map.insert( - "version".to_string(), - Value::Number(Number::from(version)), - ); + map.insert("version".to_string(), Value::Number(Number::from(version))); } let response = self .post( @@ -250,17 +277,24 @@ impl Client { ) .await?; let mut lines = response.lines(); - let protocol: ProtocolResponse = - lines.next().map(|lines| serde_json::from_str::(lines) - .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}"))) - .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; - let metadata: MetadataResponse = - lines.next().map(|lines| serde_json::from_str::(lines) - .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}"))) - .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; + let protocol: ProtocolResponse = lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; + let metadata: MetadataResponse = lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; let mut files: Vec = Vec::new(); for l in lines { - let file: FileResponse = serde_json::from_str(l).map_err(|e| anyhow::anyhow!("Invalid file info: {e}"))?; + let file: FileResponse = + serde_json::from_str(l).map_err(|e| anyhow::anyhow!("Invalid file info: {e}"))?; files.push(file.file.clone()); } Ok(TableFiles { @@ -272,11 +306,17 @@ impl Client { }) } - async fn download_files(&self, table_path: &PathBuf, table_files: &TableFiles) -> Result, anyhow::Error> { + async fn download_files( + &self, + table_path: &PathBuf, + table_files: &TableFiles, + ) -> Result, anyhow::Error> { if Path::exists(&table_path) { - fs::remove_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error cleaning table path: {e}"))?; + fs::remove_dir_all(&table_path) + .map_err(|e| anyhow::anyhow!("Error cleaning table path: {e}"))?; } - fs::create_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error creating table path: {e}"))?; + fs::create_dir_all(&table_path) + .map_err(|e| anyhow::anyhow!("Error 
creating table path: {e}"))?; let mut file_paths: Vec = Vec::new(); let count = table_files.files.len(); for (index, file) in table_files.files.clone().into_iter().enumerate() { @@ -284,17 +324,30 @@ impl Client { File::Parquet(ParquetFile { id, url, .. }) => { let dst_path = &table_path.join(format!("{}.snappy.parquet", &id)); let bytes = self.download(url, &dst_path).await?; - debug!("Downloaded {}/{} {} ({} bytes)", index+1, count, dst_path.display(), bytes); + debug!( + "Downloaded {}/{} {} ({} bytes)", + index + 1, + count, + dst_path.display(), + bytes + ); file_paths.push(dst_path.clone()); - }, - File::Delta( delta_file) => { + } + File::Delta(delta_file) => { if let Some(url) = delta_file.get_url() { - let dst_path = &table_path.join(format!("{}.snappy.parquet", &delta_file.id)); + let dst_path = + &table_path.join(format!("{}.snappy.parquet", &delta_file.id)); let bytes = self.download(url, &dst_path).await?; - debug!("Downloaded {}/{} {} ({} bytes)", index+1, count, dst_path.display(), bytes); + debug!( + "Downloaded {}/{} {} ({} bytes)", + index + 1, + count, + dst_path.display(), + bytes + ); file_paths.push(dst_path.clone()) } - }, + } } } Ok(file_paths.clone()) @@ -308,26 +361,31 @@ impl Client { // Check if the files exist, load and compare the files. let metadata_path = &table_path.join(METADATA_FILE); if Path::exists(&metadata_path) { - let metadata_str = &fs::read_to_string(&metadata_path).map_err(|e| anyhow::anyhow!("Error reading file path {}: {}", metadata_path.display(), e))?; - let metadata: TableMetadata = serde_json::from_str(&metadata_str).map_err(|e| anyhow::anyhow!( - "Invalid configuration in {}: {}", - metadata_path.display(), - e - ))?; + let metadata_str = &fs::read_to_string(&metadata_path).map_err(|e| { + anyhow::anyhow!("Error reading file path {}: {}", metadata_path.display(), e) + })?; + let metadata: TableMetadata = serde_json::from_str(&metadata_str).map_err(|e| { + anyhow::anyhow!( + "Invalid configuration in {}: {}", + metadata_path.display(), + e + ) + })?; let mut download = metadata != table_files.metadata; if !download { let mut file_paths: Vec = Vec::new(); for file in &table_files.files { let file_id = match file { - File::Parquet(ParquetFile { id, ..}) => id, - File::Delta(DeltaFile { id, .. }) => id + File::Parquet(ParquetFile { id, .. }) => id, + File::Delta(DeltaFile { id, .. }) => id, }; let file_path = &table_path.join(format!("{}.snappy.parquet", &file_id)); if !Path::exists(&file_path) { // File is missing, invalidate cache download = true; - fs::remove_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error invalidating cache: {e}"))?; + fs::remove_dir_all(&table_path) + .map_err(|e| anyhow::anyhow!("Error invalidating cache: {e}"))?; break; } file_paths.push(file_path.clone()); @@ -340,7 +398,11 @@ impl Client { Ok(None) } - pub async fn get_files(&mut self, table: &Table, request: Option) -> Result, anyhow::Error> { + pub async fn get_files( + &mut self, + table: &Table, + request: Option, + ) -> Result, anyhow::Error> { let key = table.fully_qualified_name(); let mut download = true; let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); @@ -372,10 +434,19 @@ impl Client { }, ); } - Ok(self.cache.get(&key).ok_or(anyhow::anyhow!("Error reading {key} from cache"))?.file_paths.clone()) + Ok(self + .cache + .get(&key) + .ok_or(anyhow::anyhow!("Error reading {key} from cache"))? 
+ .file_paths + .clone()) } - pub async fn get_dataframe(&mut self, table: &Table, request: Option) -> Result { + pub async fn get_dataframe( + &mut self, + table: &Table, + request: Option, + ) -> Result { self.get_files(&table, request).await?; let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); load_parquet_files_as_dataframe(&table_path) diff --git a/src/protocol.rs b/src/protocol.rs index af07e8e..f01ed94 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -42,20 +42,20 @@ pub struct DeltaProtocol { pub min_reader_version: i32, pub min_writer_version: i32, pub reader_features: Vec, - pub writer_features: Vec + pub writer_features: Vec, } #[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] #[serde(untagged)] pub enum Protocol { - Parquet { + Parquet { #[serde(rename = "minReaderVersion")] - min_reader_version: i32 + min_reader_version: i32, }, - Delta { + Delta { #[serde(rename = "deltaProtocol")] - delta_protocol: DeltaProtocol - } + delta_protocol: DeltaProtocol, + }, } #[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] @@ -81,14 +81,14 @@ pub struct ParquetMetadata { pub struct DeltaMetadata { pub size: usize, pub num_files: usize, - pub delta_metadata: ParquetMetadata + pub delta_metadata: ParquetMetadata, } #[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] #[serde(untagged)] pub enum Metadata { Parquet(ParquetMetadata), - Delta(DeltaMetadata) + Delta(DeltaMetadata), } #[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] @@ -120,7 +120,11 @@ pub struct DeltaFile { impl DeltaFile { pub fn get_url(&self) -> Option { - if let Some(value) = self.delta_single_action.get("add").and_then(|add| add.get("path")) { + if let Some(value) = self + .delta_single_action + .get("add") + .and_then(|add| add.get("path")) + { return value.as_str().map(|v| v.to_string()); } else { return None; @@ -132,7 +136,7 @@ impl DeltaFile { #[serde(untagged)] pub enum File { Parquet(ParquetFile), - Delta(DeltaFile) + Delta(DeltaFile), } #[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] @@ -145,4 +149,4 @@ pub struct FilesRequest { pub predicate_hints: Option>, pub limit_hint: Option, pub version: Option, -} \ No newline at end of file +} diff --git a/src/reader.rs b/src/reader.rs index 3434269..1b7ec90 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -1,7 +1,9 @@ use polars::prelude::*; use std::path::PathBuf; -pub fn load_parquet_files_as_dataframe(parquet_root_dir_path: &PathBuf) -> Result { +pub fn load_parquet_files_as_dataframe( + parquet_root_dir_path: &PathBuf, +) -> Result { let search_pattern = parquet_root_dir_path .join("*.parquet") .display() diff --git a/src/utils.rs b/src/utils.rs index c98dc55..a93f2f7 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -41,5 +41,3 @@ pub struct FileCache { pub table_files: TableFiles, pub file_paths: Vec, } - - diff --git a/tests/client.rs b/tests/client.rs index b1891fb..0ca5dcf 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -114,11 +114,20 @@ async fn get_table_metadata() { match meta.protocol { Protocol::Delta { .. } => assert!(false, "Wrong protocol deserialization"), - Protocol::Parquet { min_reader_version } => assert_eq!(min_reader_version, 1, "Protocol mismatch") + Protocol::Parquet { min_reader_version } => { + assert_eq!(min_reader_version, 1, "Protocol mismatch") + } }; match meta.metadata { Metadata::Delta { .. } => assert!(false, "Wrong metadata deserialization"), - Metadata::Parquet ( ParquetMetadata { id, format, name, partition_columns, configuration, .. 
}) => { + Metadata::Parquet(ParquetMetadata { + id, + format, + name, + partition_columns, + configuration, + .. + }) => { assert_eq!( id, "cf9c9342-b773-4c7b-a217-037d02ffe5d8", "Metadata ID mismatch" @@ -127,15 +136,8 @@ async fn get_table_metadata() { format.provider, "parquet", "Metadata format provider mismatch" ); - assert_eq!( - name, None, - "Metadata name value should be missing" - ); - assert_eq!( - partition_columns.len(), - 0, - "There should be no partitions" - ); + assert_eq!(name, None, "Metadata name value should be missing"); + assert_eq!(partition_columns.len(), 0, "There should be no partitions"); assert_eq!( configuration["conf_1_name"], "conf_1_value", "Configuration value expected" @@ -199,16 +201,12 @@ async fn list_all_table_files() { table.share, table.schema, table.name ); let app = create_mocked_test_app(body, &url, method("POST")).await; - let files = app - .client - .list_table_files(&table, None) - .await - .unwrap(); + let files = app.client.list_table_files(&table, None).await.unwrap(); assert_eq!(files.files.len(), 2, "File count mismatch"); match &files.files[1] { - File::Parquet(ParquetFile { id, ..}) => assert_eq!(id, "2", "File id mismatch"), - File::Delta(DeltaFile { .. }) => assert!(false, "Wrong file deserialization") + File::Parquet(ParquetFile { id, .. }) => assert_eq!(id, "2", "File id mismatch"), + File::Delta(DeltaFile { .. }) => assert!(false, "Wrong file deserialization"), }; } @@ -341,11 +339,21 @@ async fn get_dataframe() { .unwrap() .to_string(); - let df = c.get_dataframe(&table, None).await.unwrap().collect().unwrap(); + let df = c + .get_dataframe(&table, None) + .await + .unwrap() + .collect() + .unwrap(); assert_eq!(df.shape(), (5, 3), "Dataframe shape mismatch"); // Get the data again, this time it should be served from the local cache (enforced by Expections set on Mocks) - let df1 = c.get_dataframe(&table, None).await.unwrap().collect().unwrap(); + let df1 = c + .get_dataframe(&table, None) + .await + .unwrap() + .collect() + .unwrap(); assert_eq!(df1.shape(), (5, 3), "Dataframe shape mismatch"); assert_eq!( df1.get_row(0).unwrap().0[1],