Skip to content

Commit

Permalink
finalize implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
adalpane committed Aug 2, 2024
1 parent 0105d08 commit 1514c25
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 8 deletions.
9 changes: 5 additions & 4 deletions src/blocking/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,19 +260,20 @@ impl Client {
}
fs::create_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error creating table path: {e}"))?;
let mut file_paths: Vec<PathBuf> = Vec::new();
for file in table_files.files.clone() {
let count = table_files.files.len();
for (index, file) in table_files.files.clone().into_iter().enumerate() {
match file {
File::Parquet(ParquetFile { id, url, .. }) => {
let dst_path = &table_path.join(format!("{}.snappy.parquet", &id));
let bytes = self.download(url, &dst_path)?;
debug!("Downloaded {} ({} bytes)", dst_path.display(), bytes);
debug!("Downloaded {}/{} {} ({} bytes)", index+1, count, dst_path.display(), bytes);
file_paths.push(dst_path.clone());
},
File::Delta( delta_file) => {
if let Some(url) = delta_file.get_url() {
let dst_path = &table_path.join(format!("{}.snappy.parquet", &delta_file.id));
let bytes = self.download(url, &dst_path)?;
debug!("Downloaded {} ({} bytes)", dst_path.display(), bytes);
debug!("Downloaded {}/{} {} ({} bytes)", index+1, count, dst_path.display(), bytes);
file_paths.push(dst_path.clone());
}
},
Expand Down Expand Up @@ -358,4 +359,4 @@ impl Client {
load_parquet_files_as_dataframe(&table_path)
.map_err(|e| anyhow::anyhow!("Error loading parquet files: {e}"))
}
}
}
7 changes: 4 additions & 3 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,19 +278,20 @@ impl Client {
}
fs::create_dir_all(&table_path).map_err(|e| anyhow::anyhow!("Error creating table path: {e}"))?;
let mut file_paths: Vec<PathBuf> = Vec::new();
for file in table_files.files.clone() {
let count = table_files.files.len();
for (index, file) in table_files.files.clone().into_iter().enumerate() {
match file {
File::Parquet(ParquetFile { id, url, .. }) => {
let dst_path = &table_path.join(format!("{}.snappy.parquet", &id));
let bytes = self.download(url, &dst_path).await?;
debug!("Downloaded {} ({} bytes)", dst_path.display(), bytes);
debug!("Downloaded {}/{} {} ({} bytes)", index+1, count, dst_path.display(), bytes);
file_paths.push(dst_path.clone());
},
File::Delta( delta_file) => {
if let Some(url) = delta_file.get_url() {
let dst_path = &table_path.join(format!("{}.snappy.parquet", &delta_file.id));
let bytes = self.download(url, &dst_path).await?;
debug!("Downloaded {} ({} bytes)", dst_path.display(), bytes);
debug!("Downloaded {}/{} {} ({} bytes)", index+1, count, dst_path.display(), bytes);
file_paths.push(dst_path.clone())
}
},
Expand Down
2 changes: 1 addition & 1 deletion src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ pub struct DeltaFile {

impl DeltaFile {
pub fn get_url(&self) -> Option<String> {
if let Some(value) = self.delta_single_action.get("path") {
if let Some(value) = self.delta_single_action.get("add").and_then(|add| add.get("path")) {
return value.as_str().map(|v| v.to_string());
} else {
return None;
Expand Down

0 comments on commit 1514c25

Please sign in to comment.