From b6a8f18dd97b327111bf6691b64b42ca0199c66f Mon Sep 17 00:00:00 2001 From: adalpane Date: Mon, 26 Aug 2024 12:19:01 +0200 Subject: [PATCH] fix format --- examples/blocking.rs | 6 +- src/blocking/client.rs | 197 +++++++++++++++++++++++++++------------- src/client.rs | 201 ++++++++++++++++++++++++++++------------- src/protocol.rs | 26 +++--- src/reader.rs | 4 +- src/utils.rs | 2 - tests/client.rs | 48 ++++++---- 7 files changed, 323 insertions(+), 161 deletions(-) diff --git a/examples/blocking.rs b/examples/blocking.rs index cd9de0e..b6bd757 100644 --- a/examples/blocking.rs +++ b/examples/blocking.rs @@ -22,7 +22,11 @@ fn main() { shares[0].name ); } else { - let res = app.get_dataframe(&tables[0], None).unwrap().collect().unwrap(); + let res = app + .get_dataframe(&tables[0], None) + .unwrap() + .collect() + .unwrap(); println!("Dataframe:\n {}", res); } } diff --git a/src/blocking/client.rs b/src/blocking/client.rs index 4ad2797..36be608 100644 --- a/src/blocking/client.rs +++ b/src/blocking/client.rs @@ -30,7 +30,7 @@ impl Client { pub fn new( provider_config: ProviderConfig, data_root: Option, - capabilities: Option> + capabilities: Option>, ) -> Result { if provider_config.share_credentials_version > CREDENTIALS_VERSION { return Err(anyhow::anyhow!("'share_credentials_version' in the provider configuration is {}, which is newer than the \ @@ -54,7 +54,10 @@ impl Client { }) } - fn get_client(config: &ProviderConfig, capabilities: HashMap) -> Result { + fn get_client( + config: &ProviderConfig, + capabilities: HashMap, + ) -> Result { let rust_version: &str = &format!("{}", rustc_version_runtime::version()); let user_agent: &str = &format!("Delta-Sharing-Rust/{VERSION} Rust/{rust_version}"); let bearer_token = &format!("Bearer {}", config.bearer_token); @@ -69,10 +72,16 @@ impl Client { header::HeaderValue::from_str(user_agent) .map_err(|e| anyhow::anyhow!("Error setting user agent header:{e}"))?, ); - headers.insert( + headers.insert( header::HeaderName::from_static("delta-sharing-capabilities"), - header::HeaderValue::from_str(&capabilities.iter().map(|(k,v)| format!("{k}={v}")).collect::>().join(";")) - .map_err(|e| anyhow::anyhow!("Error setting delta-sharing-capabilities header:{e}"))?, + header::HeaderValue::from_str( + &capabilities + .iter() + .map(|(k, v)| format!("{k}={v}")) + .collect::>() + .join(";"), + ) + .map_err(|e| anyhow::anyhow!("Error setting delta-sharing-capabilities header:{e}"))?, ); reqwest::blocking::Client::builder() .default_headers(headers) @@ -87,8 +96,10 @@ impl Client { } fn get(&self, target: &str) -> Result { - let url = self.base_url.join(target) - .map_err(|e| anyhow::anyhow!("Error creating GET url: {e}"))?; + let url = self + .base_url + .join(target) + .map_err(|e| anyhow::anyhow!("Error creating GET url: {e}"))?; debug!("--> HTTP GET to: {}", &url); let resp = self.http_client.get(url.as_str()).send()?; let resp_text = resp.text()?; @@ -97,7 +108,9 @@ impl Client { } fn head(&self, target: &str, key: &str) -> Result, anyhow::Error> { - let url = self.base_url.join(target) + let url = self + .base_url + .join(target) .map_err(|e| anyhow::anyhow!("Error creating HEAD url: {e}"))?; debug!("HTTP HEAD to: {}", &url); let resp = self @@ -113,7 +126,9 @@ impl Client { } fn post(&self, target: &str, json: &Map) -> Result { - let url = self.base_url.join(target) + let url = self + .base_url + .join(target) .map_err(|e| anyhow::anyhow!("Error creating POST url: {e}"))?; debug!("--> HTTP POST to: {}", &url); let resp = self.http_client.post(url.as_str()).json(json).send()?; @@ -128,7 +143,8 @@ impl Client { .map_err(|e| anyhow::anyhow!("Error creating POST url: {e}"))?; let mut out = fs::File::create(dest_path) .map_err(|e| anyhow::anyhow!("Failed to create an output file: {e}"))?; - let content = resp.bytes() + let content = resp + .bytes() .map_err(|e| anyhow::anyhow!("Failed to read download bytes: {e}"))?; io::copy(&mut content.as_bytes(), &mut out) .map_err(|e| anyhow::anyhow!("Failed to save the content to output file: {e}")) @@ -136,13 +152,15 @@ impl Client { pub fn list_shares(&self) -> Result, anyhow::Error> { let shares = self.get("shares")?; - let parsed: ShareResponse = serde_json::from_str(&shares).map_err(|e| anyhow::anyhow!("Invalid list shares response: {e}"))?; + let parsed: ShareResponse = serde_json::from_str(&shares) + .map_err(|e| anyhow::anyhow!("Invalid list shares response: {e}"))?; return Ok(parsed.items.clone()); } pub fn list_schemas(&self, share: &Share) -> Result, anyhow::Error> { let schemas = self.get(&format!("shares/{}/schemas", share.name))?; - let parsed: SchemaResponse = serde_json::from_str(&schemas).map_err(|e| anyhow::anyhow!("Invalid list schemas response: {e}"))?; + let parsed: SchemaResponse = serde_json::from_str(&schemas) + .map_err(|e| anyhow::anyhow!("Invalid list schemas response: {e}"))?; return Ok(parsed.items.clone()); } @@ -151,13 +169,15 @@ impl Client { "shares/{}/schemas/{}/tables", schema.share, schema.name ))?; - let parsed: TableResponse = serde_json::from_str(&tables).map_err(|e| anyhow::anyhow!("Invalid list tables response: {e}"))?; + let parsed: TableResponse = serde_json::from_str(&tables) + .map_err(|e| anyhow::anyhow!("Invalid list tables response: {e}"))?; return Ok(parsed.items.clone()); } pub fn list_all_tables(&self, share: &Share) -> Result, anyhow::Error> { let tables = self.get(&format!("shares/{}/all-tables", share.name))?; - let parsed: TableResponse = serde_json::from_str(&tables).map_err(|e| anyhow::anyhow!("Invalid list all tables response: {e}"))?; + let parsed: TableResponse = serde_json::from_str(&tables) + .map_err(|e| anyhow::anyhow!("Invalid list all tables response: {e}"))?; return Ok(parsed.items.clone()); } @@ -167,14 +187,20 @@ impl Client { table.share, table.schema, table.name ))?; let mut meta_lines = meta.lines(); - let protocol: ProtocolResponse = - meta_lines.next().map(|lines| serde_json::from_str::(lines) - .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}"))) - .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; - let metadata: MetadataResponse = - meta_lines.next().map(|lines| serde_json::from_str::(lines) - .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}"))) - .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; + let protocol: ProtocolResponse = meta_lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; + let metadata: MetadataResponse = meta_lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; Ok(TableMetadata { protocol: protocol.protocol, metadata: metadata.metadata, @@ -191,7 +217,10 @@ impl Client { ); match version { Ok(Some(v)) => v - .to_str().ok().and_then(|value| value.parse::().ok()).unwrap_or(-1), + .to_str() + .ok() + .and_then(|value| value.parse::().ok()) + .unwrap_or(-1), _ => -1, } } @@ -207,8 +236,9 @@ impl Client { "predicateHints".to_string(), Value::Array( predicate_hints - .iter().map(|s| Value::String(s.to_string())) - .collect::>() + .iter() + .map(|s| Value::String(s.to_string())) + .collect::>(), ), ); } @@ -219,10 +249,7 @@ impl Client { ); } if let Some(version) = request.as_ref().and_then(|r| r.version) { - map.insert( - "version".to_string(), - Value::Number(Number::from(version)), - ); + map.insert("version".to_string(), Value::Number(Number::from(version))); } let response = self.post( &format!( @@ -232,17 +259,24 @@ impl Client { &map, )?; let mut lines = response.lines(); - let protocol: ProtocolResponse = - lines.next().map(|lines| serde_json::from_str::(lines) - .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}"))) - .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; - let metadata: MetadataResponse = - lines.next().map(|lines| serde_json::from_str::(lines) - .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}"))) - .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; + let protocol: ProtocolResponse = lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; + let metadata: MetadataResponse = lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; let mut files: Vec = Vec::new(); for l in lines { - let file: FileResponse = serde_json::from_str(l).map_err(|e| anyhow::anyhow!("Invalid file info: {e}"))?; + let file: FileResponse = + serde_json::from_str(l).map_err(|e| anyhow::anyhow!("Invalid file info: {e}"))?; files.push(file.file.clone()); } Ok(TableFiles { @@ -254,11 +288,17 @@ impl Client { }) } - fn download_files(&self, table_path: &PathBuf, table_files: &TableFiles) -> Result, anyhow::Error> { + fn download_files( + &self, + table_path: &PathBuf, + table_files: &TableFiles, + ) -> Result, anyhow::Error> { if Path::exists(&table_path) { - fs::remove_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error cleaning table path: {e}"))?; + fs::remove_dir_all(&table_path) + .map_err(|e| anyhow::anyhow!("Error cleaning table path: {e}"))?; } - fs::create_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error creating table path: {e}"))?; + fs::create_dir_all(&table_path) + .map_err(|e| anyhow::anyhow!("Error creating table path: {e}"))?; let mut file_paths: Vec = Vec::new(); let count = table_files.files.len(); for (index, file) in table_files.files.clone().into_iter().enumerate() { @@ -266,46 +306,68 @@ impl Client { File::Parquet(ParquetFile { id, url, .. }) => { let dst_path = &table_path.join(format!("{}.snappy.parquet", &id)); let bytes = self.download(url, &dst_path)?; - debug!("Downloaded {}/{} {} ({} bytes)", index+1, count, dst_path.display(), bytes); + debug!( + "Downloaded {}/{} {} ({} bytes)", + index + 1, + count, + dst_path.display(), + bytes + ); file_paths.push(dst_path.clone()); - }, - File::Delta( delta_file) => { + } + File::Delta(delta_file) => { if let Some(url) = delta_file.get_url() { - let dst_path = &table_path.join(format!("{}.snappy.parquet", &delta_file.id)); + let dst_path = + &table_path.join(format!("{}.snappy.parquet", &delta_file.id)); let bytes = self.download(url, &dst_path)?; - debug!("Downloaded {}/{} {} ({} bytes)", index+1, count, dst_path.display(), bytes); + debug!( + "Downloaded {}/{} {} ({} bytes)", + index + 1, + count, + dst_path.display(), + bytes + ); file_paths.push(dst_path.clone()); } - }, + } } } Ok(file_paths.clone()) } - fn load_cached(&self, table_path: &PathBuf, table_files: &TableFiles) -> Result>, anyhow::Error> { + fn load_cached( + &self, + table_path: &PathBuf, + table_files: &TableFiles, + ) -> Result>, anyhow::Error> { // Check if the files exist, load and compare the files. let metadata_path = &table_path.join(METADATA_FILE); if Path::exists(&metadata_path) { - let metadata_str = &fs::read_to_string(&metadata_path).map_err(|e| anyhow::anyhow!("Error reading file path {}: {}", metadata_path.display(), e))?; - let metadata: TableMetadata = serde_json::from_str(&metadata_str).map_err(|e| anyhow::anyhow!( - "Invalid configuration in {}: {}", - metadata_path.display(), - e - ))?; + let metadata_str = &fs::read_to_string(&metadata_path).map_err(|e| { + anyhow::anyhow!("Error reading file path {}: {}", metadata_path.display(), e) + })?; + let metadata: TableMetadata = serde_json::from_str(&metadata_str).map_err(|e| { + anyhow::anyhow!( + "Invalid configuration in {}: {}", + metadata_path.display(), + e + ) + })?; let mut download = metadata != table_files.metadata; if !download { let mut file_paths: Vec = Vec::new(); for file in &table_files.files { let file_id = match file { - File::Parquet(ParquetFile { id, ..}) => id, - File::Delta(DeltaFile { id, .. }) => id + File::Parquet(ParquetFile { id, .. }) => id, + File::Delta(DeltaFile { id, .. }) => id, }; let file_path = &table_path.join(format!("{}.snappy.parquet", &file_id)); if !Path::exists(&file_path) { // File is missing, invalidate cache download = true; - fs::remove_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error invalidating cache: {e}"))?; + fs::remove_dir_all(&table_path) + .map_err(|e| anyhow::anyhow!("Error invalidating cache: {e}"))?; break; } file_paths.push(file_path.clone()); @@ -318,7 +380,11 @@ impl Client { Ok(None) } - pub fn get_files(&mut self, table: &Table, request: Option) -> Result, anyhow::Error> { + pub fn get_files( + &mut self, + table: &Table, + request: Option, + ) -> Result, anyhow::Error> { let key = table.fully_qualified_name(); let mut download = true; let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); @@ -350,13 +416,22 @@ impl Client { }, ); } - Ok(self.cache.get(&key).ok_or(anyhow::anyhow!("Error reading {key} from cache"))?.file_paths.clone()) + Ok(self + .cache + .get(&key) + .ok_or(anyhow::anyhow!("Error reading {key} from cache"))? + .file_paths + .clone()) } - pub fn get_dataframe(&mut self, table: &Table, request: Option) -> Result { + pub fn get_dataframe( + &mut self, + table: &Table, + request: Option, + ) -> Result { self.get_files(&table, request)?; let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); load_parquet_files_as_dataframe(&table_path) .map_err(|e| anyhow::anyhow!("Error loading parquet files: {e}")) } -} \ No newline at end of file +} diff --git a/src/client.rs b/src/client.rs index b70e64e..6e14bea 100644 --- a/src/client.rs +++ b/src/client.rs @@ -30,7 +30,7 @@ impl Client { pub async fn new( provider_config: ProviderConfig, data_root: Option, - capabilities: Option> + capabilities: Option>, ) -> Result { if provider_config.share_credentials_version > CREDENTIALS_VERSION { return Err(anyhow::anyhow!("'share_credentials_version' in the provider configuration is {}, which is newer than the \ @@ -54,7 +54,10 @@ impl Client { }) } - fn get_client(config: &ProviderConfig, capabilities: HashMap) -> Result { + fn get_client( + config: &ProviderConfig, + capabilities: HashMap, + ) -> Result { let rust_version: &str = &format!("{}", rustc_version_runtime::version()); let user_agent: &str = &format!("Delta-Sharing-Rust/{VERSION} Rust/{rust_version}"); let bearer_token = &format!("Bearer {}", config.bearer_token); @@ -71,10 +74,18 @@ impl Client { ); headers.insert( header::HeaderName::from_static("delta-sharing-capabilities"), - header::HeaderValue::from_str(&capabilities.iter().map(|(k,v)| format!("{k}={v}")).collect::>().join(";")) - .map_err(|e| anyhow::anyhow!("Error setting delta-sharing-capabilities header:{e}"))?, + header::HeaderValue::from_str( + &capabilities + .iter() + .map(|(k, v)| format!("{k}={v}")) + .collect::>() + .join(";"), + ) + .map_err(|e| anyhow::anyhow!("Error setting delta-sharing-capabilities header:{e}"))?, ); - reqwest::Client::builder().default_headers(headers).build() + reqwest::Client::builder() + .default_headers(headers) + .build() .map_err(|e| anyhow::anyhow!("Error building Http client: {e}")) } @@ -85,7 +96,9 @@ impl Client { } async fn get(&self, target: &str) -> Result { - let url = self.base_url.join(target) + let url = self + .base_url + .join(target) .map_err(|e| anyhow::anyhow!("Error creating GET url: {e}"))?; debug!("--> HTTP GET to: {}", &url); let resp = self.http_client.get(url.as_str()).send().await?; @@ -95,7 +108,9 @@ impl Client { } async fn head(&self, target: &str, key: &str) -> Result, anyhow::Error> { - let url = self.base_url.join(target) + let url = self + .base_url + .join(target) .map_err(|e| anyhow::anyhow!("Error creating HEAD url: {e}"))?; debug!("HTTP HEAD to: {}", &url); let resp = self @@ -111,12 +126,10 @@ impl Client { } } - async fn post( - &self, - target: &str, - json: &Map, - ) -> Result { - let url = self.base_url.join(target) + async fn post(&self, target: &str, json: &Map) -> Result { + let url = self + .base_url + .join(target) .map_err(|e| anyhow::anyhow!("Error creating POST url: {e}"))?; debug!("--> HTTP POST to: {}", &url); let resp = self @@ -132,11 +145,14 @@ impl Client { async fn download(&self, url: String, dest_path: &Path) -> Result { debug!("--> Download {} to {}", &url, dest_path.display()); - let resp = reqwest::get(url).await + let resp = reqwest::get(url) + .await .map_err(|e| anyhow::anyhow!("Error creating POST url: {e}"))?; let mut out = fs::File::create(dest_path) .map_err(|e| anyhow::anyhow!("Failed to create an output file: {e}"))?; - let content = resp.bytes().await + let content = resp + .bytes() + .await .map_err(|e| anyhow::anyhow!("Failed to read download bytes: {e}"))?; io::copy(&mut content.as_bytes(), &mut out) .map_err(|e| anyhow::anyhow!("Failed to save the content to output file: {e}")) @@ -144,13 +160,15 @@ impl Client { pub async fn list_shares(&self) -> Result, anyhow::Error> { let shares = self.get("shares").await?; - let parsed: ShareResponse = serde_json::from_str(&shares).map_err(|e| anyhow::anyhow!("Invalid list shares response: {e}"))?; + let parsed: ShareResponse = serde_json::from_str(&shares) + .map_err(|e| anyhow::anyhow!("Invalid list shares response: {e}"))?; return Ok(parsed.items.clone()); } pub async fn list_schemas(&self, share: &Share) -> Result, anyhow::Error> { let schemas = self.get(&format!("shares/{}/schemas", share.name)).await?; - let parsed: SchemaResponse = serde_json::from_str(&schemas).map_err(|e| anyhow::anyhow!("Invalid list schemas response: {e}"))?; + let parsed: SchemaResponse = serde_json::from_str(&schemas) + .map_err(|e| anyhow::anyhow!("Invalid list schemas response: {e}"))?; return Ok(parsed.items.clone()); } @@ -161,7 +179,8 @@ impl Client { schema.share, schema.name )) .await?; - let parsed: TableResponse = serde_json::from_str(&tables).map_err(|e| anyhow::anyhow!("Invalid list tables response: {e}"))?; + let parsed: TableResponse = serde_json::from_str(&tables) + .map_err(|e| anyhow::anyhow!("Invalid list tables response: {e}"))?; return Ok(parsed.items.clone()); } @@ -169,7 +188,8 @@ impl Client { let tables = self .get(&format!("shares/{}/all-tables", share.name)) .await?; - let parsed: TableResponse = serde_json::from_str(&tables).map_err(|e| anyhow::anyhow!("Invalid list all tables response: {e}"))?; + let parsed: TableResponse = serde_json::from_str(&tables) + .map_err(|e| anyhow::anyhow!("Invalid list all tables response: {e}"))?; return Ok(parsed.items.clone()); } @@ -181,14 +201,20 @@ impl Client { )) .await?; let mut meta_lines = meta.lines(); - let protocol: ProtocolResponse = - meta_lines.next().map(|lines| serde_json::from_str::(lines) - .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}"))) - .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; - let metadata: MetadataResponse = - meta_lines.next().map(|lines| serde_json::from_str::(lines) - .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}"))) - .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; + let protocol: ProtocolResponse = meta_lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; + let metadata: MetadataResponse = meta_lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; Ok(TableMetadata { protocol: protocol.protocol, metadata: metadata.metadata, @@ -207,7 +233,10 @@ impl Client { .await; match version { Ok(Some(v)) => v - .to_str().ok().and_then(|value| value.parse::().ok()).unwrap_or(-1), + .to_str() + .ok() + .and_then(|value| value.parse::().ok()) + .unwrap_or(-1), _ => -1, } } @@ -215,7 +244,7 @@ impl Client { pub async fn list_table_files( &self, table: &Table, - request: Option + request: Option, ) -> Result { let mut map = Map::new(); if let Some(predicate_hints) = request.as_ref().and_then(|r| r.predicate_hints.as_ref()) { @@ -223,8 +252,9 @@ impl Client { "predicateHints".to_string(), Value::Array( predicate_hints - .iter().map(|s| Value::String(s.to_string())) - .collect::>() + .iter() + .map(|s| Value::String(s.to_string())) + .collect::>(), ), ); } @@ -235,10 +265,7 @@ impl Client { ); } if let Some(version) = request.as_ref().and_then(|r| r.version) { - map.insert( - "version".to_string(), - Value::Number(Number::from(version)), - ); + map.insert("version".to_string(), Value::Number(Number::from(version))); } let response = self .post( @@ -250,17 +277,24 @@ impl Client { ) .await?; let mut lines = response.lines(); - let protocol: ProtocolResponse = - lines.next().map(|lines| serde_json::from_str::(lines) - .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}"))) - .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; - let metadata: MetadataResponse = - lines.next().map(|lines| serde_json::from_str::(lines) - .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}"))) - .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; + let protocol: ProtocolResponse = lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; + let metadata: MetadataResponse = lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; let mut files: Vec = Vec::new(); for l in lines { - let file: FileResponse = serde_json::from_str(l).map_err(|e| anyhow::anyhow!("Invalid file info: {e}"))?; + let file: FileResponse = + serde_json::from_str(l).map_err(|e| anyhow::anyhow!("Invalid file info: {e}"))?; files.push(file.file.clone()); } Ok(TableFiles { @@ -272,11 +306,17 @@ impl Client { }) } - async fn download_files(&self, table_path: &PathBuf, table_files: &TableFiles) -> Result, anyhow::Error> { + async fn download_files( + &self, + table_path: &PathBuf, + table_files: &TableFiles, + ) -> Result, anyhow::Error> { if Path::exists(&table_path) { - fs::remove_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error cleaning table path: {e}"))?; + fs::remove_dir_all(&table_path) + .map_err(|e| anyhow::anyhow!("Error cleaning table path: {e}"))?; } - fs::create_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error creating table path: {e}"))?; + fs::create_dir_all(&table_path) + .map_err(|e| anyhow::anyhow!("Error creating table path: {e}"))?; let mut file_paths: Vec = Vec::new(); let count = table_files.files.len(); for (index, file) in table_files.files.clone().into_iter().enumerate() { @@ -284,17 +324,30 @@ impl Client { File::Parquet(ParquetFile { id, url, .. }) => { let dst_path = &table_path.join(format!("{}.snappy.parquet", &id)); let bytes = self.download(url, &dst_path).await?; - debug!("Downloaded {}/{} {} ({} bytes)", index+1, count, dst_path.display(), bytes); + debug!( + "Downloaded {}/{} {} ({} bytes)", + index + 1, + count, + dst_path.display(), + bytes + ); file_paths.push(dst_path.clone()); - }, - File::Delta( delta_file) => { + } + File::Delta(delta_file) => { if let Some(url) = delta_file.get_url() { - let dst_path = &table_path.join(format!("{}.snappy.parquet", &delta_file.id)); + let dst_path = + &table_path.join(format!("{}.snappy.parquet", &delta_file.id)); let bytes = self.download(url, &dst_path).await?; - debug!("Downloaded {}/{} {} ({} bytes)", index+1, count, dst_path.display(), bytes); + debug!( + "Downloaded {}/{} {} ({} bytes)", + index + 1, + count, + dst_path.display(), + bytes + ); file_paths.push(dst_path.clone()) } - }, + } } } Ok(file_paths.clone()) @@ -308,26 +361,31 @@ impl Client { // Check if the files exist, load and compare the files. let metadata_path = &table_path.join(METADATA_FILE); if Path::exists(&metadata_path) { - let metadata_str = &fs::read_to_string(&metadata_path).map_err(|e| anyhow::anyhow!("Error reading file path {}: {}", metadata_path.display(), e))?; - let metadata: TableMetadata = serde_json::from_str(&metadata_str).map_err(|e| anyhow::anyhow!( - "Invalid configuration in {}: {}", - metadata_path.display(), - e - ))?; + let metadata_str = &fs::read_to_string(&metadata_path).map_err(|e| { + anyhow::anyhow!("Error reading file path {}: {}", metadata_path.display(), e) + })?; + let metadata: TableMetadata = serde_json::from_str(&metadata_str).map_err(|e| { + anyhow::anyhow!( + "Invalid configuration in {}: {}", + metadata_path.display(), + e + ) + })?; let mut download = metadata != table_files.metadata; if !download { let mut file_paths: Vec = Vec::new(); for file in &table_files.files { let file_id = match file { - File::Parquet(ParquetFile { id, ..}) => id, - File::Delta(DeltaFile { id, .. }) => id + File::Parquet(ParquetFile { id, .. }) => id, + File::Delta(DeltaFile { id, .. }) => id, }; let file_path = &table_path.join(format!("{}.snappy.parquet", &file_id)); if !Path::exists(&file_path) { // File is missing, invalidate cache download = true; - fs::remove_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error invalidating cache: {e}"))?; + fs::remove_dir_all(&table_path) + .map_err(|e| anyhow::anyhow!("Error invalidating cache: {e}"))?; break; } file_paths.push(file_path.clone()); @@ -340,7 +398,11 @@ impl Client { Ok(None) } - pub async fn get_files(&mut self, table: &Table, request: Option) -> Result, anyhow::Error> { + pub async fn get_files( + &mut self, + table: &Table, + request: Option, + ) -> Result, anyhow::Error> { let key = table.fully_qualified_name(); let mut download = true; let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); @@ -372,10 +434,19 @@ impl Client { }, ); } - Ok(self.cache.get(&key).ok_or(anyhow::anyhow!("Error reading {key} from cache"))?.file_paths.clone()) + Ok(self + .cache + .get(&key) + .ok_or(anyhow::anyhow!("Error reading {key} from cache"))? + .file_paths + .clone()) } - pub async fn get_dataframe(&mut self, table: &Table, request: Option) -> Result { + pub async fn get_dataframe( + &mut self, + table: &Table, + request: Option, + ) -> Result { self.get_files(&table, request).await?; let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); load_parquet_files_as_dataframe(&table_path) diff --git a/src/protocol.rs b/src/protocol.rs index af07e8e..f01ed94 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -42,20 +42,20 @@ pub struct DeltaProtocol { pub min_reader_version: i32, pub min_writer_version: i32, pub reader_features: Vec, - pub writer_features: Vec + pub writer_features: Vec, } #[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] #[serde(untagged)] pub enum Protocol { - Parquet { + Parquet { #[serde(rename = "minReaderVersion")] - min_reader_version: i32 + min_reader_version: i32, }, - Delta { + Delta { #[serde(rename = "deltaProtocol")] - delta_protocol: DeltaProtocol - } + delta_protocol: DeltaProtocol, + }, } #[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] @@ -81,14 +81,14 @@ pub struct ParquetMetadata { pub struct DeltaMetadata { pub size: usize, pub num_files: usize, - pub delta_metadata: ParquetMetadata + pub delta_metadata: ParquetMetadata, } #[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] #[serde(untagged)] pub enum Metadata { Parquet(ParquetMetadata), - Delta(DeltaMetadata) + Delta(DeltaMetadata), } #[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] @@ -120,7 +120,11 @@ pub struct DeltaFile { impl DeltaFile { pub fn get_url(&self) -> Option { - if let Some(value) = self.delta_single_action.get("add").and_then(|add| add.get("path")) { + if let Some(value) = self + .delta_single_action + .get("add") + .and_then(|add| add.get("path")) + { return value.as_str().map(|v| v.to_string()); } else { return None; @@ -132,7 +136,7 @@ impl DeltaFile { #[serde(untagged)] pub enum File { Parquet(ParquetFile), - Delta(DeltaFile) + Delta(DeltaFile), } #[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] @@ -145,4 +149,4 @@ pub struct FilesRequest { pub predicate_hints: Option>, pub limit_hint: Option, pub version: Option, -} \ No newline at end of file +} diff --git a/src/reader.rs b/src/reader.rs index 3434269..1b7ec90 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -1,7 +1,9 @@ use polars::prelude::*; use std::path::PathBuf; -pub fn load_parquet_files_as_dataframe(parquet_root_dir_path: &PathBuf) -> Result { +pub fn load_parquet_files_as_dataframe( + parquet_root_dir_path: &PathBuf, +) -> Result { let search_pattern = parquet_root_dir_path .join("*.parquet") .display() diff --git a/src/utils.rs b/src/utils.rs index c98dc55..a93f2f7 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -41,5 +41,3 @@ pub struct FileCache { pub table_files: TableFiles, pub file_paths: Vec, } - - diff --git a/tests/client.rs b/tests/client.rs index b1891fb..0ca5dcf 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -114,11 +114,20 @@ async fn get_table_metadata() { match meta.protocol { Protocol::Delta { .. } => assert!(false, "Wrong protocol deserialization"), - Protocol::Parquet { min_reader_version } => assert_eq!(min_reader_version, 1, "Protocol mismatch") + Protocol::Parquet { min_reader_version } => { + assert_eq!(min_reader_version, 1, "Protocol mismatch") + } }; match meta.metadata { Metadata::Delta { .. } => assert!(false, "Wrong metadata deserialization"), - Metadata::Parquet ( ParquetMetadata { id, format, name, partition_columns, configuration, .. }) => { + Metadata::Parquet(ParquetMetadata { + id, + format, + name, + partition_columns, + configuration, + .. + }) => { assert_eq!( id, "cf9c9342-b773-4c7b-a217-037d02ffe5d8", "Metadata ID mismatch" @@ -127,15 +136,8 @@ async fn get_table_metadata() { format.provider, "parquet", "Metadata format provider mismatch" ); - assert_eq!( - name, None, - "Metadata name value should be missing" - ); - assert_eq!( - partition_columns.len(), - 0, - "There should be no partitions" - ); + assert_eq!(name, None, "Metadata name value should be missing"); + assert_eq!(partition_columns.len(), 0, "There should be no partitions"); assert_eq!( configuration["conf_1_name"], "conf_1_value", "Configuration value expected" @@ -199,16 +201,12 @@ async fn list_all_table_files() { table.share, table.schema, table.name ); let app = create_mocked_test_app(body, &url, method("POST")).await; - let files = app - .client - .list_table_files(&table, None) - .await - .unwrap(); + let files = app.client.list_table_files(&table, None).await.unwrap(); assert_eq!(files.files.len(), 2, "File count mismatch"); match &files.files[1] { - File::Parquet(ParquetFile { id, ..}) => assert_eq!(id, "2", "File id mismatch"), - File::Delta(DeltaFile { .. }) => assert!(false, "Wrong file deserialization") + File::Parquet(ParquetFile { id, .. }) => assert_eq!(id, "2", "File id mismatch"), + File::Delta(DeltaFile { .. }) => assert!(false, "Wrong file deserialization"), }; } @@ -341,11 +339,21 @@ async fn get_dataframe() { .unwrap() .to_string(); - let df = c.get_dataframe(&table, None).await.unwrap().collect().unwrap(); + let df = c + .get_dataframe(&table, None) + .await + .unwrap() + .collect() + .unwrap(); assert_eq!(df.shape(), (5, 3), "Dataframe shape mismatch"); // Get the data again, this time it should be served from the local cache (enforced by Expections set on Mocks) - let df1 = c.get_dataframe(&table, None).await.unwrap().collect().unwrap(); + let df1 = c + .get_dataframe(&table, None) + .await + .unwrap() + .collect() + .unwrap(); assert_eq!(df1.shape(), (5, 3), "Dataframe shape mismatch"); assert_eq!( df1.get_row(0).unwrap().0[1],